diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index 199f2116..eae8fc68 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -256,8 +256,9 @@ class ContextService(mitogen.service.Service): # in _latches_by_key below. self._lock.acquire() try: + routes = self.router.route_monitor.get_routes(stream) for context, key in list(self._key_by_context.items()): - if context.context_id in stream.routes: + if context.context_id in routes: LOG.info('Dropping %r due to disconnect of %r', context, stream) self._response_by_key.pop(key, None) diff --git a/mitogen/parent.py b/mitogen/parent.py index f3e39ebe..5e0157d6 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -1443,12 +1443,15 @@ class RouteMonitor(object): ) ) - def propagate(self, handle, target_id, name=None): - # self.parent is None in the master. - if self.parent: - self._send_one(self.parent, handle, target_id, name) + def _propagate(self, handle, target_id, name=None): + if not self.parent: + # self.parent is None in the master. + return + + stream = self.router.stream_by_id(self.parent.context_id) + self._send_one(stream, handle, target_id, name) - def child_propagate(self, handle, target_id): + def _child_propagate(self, handle, target_id): """ For DEL_ROUTE, we additionally want to broadcast the message to any stream that has ever communicated with the disconnecting ID, so @@ -1466,14 +1469,23 @@ class RouteMonitor(object): if/when that child disconnects. """ self._routes_by_stream[stream] = set([stream.remote_id]) - self.propagate(mitogen.core.ADD_ROUTE, stream.remote_id, - stream.name) + self._propagate(mitogen.core.ADD_ROUTE, stream.remote_id, + stream.name) mitogen.core.listen( obj=stream, name='disconnect', func=lambda: self._on_stream_disconnect(stream), ) + def get_routes(self, stream): + """ + Return the set of context IDs reachable on a stream. + + :param mitogen.core.Stream stream: + :returns: set([int]) + """ + return self._routes_by_stream.get(stream) or set() + def _on_stream_disconnect(self, stream): """ Respond to disconnection of a local stream by @@ -1482,8 +1494,8 @@ class RouteMonitor(object): LOG.debug('%r is gone; propagating DEL_ROUTE for %r', stream, routes) for target_id in routes: self.router.del_route(target_id) - self.propagate(mitogen.core.DEL_ROUTE, target_id) - self.child_propagate(mitogen.core.DEL_ROUTE, target_id) + self._propagate(mitogen.core.DEL_ROUTE, target_id) + self._child_propagate(mitogen.core.DEL_ROUTE, target_id) context = self.router.context_by_id(target_id, create=False) if context: @@ -1508,7 +1520,7 @@ class RouteMonitor(object): LOG.debug('Adding route to %d via %r', target_id, stream) self._routes_by_stream[stream].add(target_id) self.router.add_route(target_id, stream) - self.propagate(mitogen.core.ADD_ROUTE, target_id, target_name) + self._propagate(mitogen.core.ADD_ROUTE, target_id, target_name) def _on_del_route(self, msg): if msg.is_dead: @@ -1534,8 +1546,8 @@ class RouteMonitor(object): self.router.del_route(target_id) if stream.remote_id != mitogen.parent_id: - self.propagate(mitogen.core.DEL_ROUTE, target_id) - self.child_propagate(mitogen.core.DEL_ROUTE, target_id) + self._propagate(mitogen.core.DEL_ROUTE, target_id) + self._child_propagate(mitogen.core.DEL_ROUTE, target_id) class Router(mitogen.core.Router):