issue #76: parent: broadcast DEL_ROUTE to interested parties

Now rather than simply propagate DEL_ROUTE upwards towards the parent,
we broadcast it downward to any stream that ever sent a message toward
any of the routes that have just become disconnected.
issue260
David Wilson 7 years ago
parent b9bafb78af
commit 431051f69b

@ -69,6 +69,7 @@ from mitogen.core import IOLOG
IS_WSL = 'Microsoft' in os.uname()[2]
itervalues = getattr(dict, 'itervalues', dict.values)
if mitogen.core.PY3:
xrange = range
@ -543,6 +544,7 @@ def _upgrade_broker(broker):
len(old.readers), len(old.writers))
@mitogen.core.takes_econtext
def upgrade_router(econtext):
if not isinstance(econtext.router, Router): # TODO
econtext.broker.defer(_upgrade_broker, econtext.broker)
@ -911,9 +913,6 @@ class Stream(mitogen.core.Stream):
def __init__(self, *args, **kwargs):
super(Stream, self).__init__(*args, **kwargs)
self.sent_modules = set(['mitogen', 'mitogen.core'])
#: List of contexts reachable via this stream; used to cleanup routes
#: during disconnection.
self.routes = set([self.remote_id])
def construct(self, max_message_size, remote_name=None, python_path=None,
debug=False, connect_timeout=None, profiling=False,
@ -1428,28 +1427,45 @@ class RouteMonitor(object):
persist=True,
policy=is_immediate_child,
)
#: Mapping of Stream instance to integer context IDs reachable via the
#: stream; used to cleanup routes during disconnection.
self._routes_by_stream = {}
def propagate(self, handle, target_id, name=None):
# self.parent is None in the master.
if not self.parent:
return
def _send_one(self, stream, handle, target_id, name):
data = str(target_id)
if name:
data = '%s:%s' % (target_id, mitogen.core.b(name))
self.parent.send(
stream.send(
mitogen.core.Message(
handle=handle,
data=data.encode('utf-8'),
dst_id=stream.remote_id,
)
)
def propagate(self, handle, target_id, name=None):
# self.parent is None in the master.
if self.parent:
self._send_one(self.parent, handle, target_id, name)
def child_propagate(self, handle, target_id):
"""
For DEL_ROUTE, we additionally want to broadcast the message to any
stream that has ever communicated with the disconnecting ID, so
core.py's :meth:`mitogen.core.Router._on_del_route` can turn the
message into a disconnect event.
"""
for stream in itervalues(self.router._stream_by_id):
if target_id in stream.egress_ids:
self._send_one(stream, mitogen.core.DEL_ROUTE, target_id, None)
def notice_stream(self, stream):
"""
When this parent is responsible for a new directly connected child
stream, we're also responsible for broadcasting DEL_ROUTE upstream
if/when that child disconnects.
"""
self._routes_by_stream[stream] = set([stream.remote_id])
self.propagate(mitogen.core.ADD_ROUTE, stream.remote_id,
stream.name)
mitogen.core.listen(
@ -1462,11 +1478,12 @@ class RouteMonitor(object):
"""
Respond to disconnection of a local stream by
"""
LOG.debug('%r is gone; propagating DEL_ROUTE for %r',
stream, stream.routes)
for target_id in stream.routes:
routes = self._routes_by_stream.pop(stream)
LOG.debug('%r is gone; propagating DEL_ROUTE for %r', stream, routes)
for target_id in routes:
self.router.del_route(target_id)
self.propagate(mitogen.core.DEL_ROUTE, target_id)
self.child_propagate(mitogen.core.DEL_ROUTE, target_id)
context = self.router.context_by_id(target_id, create=False)
if context:
@ -1489,7 +1506,7 @@ class RouteMonitor(object):
return
LOG.debug('Adding route to %d via %r', target_id, stream)
stream.routes.add(target_id)
self._routes_by_stream[stream].add(target_id)
self.router.add_route(target_id, stream)
self.propagate(mitogen.core.ADD_ROUTE, target_id, target_name)
@ -1505,14 +1522,21 @@ class RouteMonitor(object):
target_id, stream, registered_stream)
return
LOG.debug('Deleting route to %d via %r', target_id, stream)
stream.routes.discard(target_id)
self.router.del_route(target_id)
self.propagate(mitogen.core.DEL_ROUTE, target_id)
context = self.router.context_by_id(target_id, create=False)
if context:
LOG.debug('%r: Firing local disconnect for %r', self, context)
mitogen.core.fire(context, 'disconnect')
LOG.debug('Deleting route to %d via %r', target_id, stream)
routes = self._routes_by_stream.get(stream)
if routes:
routes.discard(target_id)
self.router.del_route(target_id)
if stream.remote_id != mitogen.parent_id:
self.propagate(mitogen.core.DEL_ROUTE, target_id)
self.child_propagate(mitogen.core.DEL_ROUTE, target_id)
class Router(mitogen.core.Router):
context_class = Context

@ -30,6 +30,11 @@ def wait_for_child(pid, timeout=1.0):
assert False, "wait_for_child() timed out"
@mitogen.core.takes_econtext
def call_func_in_sibling(ctx, econtext):
ctx.call(time.sleep, 99999)
class GetDefaultRemoteNameTest(testlib.TestCase):
func = staticmethod(mitogen.parent.get_default_remote_name)
@ -298,5 +303,57 @@ class WriteAllTest(unittest2.TestCase):
proc.terminate()
class DisconnectTest(testlib.RouterMixin, testlib.TestCase):
def test_child_disconnected(self):
# Easy mode: process notices its own directly connected child is
# disconnected.
c1 = self.router.fork()
recv = c1.call_async(time.sleep, 9999)
c1.shutdown(wait=True)
e = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get())
self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg)
def test_indirect_child_disconnected(self):
# Achievement unlocked: process notices an indirectly connected child
# is disconnected.
c1 = self.router.fork()
c2 = self.router.fork(via=c1)
recv = c2.call_async(time.sleep, 9999)
c2.shutdown(wait=True)
e = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get())
self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg)
def test_indirect_child_intermediary_disconnected(self):
# Battlefield promotion: process notices indirect child disconnected
# due to an intermediary child disconnecting.
c1 = self.router.fork()
c2 = self.router.fork(via=c1)
recv = c2.call_async(time.sleep, 9999)
c1.shutdown(wait=True)
e = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get())
self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg)
def test_sibling_disconnected(self):
# Hard mode: child notices sibling connected to same parent has
# disconnected.
c1 = self.router.fork()
c2 = self.router.fork()
# Let c1 call functions in c2.
self.router.stream_by_id(c1.context_id).auth_id = mitogen.context_id
c1.call(mitogen.parent.upgrade_router)
recv = c1.call_async(call_func_in_sibling, c2)
c2.shutdown(wait=True)
e = self.assertRaises(mitogen.core.CallError,
lambda: recv.get().unpickle())
self.assertTrue(e.args[0].startswith(
'mitogen.core.ChannelError: Channel closed by local end.'
))
if __name__ == '__main__':
unittest2.main()

Loading…
Cancel
Save