diff --git a/mitogen/parent.py b/mitogen/parent.py index d8a1a633..f3e39ebe 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -69,6 +69,7 @@ from mitogen.core import IOLOG IS_WSL = 'Microsoft' in os.uname()[2] +itervalues = getattr(dict, 'itervalues', dict.values) if mitogen.core.PY3: xrange = range @@ -543,6 +544,7 @@ def _upgrade_broker(broker): len(old.readers), len(old.writers)) +@mitogen.core.takes_econtext def upgrade_router(econtext): if not isinstance(econtext.router, Router): # TODO econtext.broker.defer(_upgrade_broker, econtext.broker) @@ -911,9 +913,6 @@ class Stream(mitogen.core.Stream): def __init__(self, *args, **kwargs): super(Stream, self).__init__(*args, **kwargs) self.sent_modules = set(['mitogen', 'mitogen.core']) - #: List of contexts reachable via this stream; used to cleanup routes - #: during disconnection. - self.routes = set([self.remote_id]) def construct(self, max_message_size, remote_name=None, python_path=None, debug=False, connect_timeout=None, profiling=False, @@ -1428,28 +1427,45 @@ class RouteMonitor(object): persist=True, policy=is_immediate_child, ) + #: Mapping of Stream instance to integer context IDs reachable via the + #: stream; used to cleanup routes during disconnection. + self._routes_by_stream = {} - def propagate(self, handle, target_id, name=None): - # self.parent is None in the master. - if not self.parent: - return - + def _send_one(self, stream, handle, target_id, name): data = str(target_id) if name: data = '%s:%s' % (target_id, mitogen.core.b(name)) - self.parent.send( + stream.send( mitogen.core.Message( handle=handle, data=data.encode('utf-8'), + dst_id=stream.remote_id, ) ) + def propagate(self, handle, target_id, name=None): + # self.parent is None in the master. + if self.parent: + self._send_one(self.parent, handle, target_id, name) + + def child_propagate(self, handle, target_id): + """ + For DEL_ROUTE, we additionally want to broadcast the message to any + stream that has ever communicated with the disconnecting ID, so + core.py's :meth:`mitogen.core.Router._on_del_route` can turn the + message into a disconnect event. + """ + for stream in itervalues(self.router._stream_by_id): + if target_id in stream.egress_ids: + self._send_one(stream, mitogen.core.DEL_ROUTE, target_id, None) + def notice_stream(self, stream): """ When this parent is responsible for a new directly connected child stream, we're also responsible for broadcasting DEL_ROUTE upstream if/when that child disconnects. """ + self._routes_by_stream[stream] = set([stream.remote_id]) self.propagate(mitogen.core.ADD_ROUTE, stream.remote_id, stream.name) mitogen.core.listen( @@ -1462,11 +1478,12 @@ class RouteMonitor(object): """ Respond to disconnection of a local stream by """ - LOG.debug('%r is gone; propagating DEL_ROUTE for %r', - stream, stream.routes) - for target_id in stream.routes: + routes = self._routes_by_stream.pop(stream) + LOG.debug('%r is gone; propagating DEL_ROUTE for %r', stream, routes) + for target_id in routes: self.router.del_route(target_id) self.propagate(mitogen.core.DEL_ROUTE, target_id) + self.child_propagate(mitogen.core.DEL_ROUTE, target_id) context = self.router.context_by_id(target_id, create=False) if context: @@ -1489,7 +1506,7 @@ class RouteMonitor(object): return LOG.debug('Adding route to %d via %r', target_id, stream) - stream.routes.add(target_id) + self._routes_by_stream[stream].add(target_id) self.router.add_route(target_id, stream) self.propagate(mitogen.core.ADD_ROUTE, target_id, target_name) @@ -1505,14 +1522,21 @@ class RouteMonitor(object): target_id, stream, registered_stream) return - LOG.debug('Deleting route to %d via %r', target_id, stream) - stream.routes.discard(target_id) - self.router.del_route(target_id) - self.propagate(mitogen.core.DEL_ROUTE, target_id) context = self.router.context_by_id(target_id, create=False) if context: + LOG.debug('%r: Firing local disconnect for %r', self, context) mitogen.core.fire(context, 'disconnect') + LOG.debug('Deleting route to %d via %r', target_id, stream) + routes = self._routes_by_stream.get(stream) + if routes: + routes.discard(target_id) + + self.router.del_route(target_id) + if stream.remote_id != mitogen.parent_id: + self.propagate(mitogen.core.DEL_ROUTE, target_id) + self.child_propagate(mitogen.core.DEL_ROUTE, target_id) + class Router(mitogen.core.Router): context_class = Context diff --git a/tests/parent_test.py b/tests/parent_test.py index c9ccaf3f..a450ee8a 100644 --- a/tests/parent_test.py +++ b/tests/parent_test.py @@ -30,6 +30,11 @@ def wait_for_child(pid, timeout=1.0): assert False, "wait_for_child() timed out" +@mitogen.core.takes_econtext +def call_func_in_sibling(ctx, econtext): + ctx.call(time.sleep, 99999) + + class GetDefaultRemoteNameTest(testlib.TestCase): func = staticmethod(mitogen.parent.get_default_remote_name) @@ -298,5 +303,57 @@ class WriteAllTest(unittest2.TestCase): proc.terminate() +class DisconnectTest(testlib.RouterMixin, testlib.TestCase): + def test_child_disconnected(self): + # Easy mode: process notices its own directly connected child is + # disconnected. + c1 = self.router.fork() + recv = c1.call_async(time.sleep, 9999) + c1.shutdown(wait=True) + e = self.assertRaises(mitogen.core.ChannelError, + lambda: recv.get()) + self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg) + + def test_indirect_child_disconnected(self): + # Achievement unlocked: process notices an indirectly connected child + # is disconnected. + c1 = self.router.fork() + c2 = self.router.fork(via=c1) + recv = c2.call_async(time.sleep, 9999) + c2.shutdown(wait=True) + e = self.assertRaises(mitogen.core.ChannelError, + lambda: recv.get()) + self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg) + + def test_indirect_child_intermediary_disconnected(self): + # Battlefield promotion: process notices indirect child disconnected + # due to an intermediary child disconnecting. + c1 = self.router.fork() + c2 = self.router.fork(via=c1) + recv = c2.call_async(time.sleep, 9999) + c1.shutdown(wait=True) + e = self.assertRaises(mitogen.core.ChannelError, + lambda: recv.get()) + self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg) + + def test_sibling_disconnected(self): + # Hard mode: child notices sibling connected to same parent has + # disconnected. + c1 = self.router.fork() + c2 = self.router.fork() + + # Let c1 call functions in c2. + self.router.stream_by_id(c1.context_id).auth_id = mitogen.context_id + c1.call(mitogen.parent.upgrade_router) + + recv = c1.call_async(call_func_in_sibling, c2) + c2.shutdown(wait=True) + e = self.assertRaises(mitogen.core.CallError, + lambda: recv.get().unpickle()) + self.assertTrue(e.args[0].startswith( + 'mitogen.core.ChannelError: Channel closed by local end.' + )) + + if __name__ == '__main__': unittest2.main()