diff --git a/docs/api.rst b/docs/api.rst index 2c4dc42f..f69f1405 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -338,6 +338,12 @@ Message Class .. attribute:: data + .. attribute:: is_dead + + :data:`True` if :attr:`reply_to` is set to the magic value + :data:`mitogen.core.IS_DEAD`, indicating the sender considers the + channel dead. + .. py:method:: __init__ (\**kwargs) Construct a message from from the supplied `kwargs`. :py:attr:`src_id` @@ -362,15 +368,18 @@ Message Class :raises mitogen.core.CallError: The serialized data contained CallError exception. :raises mitogen.core.ChannelError: - The serialized data contained :py:data:`mitogen.core._DEAD`. + The `is_dead` field was set. - .. method:: reply (obj, \**kwargs) + .. method:: reply (obj, router=None, \**kwargs) - Compose a pickled reply to this message and send it using - :py:attr:`router`. + Compose a reply to this message and send it using :py:attr:`router`, or + `router` is :py:attr:`router` is :data:`None`. :param obj: - Object to serialize. + Either a :class:`Message`, or an object to be serialized in order + to construct a new message. + :param router: + Optional router to use if :attr:`router` is :data:`None`. :param kwargs: Optional keyword parameters overriding message fields in the reply. @@ -429,7 +438,7 @@ Router Class :param mitogen.core.Context respondent: Context that messages to this handle are expected to be sent from. - If specified, arranges for :py:data:`_DEAD` to be delivered to `fn` + If specified, arranges for a dead message to be delivered to `fn` when disconnection of the context is detected. In future `respondent` will likely also be used to prevent other @@ -943,7 +952,7 @@ Receiver Class :param mitogen.core.Context respondent: Reference to the context this receiver is receiving from. If not - ``None``, arranges for the receiver to receive :py:data:`_DEAD` if + ``None``, arranges for the receiver to receive a dead message if messages can no longer be routed to the context, due to disconnection or exit. @@ -1046,8 +1055,8 @@ Sender Class .. py:method:: close () - Send :py:data:`_DEAD` to the remote end, causing - :py:meth:`ChannelError` to be raised in any waiting thread. + Send a dead message to the remote end, causing :py:meth:`ChannelError` + to be raised in any waiting thread. .. py:method:: send (data) diff --git a/docs/getting_started.rst b/docs/getting_started.rst index af529b00..21bc0c35 100644 --- a/docs/getting_started.rst +++ b/docs/getting_started.rst @@ -310,7 +310,6 @@ User-defined types may not be used, except for: * :py:class:`mitogen.core.CallError` * :py:class:`mitogen.core.Context` * :py:class:`mitogen.core.Sender` -* :py:class:`mitogen.core._DEAD` Subclasses of built-in types must be undecorated using :py:func:`mitogen.utils.cast`. diff --git a/docs/howitworks.rst b/docs/howitworks.rst index dd7c299f..0e48a538 100644 --- a/docs/howitworks.rst +++ b/docs/howitworks.rst @@ -294,6 +294,7 @@ parent and child: - 4 - Integer target handle to direct any reply to this message. Used to receive a one-time reply, such as the return value of a function call. + :data:`IS_DEAD` has a special meaning when it appears in this field. * - `length` - 4 @@ -341,6 +342,22 @@ Masters listen on the following handles: million parent contexts to be created and destroyed before the associated Router must be recreated. +.. _IS_DEAD: +.. currentmodule:: mitogen.core +.. data:: IS_DEAD + + Special value used to signal disconnection or the inability to route a + message, when it appears in the `reply_to` field. Usually causes + :class:`mitogen.core.ChannelError` to be raised when it is received. + + It indicates the sender did not know how to process the message, or wishes + no further messages to be delivered to it. It is used when: + + * a remote receiver is disconnected or explicitly closed. + * a related message could not be delivered due to no route existing for it. + * a router is being torn down, as a sentinel value to notify + :py:meth:`mitogen.core.Router.add_handler` callbacks to clean up. + Children listen on the following handles: @@ -373,7 +390,7 @@ Children listen on the following handles: imports ``mod_name``, then attempts to execute `class_name.func_name(\*args, \**kwargs)`. - When this channel is closed (by way of sending :py:data:`_DEAD` to it), the + When this channel is closed (by way of receiving a dead message), the child's main thread begins graceful shutdown of its own :py:class:`Broker` and :py:class:`Router`. @@ -382,7 +399,7 @@ Children listen on the following handles: .. data:: SHUTDOWN When received from a child's immediate parent, causes the broker thread to - enter graceful shutdown, including writing :py:data:`_DEAD` to the child's + enter graceful shutdown, including sending a dead message to the child's main thread, causing it to join on the exit of the broker thread. The final step of a child's broker shutdown process sends @@ -433,20 +450,6 @@ Additional handles are created to receive the result of every function call triggered by :py:meth:`call_async() `. -Sentinel Value -############## - -.. _DEAD: -.. currentmodule:: mitogen.core -.. data:: _DEAD - - This special value is used to signal disconnection or closure of the remote - end. It is used internally by :py:class:`Channel ` - and also passed to any function still registered with - :py:meth:`add_handler() ` during Broker - shutdown. - - Use of Pickle ############# @@ -458,7 +461,7 @@ in the bootstrap. The pickler will instantiate only built-in types and one of 3 constructor functions, to support unpickling :py:class:`CallError -`, :py:data:`_DEAD `, and +`, :py:class:`mitogen.core.Sender`,and :py:class:`Context `. The choice of Pickle is one area to be revisited later. All accounts suggest it diff --git a/docs/internals.rst b/docs/internals.rst index 842ac4e2..b6c3d069 100644 --- a/docs/internals.rst +++ b/docs/internals.rst @@ -400,16 +400,21 @@ Helper Functions disconnection was detected, otherwise ``False``. -.. currentmodule:: mitogen.master +.. currentmodule:: mitogen.parent .. autofunction:: create_child -.. currentmodule:: mitogen.master +.. currentmodule:: mitogen.parent .. autofunction:: tty_create_child +.. currentmodule:: mitogen.parent + +.. autofunction:: hybrid_tty_create_child + + .. currentmodule:: mitogen.master .. function:: get_child_modules (path) diff --git a/mitogen/core.py b/mitogen/core.py index 20f52554..dbac6f0d 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -75,6 +75,7 @@ DEL_ROUTE = 104 ALLOCATE_ID = 105 SHUTDOWN = 106 LOAD_MODULE = 107 +IS_DEAD = 999 PY3 = sys.version_info > (3,) if PY3: @@ -146,30 +147,6 @@ class TimeoutError(Error): pass -class Dead(object): - def __hash__(self): - return hash(Dead) - - def __eq__(self, other): - return type(other) is Dead - - def __ne__(self, other): - return type(other) is not Dead - - def __reduce__(self): - return (_unpickle_dead, ()) - - def __repr__(self): - return '' - - -def _unpickle_dead(): - return _DEAD - - -_DEAD = Dead() - - def has_parent_authority(msg, _stream=None): return (msg.auth_id == mitogen.context_id or msg.auth_id in mitogen.parent_ids) @@ -328,8 +305,6 @@ class Message(object): if module == __name__: if func == '_unpickle_call_error': return _unpickle_call_error - elif func == '_unpickle_dead': - return _unpickle_dead elif func == '_unpickle_sender': return self._unpickle_sender elif func == '_unpickle_context': @@ -337,6 +312,14 @@ class Message(object): raise StreamError('cannot unpickle %r/%r', module, func) + @property + def is_dead(self): + return self.reply_to == IS_DEAD + + @classmethod + def dead(cls, **kwargs): + return cls(reply_to=IS_DEAD, **kwargs) + @classmethod def pickled(cls, obj, **kwargs): self = cls(**kwargs) @@ -347,15 +330,20 @@ class Message(object): self.data = cPickle.dumps(CallError(e), protocol=2) return self - def reply(self, obj, **kwargs): - kwargs.setdefault('handle', self.reply_to) - self.router.route( - self.pickled(obj, dst_id=self.src_id, **kwargs) - ) + def reply(self, msg, router=None, **kwargs): + if not isinstance(msg, Message): + msg = Message.pickled(msg) + msg.dst_id = self.src_id + msg.handle = self.reply_to + vars(msg).update(kwargs) + (self.router or router).route(msg) def unpickle(self, throw=True, throw_dead=True): """Deserialize `data` into an object.""" _vv and IOLOG.debug('%r.unpickle()', self) + if throw_dead and self.is_dead: + raise ChannelError(ChannelError.remote_msg) + obj = self._unpickled if obj is Message._unpickled: fp = BytesIO(self.data) @@ -374,8 +362,6 @@ class Message(object): raise StreamError('invalid message: %s', e) if throw: - if obj == _DEAD and throw_dead: - raise ChannelError(ChannelError.remote_msg) if isinstance(obj, CallError): raise obj @@ -402,22 +388,12 @@ class Sender(object): def close(self): """Indicate this channel is closed to the remote side.""" _vv and IOLOG.debug('%r.close()', self) - self.context.send( - Message.pickled( - _DEAD, - handle=self.dst_handle - ) - ) + self.context.send(Message.dead(handle=self.dst_handle)) def send(self, data): """Send `data` to the remote.""" _vv and IOLOG.debug('%r.send(%r..)', self, repr(data)[:100]) - self.context.send( - Message.pickled( - data, - handle=self.dst_handle - ) - ) + self.context.send(Message.pickled(data, handle=self.dst_handle)) def _unpickle_sender(router, context_id, dst_handle): @@ -460,7 +436,7 @@ class Receiver(object): self.notify(self) def close(self): - self._latch.put(_DEAD) + self._latch.put(Message.dead()) def empty(self): return self._latch.empty() @@ -468,10 +444,11 @@ class Receiver(object): def get(self, timeout=None, block=True): _vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block) msg = self._latch.get(timeout=timeout, block=block) - #IOLOG.debug('%r.get() got %r', self, msg) - - if msg == _DEAD: - raise ChannelError(ChannelError.local_msg) + if msg.is_dead: + if msg.src_id == mitogen.context_id: + raise ChannelError(ChannelError.local_msg) + else: + raise ChannelError(ChannelError.remote_msg) return msg def __iter__(self): @@ -630,7 +607,7 @@ class Importer(object): os.environ['PBR_VERSION'] = '0.0.0' def _on_load_module(self, msg): - if msg == _DEAD: + if msg.is_dead: return tup = msg.unpickle() @@ -1275,7 +1252,7 @@ class Router(object): def _on_broker_exit(self): while self._handle_map: _, (_, func, _) = self._handle_map.popitem() - func(_DEAD) + func(Message.dead()) def register(self, context, stream): _v and LOG.debug('register(%r, %r)', context, stream) @@ -1295,7 +1272,7 @@ class Router(object): return msg.src_id == respondent.context_id def on_disconnect(): if handle in self._handle_map: - fn(_DEAD) + fn(Message.dead()) del self._handle_map[handle] listen(respondent, 'disconnect', on_disconnect) @@ -1309,7 +1286,7 @@ class Router(object): fire(self, 'shutdown') for handle, (persist, fn) in self._handle_map.iteritems(): _v and LOG.debug('%r.on_shutdown(): killing %r: %r', self, handle, fn) - fn(_DEAD) + fn(Message.dead()) refused_msg = 'Refused by policy.' @@ -1319,6 +1296,8 @@ class Router(object): persist, fn, policy = self._handle_map[msg.handle] except KeyError: LOG.error('%r: invalid handle: %r', self, msg) + if msg.reply_to and not msg.is_dead: + msg.reply(Message.dead()) return if policy and not policy(msg, stream): @@ -1375,6 +1354,8 @@ class Router(object): if stream is None: LOG.error('%r: no route for %r, my ID is %r', self, msg, mitogen.context_id) + if msg.reply_to and not msg.is_dead: + msg.reply(Message.dead(), router=self) return stream._send(msg) @@ -1510,7 +1491,7 @@ class ExternalContext(object): def _on_shutdown_msg(self, msg): _v and LOG.debug('_on_shutdown_msg(%r)', msg) - if msg != _DEAD: + if not msg.is_dead: self.broker.shutdown() def _on_parent_disconnect(self): diff --git a/mitogen/fakessh.py b/mitogen/fakessh.py index e07916ad..6a7303ab 100644 --- a/mitogen/fakessh.py +++ b/mitogen/fakessh.py @@ -134,20 +134,17 @@ class Process(object): self.control.put(('exit', status)) def _on_stdin(self, msg): - if msg == mitogen.core._DEAD: - return - - data = msg.unpickle(throw=False) - if data == mitogen.core._DEAD: + if msg.is_dead: IOLOG.debug('%r._on_stdin() -> %r', self, data) self.pump.close() return + data = msg.unpickle() IOLOG.debug('%r._on_stdin() -> len %d', self, len(data)) self.pump.write(data) def _on_control(self, msg): - if msg != mitogen.core._DEAD: + if not msg.is_dead: command, arg = msg.unpickle(throw=False) LOG.debug('%r._on_control(%r, %s)', self, command, arg) diff --git a/mitogen/master.py b/mitogen/master.py index 9025810a..1e46a916 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -295,7 +295,7 @@ class LogForwarder(object): ) def _on_forward_log(self, msg): - if msg == mitogen.core._DEAD: + if msg.is_dead: return logger = self._cache.get(msg.src_id) @@ -619,7 +619,7 @@ class ModuleResponder(object): stream.sent_modules.add(fullname) def _on_get_module(self, msg): - if msg == mitogen.core._DEAD: + if msg.is_dead: return LOG.debug('%r._on_get_module(%r)', self, msg.data) @@ -742,7 +742,7 @@ class IdAllocator(object): self.lock.release() def on_allocate_id(self, msg): - if msg == mitogen.core._DEAD: + if msg.is_dead: return id_, last_id = self.allocate_block() diff --git a/mitogen/parent.py b/mitogen/parent.py index 3392f220..2868c93a 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -850,7 +850,7 @@ class RouteMonitor(object): mitogen.core.fire(context, 'disconnect') def _on_add_route(self, msg): - if msg == mitogen.core._DEAD: + if msg.is_dead: return target_id_s, _, target_name = msg.data.partition(':') @@ -870,7 +870,7 @@ class RouteMonitor(object): self.propagate(mitogen.core.ADD_ROUTE, target_id, target_name) def _on_del_route(self, msg): - if msg == mitogen.core._DEAD: + if msg.is_dead: return target_id = int(msg.data) @@ -1055,7 +1055,7 @@ class ModuleForwarder(object): def _on_get_module(self, msg): LOG.debug('%r._on_get_module(%r)', self, msg) - if msg == mitogen.core._DEAD: + if msg.is_dead: return fullname = msg.data diff --git a/mitogen/utils.py b/mitogen/utils.py index 0b025b8a..ab8a673a 100644 --- a/mitogen/utils.py +++ b/mitogen/utils.py @@ -112,7 +112,6 @@ def cast(obj): if isinstance(obj, str): return str(obj) if isinstance(obj, (mitogen.core.Context, - mitogen.core.Dead, mitogen.core.CallError)): return obj diff --git a/tests/call_function_test.py b/tests/call_function_test.py index de3f1f46..199aaa77 100644 --- a/tests/call_function_test.py +++ b/tests/call_function_test.py @@ -25,10 +25,6 @@ def func_with_bad_return_value(): return CrazyType() -def func_returns_dead(): - return mitogen.core._DEAD - - def func_accepts_returns_context(context): return context @@ -69,9 +65,6 @@ class CallFunctionTest(testlib.RouterMixin, testlib.TestCase): "cannot unpickle '%s'/'CrazyType'" % (__name__,), ) - def test_returns_dead(self): - self.assertEqual(mitogen.core._DEAD, self.local.call(func_returns_dead)) - def test_aborted_on_local_context_disconnect(self): stream = self.router._stream_by_id[self.local.context_id] self.broker.stop_receive(stream) diff --git a/tests/router_test.py b/tests/router_test.py index 2c0b7e60..09c54245 100644 --- a/tests/router_test.py +++ b/tests/router_test.py @@ -159,8 +159,8 @@ class CrashTest(testlib.BrokerMixin, unittest2.TestCase): self.broker._loop_once = self._naughty self.broker.defer(lambda: None) - # sem should have received _DEAD. - self.assertEquals(mitogen.core._DEAD, sem.get()) + # sem should have received dead message. + self.assertTrue(sem.get().is_dead) # Ensure it was logged. expect = '_broker_main() crashed' @@ -176,7 +176,7 @@ class AddHandlerTest(unittest2.TestCase): queue = Queue.Queue() handle = router.add_handler(queue.put) router.broker.shutdown() - self.assertEquals(queue.get(timeout=5), mitogen.core._DEAD) + self.assertTrue(queue.get(timeout=5).is_dead) class MessageSizeTest(testlib.BrokerMixin, unittest2.TestCase): @@ -218,5 +218,47 @@ class MessageSizeTest(testlib.BrokerMixin, unittest2.TestCase): self.assertTrue(expect in logs.stop()) +class NoRouteTest(testlib.RouterMixin, testlib.TestCase): + def test_invalid_handle_returns_dead(self): + # Verify sending a message to an invalid handle yields a dead message + # from the target context. + l1 = self.router.fork() + recv = l1.send_async(mitogen.core.Message(handle=999)) + e = self.assertRaises(mitogen.core.ChannelError, + lambda: recv.get() + ) + self.assertEquals(e.args[0], mitogen.core.ChannelError.remote_msg) + + def test_totally_invalid_context_returns_dead(self): + recv = mitogen.core.Receiver(self.router) + self.router.route( + mitogen.core.Message( + dst_id=1234, + handle=1234, + reply_to=recv.handle, + ) + ) + e = self.assertRaises(mitogen.core.ChannelError, + lambda: recv.get() + ) + self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg) + + def test_previously_alive_context_returns_dead(self): + l1 = self.router.fork() + l1.shutdown(wait=True) + recv = mitogen.core.Receiver(self.router) + self.router.route( + mitogen.core.Message( + dst_id=l1.context_id, + handle=mitogen.core.CALL_FUNCTION, + reply_to=recv.handle, + ) + ) + e = self.assertRaises(mitogen.core.ChannelError, + lambda: recv.get() + ) + self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg) + + if __name__ == '__main__': unittest2.main()