Move _DEAD into header, autogenerate dead messages

This change blocks off 2 common scenarios where a race condition is
upgraded to a hang, when the library could internally do better.

* Since we don't know whether the receiver of a `reply_to` is expecting
  a raw or pickled message, and since in the case of a raw reply, there
  is no way to signal "dead" to the receiver, override the reply_to
  field to explicitly mark a message as dead using a special handle.

  This replaces the serialized _DEAD sentinel value with a slightly
  neater interface, in the form of the reserved IS_DEAD handle, and
  enables an important subsequent change: when a context cannot route a
  message, it can send a generic 'dead' reply back towards the message
  source, ensuring any sleeping thread is woken with ChannelError.

  The use of this field could potentially be extended later on if
  additional flags are needed, but for now this seems to suffice.

* Teach Router._invoke() to reply with a dead message when it receives a
  message for an invalid local handle.

* Teach Router._async_route() to reply with a dead message when it
  receives an unroutable message.
pull/207/head
David Wilson 7 years ago
parent 8c3b1fcf15
commit 7c88e4d013

@ -338,6 +338,12 @@ Message Class
.. attribute:: data .. attribute:: data
.. attribute:: is_dead
:data:`True` if :attr:`reply_to` is set to the magic value
:data:`mitogen.core.IS_DEAD`, indicating the sender considers the
channel dead.
.. py:method:: __init__ (\**kwargs) .. py:method:: __init__ (\**kwargs)
Construct a message from from the supplied `kwargs`. :py:attr:`src_id` Construct a message from from the supplied `kwargs`. :py:attr:`src_id`
@ -362,15 +368,18 @@ Message Class
:raises mitogen.core.CallError: :raises mitogen.core.CallError:
The serialized data contained CallError exception. The serialized data contained CallError exception.
:raises mitogen.core.ChannelError: :raises mitogen.core.ChannelError:
The serialized data contained :py:data:`mitogen.core._DEAD`. The `is_dead` field was set.
.. method:: reply (obj, \**kwargs) .. method:: reply (obj, router=None, \**kwargs)
Compose a pickled reply to this message and send it using Compose a reply to this message and send it using :py:attr:`router`, or
:py:attr:`router`. `router` is :py:attr:`router` is :data:`None`.
:param obj: :param obj:
Object to serialize. Either a :class:`Message`, or an object to be serialized in order
to construct a new message.
:param router:
Optional router to use if :attr:`router` is :data:`None`.
:param kwargs: :param kwargs:
Optional keyword parameters overriding message fields in the reply. Optional keyword parameters overriding message fields in the reply.
@ -429,7 +438,7 @@ Router Class
:param mitogen.core.Context respondent: :param mitogen.core.Context respondent:
Context that messages to this handle are expected to be sent from. Context that messages to this handle are expected to be sent from.
If specified, arranges for :py:data:`_DEAD` to be delivered to `fn` If specified, arranges for a dead message to be delivered to `fn`
when disconnection of the context is detected. when disconnection of the context is detected.
In future `respondent` will likely also be used to prevent other In future `respondent` will likely also be used to prevent other
@ -943,7 +952,7 @@ Receiver Class
:param mitogen.core.Context respondent: :param mitogen.core.Context respondent:
Reference to the context this receiver is receiving from. If not Reference to the context this receiver is receiving from. If not
``None``, arranges for the receiver to receive :py:data:`_DEAD` if ``None``, arranges for the receiver to receive a dead message if
messages can no longer be routed to the context, due to disconnection messages can no longer be routed to the context, due to disconnection
or exit. or exit.
@ -1046,8 +1055,8 @@ Sender Class
.. py:method:: close () .. py:method:: close ()
Send :py:data:`_DEAD` to the remote end, causing Send a dead message to the remote end, causing :py:meth:`ChannelError`
:py:meth:`ChannelError` to be raised in any waiting thread. to be raised in any waiting thread.
.. py:method:: send (data) .. py:method:: send (data)

@ -310,7 +310,6 @@ User-defined types may not be used, except for:
* :py:class:`mitogen.core.CallError` * :py:class:`mitogen.core.CallError`
* :py:class:`mitogen.core.Context` * :py:class:`mitogen.core.Context`
* :py:class:`mitogen.core.Sender` * :py:class:`mitogen.core.Sender`
* :py:class:`mitogen.core._DEAD`
Subclasses of built-in types must be undecorated using Subclasses of built-in types must be undecorated using
:py:func:`mitogen.utils.cast`. :py:func:`mitogen.utils.cast`.

@ -294,6 +294,7 @@ parent and child:
- 4 - 4
- Integer target handle to direct any reply to this message. Used to - Integer target handle to direct any reply to this message. Used to
receive a one-time reply, such as the return value of a function call. receive a one-time reply, such as the return value of a function call.
:data:`IS_DEAD` has a special meaning when it appears in this field.
* - `length` * - `length`
- 4 - 4
@ -341,6 +342,22 @@ Masters listen on the following handles:
million parent contexts to be created and destroyed before the associated million parent contexts to be created and destroyed before the associated
Router must be recreated. Router must be recreated.
.. _IS_DEAD:
.. currentmodule:: mitogen.core
.. data:: IS_DEAD
Special value used to signal disconnection or the inability to route a
message, when it appears in the `reply_to` field. Usually causes
:class:`mitogen.core.ChannelError` to be raised when it is received.
It indicates the sender did not know how to process the message, or wishes
no further messages to be delivered to it. It is used when:
* a remote receiver is disconnected or explicitly closed.
* a related message could not be delivered due to no route existing for it.
* a router is being torn down, as a sentinel value to notify
:py:meth:`mitogen.core.Router.add_handler` callbacks to clean up.
Children listen on the following handles: Children listen on the following handles:
@ -373,7 +390,7 @@ Children listen on the following handles:
imports ``mod_name``, then attempts to execute imports ``mod_name``, then attempts to execute
`class_name.func_name(\*args, \**kwargs)`. `class_name.func_name(\*args, \**kwargs)`.
When this channel is closed (by way of sending :py:data:`_DEAD` to it), the When this channel is closed (by way of receiving a dead message), the
child's main thread begins graceful shutdown of its own :py:class:`Broker` child's main thread begins graceful shutdown of its own :py:class:`Broker`
and :py:class:`Router`. and :py:class:`Router`.
@ -382,7 +399,7 @@ Children listen on the following handles:
.. data:: SHUTDOWN .. data:: SHUTDOWN
When received from a child's immediate parent, causes the broker thread to When received from a child's immediate parent, causes the broker thread to
enter graceful shutdown, including writing :py:data:`_DEAD` to the child's enter graceful shutdown, including sending a dead message to the child's
main thread, causing it to join on the exit of the broker thread. main thread, causing it to join on the exit of the broker thread.
The final step of a child's broker shutdown process sends The final step of a child's broker shutdown process sends
@ -433,20 +450,6 @@ Additional handles are created to receive the result of every function call
triggered by :py:meth:`call_async() <mitogen.parent.Context.call_async>`. triggered by :py:meth:`call_async() <mitogen.parent.Context.call_async>`.
Sentinel Value
##############
.. _DEAD:
.. currentmodule:: mitogen.core
.. data:: _DEAD
This special value is used to signal disconnection or closure of the remote
end. It is used internally by :py:class:`Channel <mitogen.core.Channel>`
and also passed to any function still registered with
:py:meth:`add_handler() <mitogen.core.Router.add_handler>` during Broker
shutdown.
Use of Pickle Use of Pickle
############# #############
@ -458,7 +461,7 @@ in the bootstrap.
The pickler will instantiate only built-in types and one of 3 constructor The pickler will instantiate only built-in types and one of 3 constructor
functions, to support unpickling :py:class:`CallError functions, to support unpickling :py:class:`CallError
<mitogen.core.CallError>`, :py:data:`_DEAD <mitogen.core._DEAD>`, and <mitogen.core.CallError>`, :py:class:`mitogen.core.Sender`,and
:py:class:`Context <mitogen.core.Context>`. :py:class:`Context <mitogen.core.Context>`.
The choice of Pickle is one area to be revisited later. All accounts suggest it The choice of Pickle is one area to be revisited later. All accounts suggest it

@ -400,16 +400,21 @@ Helper Functions
disconnection was detected, otherwise ``False``. disconnection was detected, otherwise ``False``.
.. currentmodule:: mitogen.master .. currentmodule:: mitogen.parent
.. autofunction:: create_child .. autofunction:: create_child
.. currentmodule:: mitogen.master .. currentmodule:: mitogen.parent
.. autofunction:: tty_create_child .. autofunction:: tty_create_child
.. currentmodule:: mitogen.parent
.. autofunction:: hybrid_tty_create_child
.. currentmodule:: mitogen.master .. currentmodule:: mitogen.master
.. function:: get_child_modules (path) .. function:: get_child_modules (path)

@ -75,6 +75,7 @@ DEL_ROUTE = 104
ALLOCATE_ID = 105 ALLOCATE_ID = 105
SHUTDOWN = 106 SHUTDOWN = 106
LOAD_MODULE = 107 LOAD_MODULE = 107
IS_DEAD = 999
PY3 = sys.version_info > (3,) PY3 = sys.version_info > (3,)
if PY3: if PY3:
@ -146,30 +147,6 @@ class TimeoutError(Error):
pass pass
class Dead(object):
def __hash__(self):
return hash(Dead)
def __eq__(self, other):
return type(other) is Dead
def __ne__(self, other):
return type(other) is not Dead
def __reduce__(self):
return (_unpickle_dead, ())
def __repr__(self):
return '<Dead>'
def _unpickle_dead():
return _DEAD
_DEAD = Dead()
def has_parent_authority(msg, _stream=None): def has_parent_authority(msg, _stream=None):
return (msg.auth_id == mitogen.context_id or return (msg.auth_id == mitogen.context_id or
msg.auth_id in mitogen.parent_ids) msg.auth_id in mitogen.parent_ids)
@ -328,8 +305,6 @@ class Message(object):
if module == __name__: if module == __name__:
if func == '_unpickle_call_error': if func == '_unpickle_call_error':
return _unpickle_call_error return _unpickle_call_error
elif func == '_unpickle_dead':
return _unpickle_dead
elif func == '_unpickle_sender': elif func == '_unpickle_sender':
return self._unpickle_sender return self._unpickle_sender
elif func == '_unpickle_context': elif func == '_unpickle_context':
@ -337,6 +312,14 @@ class Message(object):
raise StreamError('cannot unpickle %r/%r', module, func) raise StreamError('cannot unpickle %r/%r', module, func)
@property
def is_dead(self):
return self.reply_to == IS_DEAD
@classmethod
def dead(cls, **kwargs):
return cls(reply_to=IS_DEAD, **kwargs)
@classmethod @classmethod
def pickled(cls, obj, **kwargs): def pickled(cls, obj, **kwargs):
self = cls(**kwargs) self = cls(**kwargs)
@ -347,15 +330,20 @@ class Message(object):
self.data = cPickle.dumps(CallError(e), protocol=2) self.data = cPickle.dumps(CallError(e), protocol=2)
return self return self
def reply(self, obj, **kwargs): def reply(self, msg, router=None, **kwargs):
kwargs.setdefault('handle', self.reply_to) if not isinstance(msg, Message):
self.router.route( msg = Message.pickled(msg)
self.pickled(obj, dst_id=self.src_id, **kwargs) msg.dst_id = self.src_id
) msg.handle = self.reply_to
vars(msg).update(kwargs)
(self.router or router).route(msg)
def unpickle(self, throw=True, throw_dead=True): def unpickle(self, throw=True, throw_dead=True):
"""Deserialize `data` into an object.""" """Deserialize `data` into an object."""
_vv and IOLOG.debug('%r.unpickle()', self) _vv and IOLOG.debug('%r.unpickle()', self)
if throw_dead and self.is_dead:
raise ChannelError(ChannelError.remote_msg)
obj = self._unpickled obj = self._unpickled
if obj is Message._unpickled: if obj is Message._unpickled:
fp = BytesIO(self.data) fp = BytesIO(self.data)
@ -374,8 +362,6 @@ class Message(object):
raise StreamError('invalid message: %s', e) raise StreamError('invalid message: %s', e)
if throw: if throw:
if obj == _DEAD and throw_dead:
raise ChannelError(ChannelError.remote_msg)
if isinstance(obj, CallError): if isinstance(obj, CallError):
raise obj raise obj
@ -402,22 +388,12 @@ class Sender(object):
def close(self): def close(self):
"""Indicate this channel is closed to the remote side.""" """Indicate this channel is closed to the remote side."""
_vv and IOLOG.debug('%r.close()', self) _vv and IOLOG.debug('%r.close()', self)
self.context.send( self.context.send(Message.dead(handle=self.dst_handle))
Message.pickled(
_DEAD,
handle=self.dst_handle
)
)
def send(self, data): def send(self, data):
"""Send `data` to the remote.""" """Send `data` to the remote."""
_vv and IOLOG.debug('%r.send(%r..)', self, repr(data)[:100]) _vv and IOLOG.debug('%r.send(%r..)', self, repr(data)[:100])
self.context.send( self.context.send(Message.pickled(data, handle=self.dst_handle))
Message.pickled(
data,
handle=self.dst_handle
)
)
def _unpickle_sender(router, context_id, dst_handle): def _unpickle_sender(router, context_id, dst_handle):
@ -460,7 +436,7 @@ class Receiver(object):
self.notify(self) self.notify(self)
def close(self): def close(self):
self._latch.put(_DEAD) self._latch.put(Message.dead())
def empty(self): def empty(self):
return self._latch.empty() return self._latch.empty()
@ -468,10 +444,11 @@ class Receiver(object):
def get(self, timeout=None, block=True): def get(self, timeout=None, block=True):
_vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block) _vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block)
msg = self._latch.get(timeout=timeout, block=block) msg = self._latch.get(timeout=timeout, block=block)
#IOLOG.debug('%r.get() got %r', self, msg) if msg.is_dead:
if msg.src_id == mitogen.context_id:
if msg == _DEAD: raise ChannelError(ChannelError.local_msg)
raise ChannelError(ChannelError.local_msg) else:
raise ChannelError(ChannelError.remote_msg)
return msg return msg
def __iter__(self): def __iter__(self):
@ -630,7 +607,7 @@ class Importer(object):
os.environ['PBR_VERSION'] = '0.0.0' os.environ['PBR_VERSION'] = '0.0.0'
def _on_load_module(self, msg): def _on_load_module(self, msg):
if msg == _DEAD: if msg.is_dead:
return return
tup = msg.unpickle() tup = msg.unpickle()
@ -1275,7 +1252,7 @@ class Router(object):
def _on_broker_exit(self): def _on_broker_exit(self):
while self._handle_map: while self._handle_map:
_, (_, func, _) = self._handle_map.popitem() _, (_, func, _) = self._handle_map.popitem()
func(_DEAD) func(Message.dead())
def register(self, context, stream): def register(self, context, stream):
_v and LOG.debug('register(%r, %r)', context, stream) _v and LOG.debug('register(%r, %r)', context, stream)
@ -1295,7 +1272,7 @@ class Router(object):
return msg.src_id == respondent.context_id return msg.src_id == respondent.context_id
def on_disconnect(): def on_disconnect():
if handle in self._handle_map: if handle in self._handle_map:
fn(_DEAD) fn(Message.dead())
del self._handle_map[handle] del self._handle_map[handle]
listen(respondent, 'disconnect', on_disconnect) listen(respondent, 'disconnect', on_disconnect)
@ -1309,7 +1286,7 @@ class Router(object):
fire(self, 'shutdown') fire(self, 'shutdown')
for handle, (persist, fn) in self._handle_map.iteritems(): for handle, (persist, fn) in self._handle_map.iteritems():
_v and LOG.debug('%r.on_shutdown(): killing %r: %r', self, handle, fn) _v and LOG.debug('%r.on_shutdown(): killing %r: %r', self, handle, fn)
fn(_DEAD) fn(Message.dead())
refused_msg = 'Refused by policy.' refused_msg = 'Refused by policy.'
@ -1319,6 +1296,8 @@ class Router(object):
persist, fn, policy = self._handle_map[msg.handle] persist, fn, policy = self._handle_map[msg.handle]
except KeyError: except KeyError:
LOG.error('%r: invalid handle: %r', self, msg) LOG.error('%r: invalid handle: %r', self, msg)
if msg.reply_to and not msg.is_dead:
msg.reply(Message.dead())
return return
if policy and not policy(msg, stream): if policy and not policy(msg, stream):
@ -1375,6 +1354,8 @@ class Router(object):
if stream is None: if stream is None:
LOG.error('%r: no route for %r, my ID is %r', LOG.error('%r: no route for %r, my ID is %r',
self, msg, mitogen.context_id) self, msg, mitogen.context_id)
if msg.reply_to and not msg.is_dead:
msg.reply(Message.dead(), router=self)
return return
stream._send(msg) stream._send(msg)
@ -1510,7 +1491,7 @@ class ExternalContext(object):
def _on_shutdown_msg(self, msg): def _on_shutdown_msg(self, msg):
_v and LOG.debug('_on_shutdown_msg(%r)', msg) _v and LOG.debug('_on_shutdown_msg(%r)', msg)
if msg != _DEAD: if not msg.is_dead:
self.broker.shutdown() self.broker.shutdown()
def _on_parent_disconnect(self): def _on_parent_disconnect(self):

@ -134,20 +134,17 @@ class Process(object):
self.control.put(('exit', status)) self.control.put(('exit', status))
def _on_stdin(self, msg): def _on_stdin(self, msg):
if msg == mitogen.core._DEAD: if msg.is_dead:
return
data = msg.unpickle(throw=False)
if data == mitogen.core._DEAD:
IOLOG.debug('%r._on_stdin() -> %r', self, data) IOLOG.debug('%r._on_stdin() -> %r', self, data)
self.pump.close() self.pump.close()
return return
data = msg.unpickle()
IOLOG.debug('%r._on_stdin() -> len %d', self, len(data)) IOLOG.debug('%r._on_stdin() -> len %d', self, len(data))
self.pump.write(data) self.pump.write(data)
def _on_control(self, msg): def _on_control(self, msg):
if msg != mitogen.core._DEAD: if not msg.is_dead:
command, arg = msg.unpickle(throw=False) command, arg = msg.unpickle(throw=False)
LOG.debug('%r._on_control(%r, %s)', self, command, arg) LOG.debug('%r._on_control(%r, %s)', self, command, arg)

@ -295,7 +295,7 @@ class LogForwarder(object):
) )
def _on_forward_log(self, msg): def _on_forward_log(self, msg):
if msg == mitogen.core._DEAD: if msg.is_dead:
return return
logger = self._cache.get(msg.src_id) logger = self._cache.get(msg.src_id)
@ -619,7 +619,7 @@ class ModuleResponder(object):
stream.sent_modules.add(fullname) stream.sent_modules.add(fullname)
def _on_get_module(self, msg): def _on_get_module(self, msg):
if msg == mitogen.core._DEAD: if msg.is_dead:
return return
LOG.debug('%r._on_get_module(%r)', self, msg.data) LOG.debug('%r._on_get_module(%r)', self, msg.data)
@ -742,7 +742,7 @@ class IdAllocator(object):
self.lock.release() self.lock.release()
def on_allocate_id(self, msg): def on_allocate_id(self, msg):
if msg == mitogen.core._DEAD: if msg.is_dead:
return return
id_, last_id = self.allocate_block() id_, last_id = self.allocate_block()

@ -850,7 +850,7 @@ class RouteMonitor(object):
mitogen.core.fire(context, 'disconnect') mitogen.core.fire(context, 'disconnect')
def _on_add_route(self, msg): def _on_add_route(self, msg):
if msg == mitogen.core._DEAD: if msg.is_dead:
return return
target_id_s, _, target_name = msg.data.partition(':') target_id_s, _, target_name = msg.data.partition(':')
@ -870,7 +870,7 @@ class RouteMonitor(object):
self.propagate(mitogen.core.ADD_ROUTE, target_id, target_name) self.propagate(mitogen.core.ADD_ROUTE, target_id, target_name)
def _on_del_route(self, msg): def _on_del_route(self, msg):
if msg == mitogen.core._DEAD: if msg.is_dead:
return return
target_id = int(msg.data) target_id = int(msg.data)
@ -1055,7 +1055,7 @@ class ModuleForwarder(object):
def _on_get_module(self, msg): def _on_get_module(self, msg):
LOG.debug('%r._on_get_module(%r)', self, msg) LOG.debug('%r._on_get_module(%r)', self, msg)
if msg == mitogen.core._DEAD: if msg.is_dead:
return return
fullname = msg.data fullname = msg.data

@ -112,7 +112,6 @@ def cast(obj):
if isinstance(obj, str): if isinstance(obj, str):
return str(obj) return str(obj)
if isinstance(obj, (mitogen.core.Context, if isinstance(obj, (mitogen.core.Context,
mitogen.core.Dead,
mitogen.core.CallError)): mitogen.core.CallError)):
return obj return obj

@ -25,10 +25,6 @@ def func_with_bad_return_value():
return CrazyType() return CrazyType()
def func_returns_dead():
return mitogen.core._DEAD
def func_accepts_returns_context(context): def func_accepts_returns_context(context):
return context return context
@ -69,9 +65,6 @@ class CallFunctionTest(testlib.RouterMixin, testlib.TestCase):
"cannot unpickle '%s'/'CrazyType'" % (__name__,), "cannot unpickle '%s'/'CrazyType'" % (__name__,),
) )
def test_returns_dead(self):
self.assertEqual(mitogen.core._DEAD, self.local.call(func_returns_dead))
def test_aborted_on_local_context_disconnect(self): def test_aborted_on_local_context_disconnect(self):
stream = self.router._stream_by_id[self.local.context_id] stream = self.router._stream_by_id[self.local.context_id]
self.broker.stop_receive(stream) self.broker.stop_receive(stream)

@ -159,8 +159,8 @@ class CrashTest(testlib.BrokerMixin, unittest2.TestCase):
self.broker._loop_once = self._naughty self.broker._loop_once = self._naughty
self.broker.defer(lambda: None) self.broker.defer(lambda: None)
# sem should have received _DEAD. # sem should have received dead message.
self.assertEquals(mitogen.core._DEAD, sem.get()) self.assertTrue(sem.get().is_dead)
# Ensure it was logged. # Ensure it was logged.
expect = '_broker_main() crashed' expect = '_broker_main() crashed'
@ -176,7 +176,7 @@ class AddHandlerTest(unittest2.TestCase):
queue = Queue.Queue() queue = Queue.Queue()
handle = router.add_handler(queue.put) handle = router.add_handler(queue.put)
router.broker.shutdown() router.broker.shutdown()
self.assertEquals(queue.get(timeout=5), mitogen.core._DEAD) self.assertTrue(queue.get(timeout=5).is_dead)
class MessageSizeTest(testlib.BrokerMixin, unittest2.TestCase): class MessageSizeTest(testlib.BrokerMixin, unittest2.TestCase):
@ -218,5 +218,47 @@ class MessageSizeTest(testlib.BrokerMixin, unittest2.TestCase):
self.assertTrue(expect in logs.stop()) self.assertTrue(expect in logs.stop())
class NoRouteTest(testlib.RouterMixin, testlib.TestCase):
def test_invalid_handle_returns_dead(self):
# Verify sending a message to an invalid handle yields a dead message
# from the target context.
l1 = self.router.fork()
recv = l1.send_async(mitogen.core.Message(handle=999))
e = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get()
)
self.assertEquals(e.args[0], mitogen.core.ChannelError.remote_msg)
def test_totally_invalid_context_returns_dead(self):
recv = mitogen.core.Receiver(self.router)
self.router.route(
mitogen.core.Message(
dst_id=1234,
handle=1234,
reply_to=recv.handle,
)
)
e = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get()
)
self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg)
def test_previously_alive_context_returns_dead(self):
l1 = self.router.fork()
l1.shutdown(wait=True)
recv = mitogen.core.Receiver(self.router)
self.router.route(
mitogen.core.Message(
dst_id=l1.context_id,
handle=mitogen.core.CALL_FUNCTION,
reply_to=recv.handle,
)
)
e = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get()
)
self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg)
if __name__ == '__main__': if __name__ == '__main__':
unittest2.main() unittest2.main()

Loading…
Cancel
Save