issue #76: add API for ansible_mitogen to get route list

Earlier commit moved Stream.routes attribute into a private map
belonging to RouteMonitor, to make upgrades smoother. This adds a new
accessor method to RouteMonitor.
issue260
David Wilson 6 years ago
parent f3e19d81da
commit fba52a0edf

@ -256,8 +256,9 @@ class ContextService(mitogen.service.Service):
# in _latches_by_key below.
self._lock.acquire()
try:
routes = self.router.route_monitor.get_routes(stream)
for context, key in list(self._key_by_context.items()):
if context.context_id in stream.routes:
if context.context_id in routes:
LOG.info('Dropping %r due to disconnect of %r',
context, stream)
self._response_by_key.pop(key, None)

@ -1443,12 +1443,15 @@ class RouteMonitor(object):
)
)
def propagate(self, handle, target_id, name=None):
# self.parent is None in the master.
if self.parent:
self._send_one(self.parent, handle, target_id, name)
def _propagate(self, handle, target_id, name=None):
if not self.parent:
# self.parent is None in the master.
return
stream = self.router.stream_by_id(self.parent.context_id)
self._send_one(stream, handle, target_id, name)
def child_propagate(self, handle, target_id):
def _child_propagate(self, handle, target_id):
"""
For DEL_ROUTE, we additionally want to broadcast the message to any
stream that has ever communicated with the disconnecting ID, so
@ -1466,14 +1469,23 @@ class RouteMonitor(object):
if/when that child disconnects.
"""
self._routes_by_stream[stream] = set([stream.remote_id])
self.propagate(mitogen.core.ADD_ROUTE, stream.remote_id,
stream.name)
self._propagate(mitogen.core.ADD_ROUTE, stream.remote_id,
stream.name)
mitogen.core.listen(
obj=stream,
name='disconnect',
func=lambda: self._on_stream_disconnect(stream),
)
def get_routes(self, stream):
"""
Return the set of context IDs reachable on a stream.
:param mitogen.core.Stream stream:
:returns: set([int])
"""
return self._routes_by_stream.get(stream) or set()
def _on_stream_disconnect(self, stream):
"""
Respond to disconnection of a local stream by
@ -1482,8 +1494,8 @@ class RouteMonitor(object):
LOG.debug('%r is gone; propagating DEL_ROUTE for %r', stream, routes)
for target_id in routes:
self.router.del_route(target_id)
self.propagate(mitogen.core.DEL_ROUTE, target_id)
self.child_propagate(mitogen.core.DEL_ROUTE, target_id)
self._propagate(mitogen.core.DEL_ROUTE, target_id)
self._child_propagate(mitogen.core.DEL_ROUTE, target_id)
context = self.router.context_by_id(target_id, create=False)
if context:
@ -1508,7 +1520,7 @@ class RouteMonitor(object):
LOG.debug('Adding route to %d via %r', target_id, stream)
self._routes_by_stream[stream].add(target_id)
self.router.add_route(target_id, stream)
self.propagate(mitogen.core.ADD_ROUTE, target_id, target_name)
self._propagate(mitogen.core.ADD_ROUTE, target_id, target_name)
def _on_del_route(self, msg):
if msg.is_dead:
@ -1534,8 +1546,8 @@ class RouteMonitor(object):
self.router.del_route(target_id)
if stream.remote_id != mitogen.parent_id:
self.propagate(mitogen.core.DEL_ROUTE, target_id)
self.child_propagate(mitogen.core.DEL_ROUTE, target_id)
self._propagate(mitogen.core.DEL_ROUTE, target_id)
self._child_propagate(mitogen.core.DEL_ROUTE, target_id)
class Router(mitogen.core.Router):

Loading…
Cancel
Save