From 54ff1c90fa75b685b99c75905f66cafa7a5c882a Mon Sep 17 00:00:00 2001 From: David Wilson Date: Thu, 22 Mar 2018 11:34:02 +0545 Subject: [PATCH] issue #155: add DEL_ROUTE, propagate ADD_ROUTE upwards * IDs are allocated by the parent responsible for contructing a new child, using ALLOCATE_ID to the master as necessary to allocate new ID ranges. * ADD_ROUTE is sent up the tree rather than down. This permits construction of the new context to complete concurrent to parent contexts learning about its existence. Since all streams are strictly ordered, it's not possible for any parent to observe messages from the new context prior to arrival of an ADD_ROUTE from the parent notifying of its existence. If the new context, for example, implements an Ansible async task, its parent can start executing that without waiting for any synchronous confirmation from any parent or the master. * Since routes propagate up, it's no longer possible for a plain non-parent child to ever receive ADD_ROUTE, so that code can be moved out of core.py and into parent.py (-0.2kb compressed). * Add a .routes attribute to parent.Stream, and respond to disconnection signal on the stream by propagating DEL_ROUTE for any ADD_ROUTE ever received from that stream. * Centralize route management in a new parent.RouteMonitor class --- docs/howitworks.rst | 80 +++++++++++----------- mitogen/core.py | 28 ++------ mitogen/master.py | 36 ++++------ mitogen/parent.py | 159 +++++++++++++++++++++++++++++++++++++++----- 4 files changed, 196 insertions(+), 107 deletions(-) diff --git a/docs/howitworks.rst b/docs/howitworks.rst index 30a6bd40..67947675 100644 --- a/docs/howitworks.rst +++ b/docs/howitworks.rst @@ -335,11 +335,11 @@ Masters listen on the following handles: .. currentmodule:: mitogen.core .. data:: ALLOCATE_ID - Replies to any message sent to it with a newly allocated unique context ID, - to allow children to safely start their own contexts. In future this is - likely to be replaced by 32-bit context IDs and pseudorandom allocation, - with an improved :py:data:`ADD_ROUTE` message sent upstream rather than - downstream that generates NACKs if any ancestor detects an ID collision. + Replies to any message sent to it with a newly allocated range of context + IDs, to allow children to safely start their own contexts. Presently IDs + are allocated in batches of 1000 from a 32 bit range, allowing up to 4.2 + million parent contexts to be created and destroyed before the associated + Router must be recreated. Children listen on the following handles: @@ -394,33 +394,28 @@ Children listen on the following handles: :py:data:`SHUTDOWN` to it, and arranging for the connection to its parent to be closed shortly thereafter. + +Masters, and children that have ever been used to create a descendent child +also listen on the following handles: + .. _ADD_ROUTE: .. currentmodule:: mitogen.core .. data:: ADD_ROUTE - Receives `(target_id, via_id)` integer tuples, describing how messages - arriving at this context on any stream should be forwarded on the stream - associated with the context `via_id` such that they are eventually - delivered to the target context. - - This message is necessary to inform intermediary contexts of the existence - of a downstream Context, as they do not otherwise parse traffic they are - fowarding to their downstream contexts that may cause new contexts to be - established. - - Given a chain `master -> ssh1 -> sudo1`, no :py:data:`ADD_ROUTE` message is - necessary, since :py:class:`mitogen.core.Router` in the `ssh` context can - arrange to update its routes while setting up the new child during - :py:meth:`Router.proxy_connect() `. - - However, given a chain like `master -> ssh1 -> sudo1 -> ssh2 -> sudo2`, - `ssh1` requires an :py:data:`ADD_ROUTE` for `ssh2`, and both `ssh1` and - `sudo1` require an :py:data:`ADD_ROUTE` for `sudo2`, as neither directly - dealt with its establishment. + Receives `target_id` integer from downstream, describing an ID allocated to + a recently constructed child. The receiver verifies no existing route + exists to `target_id` before updating its local table to route messages for + `target_id` via the stream from which the :py:data:`ADD_ROUTE` message was + received. +.. _DEL_ROUTE: +.. currentmodule:: mitogen.core +.. data:: DEL_ROUTE -Children that have ever been used to create a descendent child also listen on -the following handles: + Receives `target_id` integer from downstream, verifies a route exists to + `target_id` via the stream on which the message was received, removes that + route from its local table, then propagates the message upward towards its + own parent. .. currentmodule:: mitogen.core .. data:: GET_MODULE @@ -507,9 +502,13 @@ message or stream, instead it is forwarded upwards to the immediate parent, and recursively by each parent in turn until one is reached that knows how to forward the message down the tree. -When the master establishes a new context via an existing child context, it -sends corresponding :py:data:`ADD_ROUTE ` messages to -each indirect parent between the context and the root. +When a parent establishes a new child, it sends a corresponding +:py:data:`ADD_ROUTE ` message towards its parent, which +recursively forwards it up towards the root. + +Parents keep note of all routes associated with each stream they connect with, +and trigger ``DEL_ROUTE`` messages propagated upstream for each route +associated with that stream if the stream is disconnected for any reason. Example @@ -517,10 +516,16 @@ Example .. image:: images/context-tree.png -In the diagram, when ``master`` is creating the ``sudo:node12b:webapp`` -context, it must send ``ADD_ROUTE`` messages to ``rack12``, ``dc1``, -``bastion``, and itself; ``node12b`` does not require an ``ADD_ROUTE`` message -since it has a stream directly connected to the new context. +In the diagram, when ``node12b`` is creating the ``sudo:node12b:webapp`` +context, it must send ``ADD_ROUTE`` messages to ``rack12``, which will +propagate it to ``dc1``, and recursively to ``bastion``, and ``master``; +``node12b`` does not require an ``ADD_ROUTE`` message since it has a stream +directly connected to the new context. + +Since Mitogen streams are strictly ordered, it is never possible for a parent +to receive a message from a newly constructed child before receiving a +corresponding ``ADD_ROUTE`` sent by the child's parent, describing how to reply +to it. When ``sudo:node22a:webapp`` wants to send a message to ``sudo:node12b:webapp``, the message will be routed as follows: @@ -555,15 +560,6 @@ where isolated processes can connect to a listener and communicate with an already established established tree. -Future -###### - -The current routing approach is incomplete, since routes to downstream contexts -are not propagated upwards when a descendant of the master context establishes -a new child context, but that is okay for now, since child contexts cannot -currently allocate new context IDs anyway. - - Differences Between Master And Child Brokers ############################################ diff --git a/mitogen/core.py b/mitogen/core.py index adabed0a..ee7c5877 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -63,9 +63,10 @@ GET_MODULE = 100 CALL_FUNCTION = 101 FORWARD_LOG = 102 ADD_ROUTE = 103 -ALLOCATE_ID = 104 -SHUTDOWN = 105 -LOAD_MODULE = 106 +DEL_ROUTE = 104 +ALLOCATE_ID = 105 +SHUTDOWN = 106 +LOAD_MODULE = 107 CHUNK_SIZE = 131072 _tls = threading.local() @@ -1115,17 +1116,11 @@ class Router(object): self._context_by_id = {} self._last_handle = itertools.count(1000) #: handle -> (persistent?, func(msg)) - self._handle_map = { - ADD_ROUTE: (True, self._on_add_route) - } + self._handle_map = {} def __repr__(self): return 'Router(%r)' % (self.broker,) - def stream_by_id(self, dst_id): - return self._stream_by_id.get(dst_id, - self._stream_by_id.get(mitogen.parent_id)) - def on_disconnect(self, stream, broker): """Invoked by Stream.on_disconnect().""" for context in self._context_by_id.itervalues(): @@ -1141,19 +1136,6 @@ class Router(object): for _, func in self._handle_map.itervalues(): func(_DEAD) - def add_route(self, target_id, via_id): - _v and LOG.debug('%r.add_route(%r, %r)', self, target_id, via_id) - try: - self._stream_by_id[target_id] = self._stream_by_id[via_id] - except KeyError: - LOG.error('%r: cant add route to %r via %r: no such stream', - self, target_id, via_id) - - def _on_add_route(self, msg): - if msg != _DEAD: - target_id, via_id = map(int, msg.data.split('\x00')) - self.add_route(target_id, via_id) - def register(self, context, stream): _v and LOG.debug('register(%r, %r)', context, stream) self._stream_by_id[context.context_id] = stream diff --git a/mitogen/master.py b/mitogen/master.py index b82a40c2..580ab61f 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -683,9 +683,13 @@ class Router(mitogen.parent.Router): if broker is None: broker = self.broker_class() super(Router, self).__init__(broker) + self.upgrade() + + def upgrade(self): self.id_allocator = IdAllocator(self) self.responder = ModuleResponder(self) self.log_forwarder = LogForwarder(self) + self.route_monitor = mitogen.parent.RouteMonitor(router=self) def enable_debug(self): mitogen.core.enable_debug_logging() @@ -710,23 +714,6 @@ class Router(mitogen.parent.Router): def ssh(self, **kwargs): return self.connect('ssh', **kwargs) - def propagate_route(self, target, via): - self.add_route(target.context_id, via.context_id) - child = via - parent = via.via - - while parent is not None: - LOG.debug('Adding route to %r for %r via %r', - parent, target, child) - parent.send( - mitogen.core.Message( - data='%s\x00%s' % (target.context_id, child.context_id), - handle=mitogen.core.ADD_ROUTE, - ) - ) - child = parent - parent = parent.via - def disconnect_stream(self, stream): self.broker.defer(stream.on_disconnect, self.broker) @@ -745,6 +732,8 @@ class IdAllocator(object): def __repr__(self): return 'IdAllocator(%r)' % (self.router,) + BLOCK_SIZE = 1000 + def allocate(self): self.lock.acquire() try: @@ -758,8 +747,10 @@ class IdAllocator(object): self.lock.acquire() try: id_ = self.next_id - self.next_id += 1000 - return id_, id_ + 1000 + self.next_id += self.BLOCK_SIZE + end_id = id_ + self.BLOCK_SIZE + LOG.debug('%r: allocating (%d..%d]', self, id_, end_id) + return id_, end_id finally: self.lock.release() @@ -771,9 +762,6 @@ class IdAllocator(object): requestee = self.router.context_by_id(msg.src_id) allocated = self.router.context_by_id(id_, msg.src_id) - LOG.debug('%r: allocating [%r..%r) to %r', self, allocated, requestee) + LOG.debug('%r: allocating [%r..%r) to %r', + self, allocated, requestee, msg.src_id) msg.reply((id_, last_id)) - - LOG.debug('%r: publishing route to %r via %r', self, - allocated, requestee) - self.router.propagate_route(allocated, requestee) diff --git a/mitogen/parent.py b/mitogen/parent.py index 2b458ffb..f9f800a1 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -232,9 +232,10 @@ def discard_until(fd, s, deadline): def upgrade_router(econtext): if not isinstance(econtext.router, Router): # TODO econtext.router.__class__ = Router # TODO - econtext.router.id_allocator = ChildIdAllocator(econtext.router) - LOG.debug('_proxy_connect(): constructing ModuleForwarder') - ModuleForwarder(econtext.router, econtext.parent, econtext.importer) + econtext.router.upgrade( + importer=econtext.importer, + parent=econtext.parent, + ) def _docker_method(): @@ -262,15 +263,14 @@ METHOD_NAMES = { @mitogen.core.takes_econtext -def _proxy_connect(name, context_id, method_name, kwargs, econtext): +def _proxy_connect(name, method_name, kwargs, econtext): mitogen.parent.upgrade_router(econtext) context = econtext.router._connect( - context_id, - METHOD_NAMES[method_name](), + klass=METHOD_NAMES[method_name](), name=name, **kwargs ) - return context.name + return context.context_id, context.name class Stream(mitogen.core.Stream): @@ -296,6 +296,9 @@ class Stream(mitogen.core.Stream): def __init__(self, *args, **kwargs): super(Stream, self).__init__(*args, **kwargs) self.sent_modules = set(['mitogen', 'mitogen.core']) + #: List of contexts reachable via this stream; used to cleanup routes + #: during disconnection. + self.routes = set([self.remote_id]) def construct(self, remote_name=None, python_path=None, debug=False, connect_timeout=None, profiling=False, **kwargs): @@ -444,9 +447,130 @@ class ChildIdAllocator(object): return self.allocate() +class RouteMonitor(object): + def __init__(self, router, parent=None): + self.router = router + self.parent = parent + self.router.add_handler( + fn=self._on_add_route, + handle=mitogen.core.ADD_ROUTE, + persist=True, + ) + self.router.add_handler( + fn=self._on_del_route, + handle=mitogen.core.DEL_ROUTE, + persist=True, + ) + + def propagate(self, handle, target_id): + # self.parent is None in the master. + if self.parent: + self.parent.send( + mitogen.core.Message( + handle=handle, + data=str(target_id), + ) + ) + + def notice_stream(self, stream): + """ + When this parent is responsible for a new directly connected child + stream, we're also responsible for broadcasting DEL_ROUTE upstream + if/when that child disconnects. + """ + self.propagate(mitogen.core.ADD_ROUTE, stream.remote_id) + mitogen.core.listen( + obj=stream, + name='disconnect', + func=lambda: self._on_stream_disconnect(stream), + ) + + def _on_stream_disconnect(self, stream): + """ + Respond to disconnection of a local stream by + """ + for target_id in stream.routes: + LOG.debug('%r is gone; propagating DEL_ROUTE for ID %d', + stream, target_id) + self.router.del_route(target_id) + self.propagate(mitogen.core.DEL_ROUTE, target_id) + + def _on_add_route(self, msg): + if msg == mitogen.core._DEAD: + return + + target_id = int(msg.data) + stream = self.router.stream_by_id(msg.auth_id) + current = self.router.stream_by_id(target_id) + if current and current.remote_id != mitogen.parent_id: + LOG.error('Cannot add duplicate route to %r via %r, ' + 'already have existing route via %r', + target_id, stream, current) + return + + LOG.debug('Adding route to %d via %r', target_id, stream) + stream.routes.add(target_id) + self.router.add_route(target_id, stream) + self.propagate(mitogen.core.ADD_ROUTE, target_id) + + def _on_del_route(self, msg): + if msg == mitogen.core._DEAD: + return + + target_id = int(msg.data) + registered_stream = self.router.stream_by_id(target_id) + stream = self.router.stream_by_id(msg.auth_id) + if registered_stream != stream: + LOG.error('Received DEL_ROUTE for %d from %r, expected %r', + target_id, stream, registered_stream) + return + + LOG.debug('Deleting route to %d via %r', target_id, stream) + stream.routes.discard(target_id) + self.router.del_route(target_id) + self.propagate(mitogen.core.DEL_ROUTE, target_id) + + class Router(mitogen.core.Router): context_class = mitogen.core.Context + id_allocator = None + responder = None + log_forwarder = None + route_monitor = None + + def upgrade(self, importer, parent): + LOG.debug('%r.upgrade()', self) + self.id_allocator = ChildIdAllocator(router=self) + self.responder = ModuleForwarder( + router=self, + parent_context=parent, + importer=importer, + ) + self.route_monitor = RouteMonitor(self, parent) + + def stream_by_id(self, dst_id): + return self._stream_by_id.get(dst_id, + self._stream_by_id.get(mitogen.parent_id)) + + def add_route(self, target_id, stream): + LOG.debug('%r.add_route(%r, %r)', self, target_id, stream) + assert isinstance(target_id, int) + assert isinstance(stream, Stream) + try: + self._stream_by_id[target_id] = stream + except KeyError: + LOG.error('%r: cant add route to %r via %r: no such stream', + self, target_id, stream) + + def del_route(self, target_id): + LOG.debug('%r.del_route(%r)', self, target_id) + try: + del self._stream_by_id[target_id] + except KeyError: + LOG.error('%r: cant delete route to %r: no such stream', + self, target_id) + def get_module_blacklist(self): if mitogen.context_id == 0: return self.responder.blacklist @@ -469,13 +593,16 @@ class Router(mitogen.core.Router): self._context_by_id[context_id] = context return context - def _connect(self, context_id, klass, name=None, **kwargs): + def _connect(self, klass, name=None, **kwargs): + context_id = self.allocate_id() context = self.context_class(self, context_id) - stream = klass(self, context.context_id, **kwargs) + stream = klass(self, context_id, **kwargs) if name is not None: stream.name = name stream.connect() context.name = stream.name + self.route_monitor.notice_stream(stream) + self.route_monitor.propagate(mitogen.core.ADD_ROUTE, context_id) self.register(context, stream) return context @@ -487,23 +614,19 @@ class Router(mitogen.core.Router): via = kwargs.pop('via', None) if via is not None: return self.proxy_connect(via, method_name, name=name, **kwargs) - context_id = self.allocate_id() - return self._connect(context_id, klass, name=name, **kwargs) + return self._connect(klass, name=name, **kwargs) def proxy_connect(self, via_context, method_name, name=None, **kwargs): - context_id = self.allocate_id() - # Must be added prior to _proxy_connect() to avoid a race. - self.add_route(context_id, via_context.context_id) - name = via_context.call(_proxy_connect, - name, context_id, method_name, kwargs + context_id, name = via_context.call(_proxy_connect, + name=name, + method_name=method_name, + kwargs=kwargs ) name = '%s.%s' % (via_context.name, name) context = self.context_class(self, context_id, name=name) context.via = via_context self._context_by_id[context.context_id] = context - - self.propagate_route(context, via_context) return context