From 7c88e4d013e64515be5077ecc95d573b82f562bc Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 21 Apr 2018 21:57:11 +0100 Subject: [PATCH] Move _DEAD into header, autogenerate dead messages This change blocks off 2 common scenarios where a race condition is upgraded to a hang, when the library could internally do better. * Since we don't know whether the receiver of a `reply_to` is expecting a raw or pickled message, and since in the case of a raw reply, there is no way to signal "dead" to the receiver, override the reply_to field to explicitly mark a message as dead using a special handle. This replaces the serialized _DEAD sentinel value with a slightly neater interface, in the form of the reserved IS_DEAD handle, and enables an important subsequent change: when a context cannot route a message, it can send a generic 'dead' reply back towards the message source, ensuring any sleeping thread is woken with ChannelError. The use of this field could potentially be extended later on if additional flags are needed, but for now this seems to suffice. * Teach Router._invoke() to reply with a dead message when it receives a message for an invalid local handle. * Teach Router._async_route() to reply with a dead message when it receives an unroutable message. --- docs/api.rst | 27 +++++++---- docs/getting_started.rst | 1 - docs/howitworks.rst | 37 ++++++++------- docs/internals.rst | 9 +++- mitogen/core.py | 91 +++++++++++++++---------------------- mitogen/fakessh.py | 9 ++-- mitogen/master.py | 6 +-- mitogen/parent.py | 6 +-- mitogen/utils.py | 1 - tests/call_function_test.py | 7 --- tests/router_test.py | 48 +++++++++++++++++-- 11 files changed, 135 insertions(+), 107 deletions(-) diff --git a/docs/api.rst b/docs/api.rst index 2c4dc42f..f69f1405 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -338,6 +338,12 @@ Message Class .. attribute:: data + .. 
attribute:: is_dead + + :data:`True` if :attr:`reply_to` is set to the magic value + :data:`mitogen.core.IS_DEAD`, indicating the sender considers the + channel dead. + .. py:method:: __init__ (\**kwargs) Construct a message from from the supplied `kwargs`. :py:attr:`src_id` @@ -362,15 +368,18 @@ Message Class :raises mitogen.core.CallError: The serialized data contained CallError exception. :raises mitogen.core.ChannelError: - The serialized data contained :py:data:`mitogen.core._DEAD`. + The `is_dead` field was set. - .. method:: reply (obj, \**kwargs) + .. method:: reply (obj, router=None, \**kwargs) - Compose a pickled reply to this message and send it using - :py:attr:`router`. + Compose a reply to this message and send it using :py:attr:`router`, or + `router` if :py:attr:`router` is :data:`None`. :param obj: - Object to serialize. + Either a :class:`Message`, or an object to be serialized in order + to construct a new message. + :param router: + Optional router to use if :attr:`router` is :data:`None`. :param kwargs: Optional keyword parameters overriding message fields in the reply. @@ -429,7 +438,7 @@ Router Class :param mitogen.core.Context respondent: Context that messages to this handle are expected to be sent from. - If specified, arranges for :py:data:`_DEAD` to be delivered to `fn` + If specified, arranges for a dead message to be delivered to `fn` when disconnection of the context is detected. In future `respondent` will likely also be used to prevent other @@ -943,7 +952,7 @@ Receiver Class :param mitogen.core.Context respondent: Reference to the context this receiver is receiving from. If not - ``None``, arranges for the receiver to receive :py:data:`_DEAD` if + ``None``, arranges for the receiver to receive a dead message if messages can no longer be routed to the context, due to disconnection or exit. @@ -1046,8 +1055,8 @@ Sender Class .. 
py:method:: close () - Send :py:data:`_DEAD` to the remote end, causing - :py:meth:`ChannelError` to be raised in any waiting thread. + Send a dead message to the remote end, causing :py:meth:`ChannelError` + to be raised in any waiting thread. .. py:method:: send (data) diff --git a/docs/getting_started.rst b/docs/getting_started.rst index af529b00..21bc0c35 100644 --- a/docs/getting_started.rst +++ b/docs/getting_started.rst @@ -310,7 +310,6 @@ User-defined types may not be used, except for: * :py:class:`mitogen.core.CallError` * :py:class:`mitogen.core.Context` * :py:class:`mitogen.core.Sender` -* :py:class:`mitogen.core._DEAD` Subclasses of built-in types must be undecorated using :py:func:`mitogen.utils.cast`. diff --git a/docs/howitworks.rst b/docs/howitworks.rst index dd7c299f..0e48a538 100644 --- a/docs/howitworks.rst +++ b/docs/howitworks.rst @@ -294,6 +294,7 @@ parent and child: - 4 - Integer target handle to direct any reply to this message. Used to receive a one-time reply, such as the return value of a function call. + :data:`IS_DEAD` has a special meaning when it appears in this field. * - `length` - 4 @@ -341,6 +342,22 @@ Masters listen on the following handles: million parent contexts to be created and destroyed before the associated Router must be recreated. +.. _IS_DEAD: +.. currentmodule:: mitogen.core +.. data:: IS_DEAD + + Special value used to signal disconnection or the inability to route a + message, when it appears in the `reply_to` field. Usually causes + :class:`mitogen.core.ChannelError` to be raised when it is received. + + It indicates the sender did not know how to process the message, or wishes + no further messages to be delivered to it. It is used when: + + * a remote receiver is disconnected or explicitly closed. + * a related message could not be delivered due to no route existing for it. + * a router is being torn down, as a sentinel value to notify + :py:meth:`mitogen.core.Router.add_handler` callbacks to clean up. 
+ Children listen on the following handles: @@ -373,7 +390,7 @@ Children listen on the following handles: imports ``mod_name``, then attempts to execute `class_name.func_name(\*args, \**kwargs)`. - When this channel is closed (by way of sending :py:data:`_DEAD` to it), the + When this channel is closed (by way of receiving a dead message), the child's main thread begins graceful shutdown of its own :py:class:`Broker` and :py:class:`Router`. @@ -382,7 +399,7 @@ Children listen on the following handles: .. data:: SHUTDOWN When received from a child's immediate parent, causes the broker thread to - enter graceful shutdown, including writing :py:data:`_DEAD` to the child's + enter graceful shutdown, including sending a dead message to the child's main thread, causing it to join on the exit of the broker thread. The final step of a child's broker shutdown process sends @@ -433,20 +450,6 @@ Additional handles are created to receive the result of every function call triggered by :py:meth:`call_async() `. -Sentinel Value -############## - -.. _DEAD: -.. currentmodule:: mitogen.core -.. data:: _DEAD - - This special value is used to signal disconnection or closure of the remote - end. It is used internally by :py:class:`Channel ` - and also passed to any function still registered with - :py:meth:`add_handler() ` during Broker - shutdown. - - Use of Pickle ############# @@ -458,7 +461,7 @@ in the bootstrap. The pickler will instantiate only built-in types and one of 3 constructor functions, to support unpickling :py:class:`CallError -`, :py:data:`_DEAD `, and +`, :py:class:`mitogen.core.Sender`, and :py:class:`Context `. The choice of Pickle is one area to be revisited later. All accounts suggest it diff --git a/docs/internals.rst b/docs/internals.rst index 842ac4e2..b6c3d069 100644 --- a/docs/internals.rst +++ b/docs/internals.rst @@ -400,16 +400,21 @@ Helper Functions disconnection was detected, otherwise ``False``. -.. currentmodule:: mitogen.master +.. 
currentmodule:: mitogen.parent .. autofunction:: create_child -.. currentmodule:: mitogen.master +.. currentmodule:: mitogen.parent .. autofunction:: tty_create_child +.. currentmodule:: mitogen.parent + +.. autofunction:: hybrid_tty_create_child + + .. currentmodule:: mitogen.master .. function:: get_child_modules (path) diff --git a/mitogen/core.py b/mitogen/core.py index 20f52554..dbac6f0d 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -75,6 +75,7 @@ DEL_ROUTE = 104 ALLOCATE_ID = 105 SHUTDOWN = 106 LOAD_MODULE = 107 +IS_DEAD = 999 PY3 = sys.version_info > (3,) if PY3: @@ -146,30 +147,6 @@ class TimeoutError(Error): pass -class Dead(object): - def __hash__(self): - return hash(Dead) - - def __eq__(self, other): - return type(other) is Dead - - def __ne__(self, other): - return type(other) is not Dead - - def __reduce__(self): - return (_unpickle_dead, ()) - - def __repr__(self): - return '' - - -def _unpickle_dead(): - return _DEAD - - -_DEAD = Dead() - - def has_parent_authority(msg, _stream=None): return (msg.auth_id == mitogen.context_id or msg.auth_id in mitogen.parent_ids) @@ -328,8 +305,6 @@ class Message(object): if module == __name__: if func == '_unpickle_call_error': return _unpickle_call_error - elif func == '_unpickle_dead': - return _unpickle_dead elif func == '_unpickle_sender': return self._unpickle_sender elif func == '_unpickle_context': @@ -337,6 +312,14 @@ class Message(object): raise StreamError('cannot unpickle %r/%r', module, func) + @property + def is_dead(self): + return self.reply_to == IS_DEAD + + @classmethod + def dead(cls, **kwargs): + return cls(reply_to=IS_DEAD, **kwargs) + @classmethod def pickled(cls, obj, **kwargs): self = cls(**kwargs) @@ -347,15 +330,20 @@ class Message(object): self.data = cPickle.dumps(CallError(e), protocol=2) return self - def reply(self, obj, **kwargs): - kwargs.setdefault('handle', self.reply_to) - self.router.route( - self.pickled(obj, dst_id=self.src_id, **kwargs) - ) + def reply(self, msg, 
router=None, **kwargs): + if not isinstance(msg, Message): + msg = Message.pickled(msg) + msg.dst_id = self.src_id + msg.handle = self.reply_to + vars(msg).update(kwargs) + (self.router or router).route(msg) def unpickle(self, throw=True, throw_dead=True): """Deserialize `data` into an object.""" _vv and IOLOG.debug('%r.unpickle()', self) + if throw_dead and self.is_dead: + raise ChannelError(ChannelError.remote_msg) + obj = self._unpickled if obj is Message._unpickled: fp = BytesIO(self.data) @@ -374,8 +362,6 @@ class Message(object): raise StreamError('invalid message: %s', e) if throw: - if obj == _DEAD and throw_dead: - raise ChannelError(ChannelError.remote_msg) if isinstance(obj, CallError): raise obj @@ -402,22 +388,12 @@ class Sender(object): def close(self): """Indicate this channel is closed to the remote side.""" _vv and IOLOG.debug('%r.close()', self) - self.context.send( - Message.pickled( - _DEAD, - handle=self.dst_handle - ) - ) + self.context.send(Message.dead(handle=self.dst_handle)) def send(self, data): """Send `data` to the remote.""" _vv and IOLOG.debug('%r.send(%r..)', self, repr(data)[:100]) - self.context.send( - Message.pickled( - data, - handle=self.dst_handle - ) - ) + self.context.send(Message.pickled(data, handle=self.dst_handle)) def _unpickle_sender(router, context_id, dst_handle): @@ -460,7 +436,7 @@ class Receiver(object): self.notify(self) def close(self): - self._latch.put(_DEAD) + self._latch.put(Message.dead()) def empty(self): return self._latch.empty() @@ -468,10 +444,11 @@ class Receiver(object): def get(self, timeout=None, block=True): _vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block) msg = self._latch.get(timeout=timeout, block=block) - #IOLOG.debug('%r.get() got %r', self, msg) - - if msg == _DEAD: - raise ChannelError(ChannelError.local_msg) + if msg.is_dead: + if msg.src_id == mitogen.context_id: + raise ChannelError(ChannelError.local_msg) + else: + raise ChannelError(ChannelError.remote_msg) 
return msg def __iter__(self): @@ -630,7 +607,7 @@ class Importer(object): os.environ['PBR_VERSION'] = '0.0.0' def _on_load_module(self, msg): - if msg == _DEAD: + if msg.is_dead: return tup = msg.unpickle() @@ -1275,7 +1252,7 @@ class Router(object): def _on_broker_exit(self): while self._handle_map: _, (_, func, _) = self._handle_map.popitem() - func(_DEAD) + func(Message.dead()) def register(self, context, stream): _v and LOG.debug('register(%r, %r)', context, stream) @@ -1295,7 +1272,7 @@ class Router(object): return msg.src_id == respondent.context_id def on_disconnect(): if handle in self._handle_map: - fn(_DEAD) + fn(Message.dead()) del self._handle_map[handle] listen(respondent, 'disconnect', on_disconnect) @@ -1309,7 +1286,7 @@ class Router(object): fire(self, 'shutdown') for handle, (persist, fn) in self._handle_map.iteritems(): _v and LOG.debug('%r.on_shutdown(): killing %r: %r', self, handle, fn) - fn(_DEAD) + fn(Message.dead()) refused_msg = 'Refused by policy.' @@ -1319,6 +1296,8 @@ class Router(object): persist, fn, policy = self._handle_map[msg.handle] except KeyError: LOG.error('%r: invalid handle: %r', self, msg) + if msg.reply_to and not msg.is_dead: + msg.reply(Message.dead()) return if policy and not policy(msg, stream): @@ -1375,6 +1354,8 @@ class Router(object): if stream is None: LOG.error('%r: no route for %r, my ID is %r', self, msg, mitogen.context_id) + if msg.reply_to and not msg.is_dead: + msg.reply(Message.dead(), router=self) return stream._send(msg) @@ -1510,7 +1491,7 @@ class ExternalContext(object): def _on_shutdown_msg(self, msg): _v and LOG.debug('_on_shutdown_msg(%r)', msg) - if msg != _DEAD: + if not msg.is_dead: self.broker.shutdown() def _on_parent_disconnect(self): diff --git a/mitogen/fakessh.py b/mitogen/fakessh.py index e07916ad..6a7303ab 100644 --- a/mitogen/fakessh.py +++ b/mitogen/fakessh.py @@ -134,20 +134,17 @@ class Process(object): self.control.put(('exit', status)) def _on_stdin(self, msg): - if msg == 
mitogen.core._DEAD: - return - - data = msg.unpickle(throw=False) - if data == mitogen.core._DEAD: + if msg.is_dead: IOLOG.debug('%r._on_stdin() -> %r', self, data) self.pump.close() return + data = msg.unpickle() IOLOG.debug('%r._on_stdin() -> len %d', self, len(data)) self.pump.write(data) def _on_control(self, msg): - if msg != mitogen.core._DEAD: + if not msg.is_dead: command, arg = msg.unpickle(throw=False) LOG.debug('%r._on_control(%r, %s)', self, command, arg) diff --git a/mitogen/master.py b/mitogen/master.py index 9025810a..1e46a916 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -295,7 +295,7 @@ class LogForwarder(object): ) def _on_forward_log(self, msg): - if msg == mitogen.core._DEAD: + if msg.is_dead: return logger = self._cache.get(msg.src_id) @@ -619,7 +619,7 @@ class ModuleResponder(object): stream.sent_modules.add(fullname) def _on_get_module(self, msg): - if msg == mitogen.core._DEAD: + if msg.is_dead: return LOG.debug('%r._on_get_module(%r)', self, msg.data) @@ -742,7 +742,7 @@ class IdAllocator(object): self.lock.release() def on_allocate_id(self, msg): - if msg == mitogen.core._DEAD: + if msg.is_dead: return id_, last_id = self.allocate_block() diff --git a/mitogen/parent.py b/mitogen/parent.py index 3392f220..2868c93a 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -850,7 +850,7 @@ class RouteMonitor(object): mitogen.core.fire(context, 'disconnect') def _on_add_route(self, msg): - if msg == mitogen.core._DEAD: + if msg.is_dead: return target_id_s, _, target_name = msg.data.partition(':') @@ -870,7 +870,7 @@ class RouteMonitor(object): self.propagate(mitogen.core.ADD_ROUTE, target_id, target_name) def _on_del_route(self, msg): - if msg == mitogen.core._DEAD: + if msg.is_dead: return target_id = int(msg.data) @@ -1055,7 +1055,7 @@ class ModuleForwarder(object): def _on_get_module(self, msg): LOG.debug('%r._on_get_module(%r)', self, msg) - if msg == mitogen.core._DEAD: + if msg.is_dead: return fullname = msg.data diff --git 
a/mitogen/utils.py b/mitogen/utils.py index 0b025b8a..ab8a673a 100644 --- a/mitogen/utils.py +++ b/mitogen/utils.py @@ -112,7 +112,6 @@ def cast(obj): if isinstance(obj, str): return str(obj) if isinstance(obj, (mitogen.core.Context, - mitogen.core.Dead, mitogen.core.CallError)): return obj diff --git a/tests/call_function_test.py b/tests/call_function_test.py index de3f1f46..199aaa77 100644 --- a/tests/call_function_test.py +++ b/tests/call_function_test.py @@ -25,10 +25,6 @@ def func_with_bad_return_value(): return CrazyType() -def func_returns_dead(): - return mitogen.core._DEAD - - def func_accepts_returns_context(context): return context @@ -69,9 +65,6 @@ class CallFunctionTest(testlib.RouterMixin, testlib.TestCase): "cannot unpickle '%s'/'CrazyType'" % (__name__,), ) - def test_returns_dead(self): - self.assertEqual(mitogen.core._DEAD, self.local.call(func_returns_dead)) - def test_aborted_on_local_context_disconnect(self): stream = self.router._stream_by_id[self.local.context_id] self.broker.stop_receive(stream) diff --git a/tests/router_test.py b/tests/router_test.py index 2c0b7e60..09c54245 100644 --- a/tests/router_test.py +++ b/tests/router_test.py @@ -159,8 +159,8 @@ class CrashTest(testlib.BrokerMixin, unittest2.TestCase): self.broker._loop_once = self._naughty self.broker.defer(lambda: None) - # sem should have received _DEAD. - self.assertEquals(mitogen.core._DEAD, sem.get()) + # sem should have received dead message. + self.assertTrue(sem.get().is_dead) # Ensure it was logged. 
expect = '_broker_main() crashed' @@ -176,7 +176,7 @@ class AddHandlerTest(unittest2.TestCase): queue = Queue.Queue() handle = router.add_handler(queue.put) router.broker.shutdown() - self.assertEquals(queue.get(timeout=5), mitogen.core._DEAD) + self.assertTrue(queue.get(timeout=5).is_dead) class MessageSizeTest(testlib.BrokerMixin, unittest2.TestCase): @@ -218,5 +218,47 @@ class MessageSizeTest(testlib.BrokerMixin, unittest2.TestCase): self.assertTrue(expect in logs.stop()) +class NoRouteTest(testlib.RouterMixin, testlib.TestCase): + def test_invalid_handle_returns_dead(self): + # Verify sending a message to an invalid handle yields a dead message + # from the target context. + l1 = self.router.fork() + recv = l1.send_async(mitogen.core.Message(handle=999)) + e = self.assertRaises(mitogen.core.ChannelError, + lambda: recv.get() + ) + self.assertEquals(e.args[0], mitogen.core.ChannelError.remote_msg) + + def test_totally_invalid_context_returns_dead(self): + recv = mitogen.core.Receiver(self.router) + self.router.route( + mitogen.core.Message( + dst_id=1234, + handle=1234, + reply_to=recv.handle, + ) + ) + e = self.assertRaises(mitogen.core.ChannelError, + lambda: recv.get() + ) + self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg) + + def test_previously_alive_context_returns_dead(self): + l1 = self.router.fork() + l1.shutdown(wait=True) + recv = mitogen.core.Receiver(self.router) + self.router.route( + mitogen.core.Message( + dst_id=l1.context_id, + handle=mitogen.core.CALL_FUNCTION, + reply_to=recv.handle, + ) + ) + e = self.assertRaises(mitogen.core.ChannelError, + lambda: recv.get() + ) + self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg) + + if __name__ == '__main__': unittest2.main()