From b0309b539c5b74911296ca75f77ccb4aa78dcb40 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 29 Apr 2018 09:53:16 +0100 Subject: [PATCH 1/8] ansible: disable interpreter recycling for connections. Must explicitly specify enable_lru=True in ContextService.get() to trigger recycling. --- ansible_mitogen/connection.py | 1 + ansible_mitogen/services.py | 4 ++-- docs/ansible.rst | 4 ---- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/ansible_mitogen/connection.py b/ansible_mitogen/connection.py index 192c6dd4..59e82695 100644 --- a/ansible_mitogen/connection.py +++ b/ansible_mitogen/connection.py @@ -134,6 +134,7 @@ def _connect_setns(spec): def _connect_sudo(spec): return { 'method': 'sudo', + 'enable_lru': True, 'kwargs': { 'username': spec['become_user'], 'password': spec['become_pass'], diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index c3157bf0..13b8d044 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -167,7 +167,7 @@ class ContextService(mitogen.service.Service): del self._response_by_key[key] del self._refs_by_context[context] del self._key_by_context[context] - if lru: + if lru and context in lru: lru.remove(context) if new_context: lru.append(new_context) @@ -256,7 +256,7 @@ class ContextService(mitogen.service.Service): raise Error('unsupported method: %(transport)s' % spec) context = method(via=via, **spec['kwargs']) - if via: + if via and spec.get('enable_lru'): self._update_lru(context, spec, via) else: # For directly connected contexts, listen to the associated diff --git a/docs/ansible.rst b/docs/ansible.rst index 3abf6f9c..d28c0de0 100644 --- a/docs/ansible.rst +++ b/docs/ansible.rst @@ -213,10 +213,6 @@ container. as duplicate connections between hops, due to not perfectly replicating the configuration Ansible would normally use for the intermediary. - * The extension does not understand the difference between a delegated - connection and a ``become_user``. 
If interpreter recycling kicks in, a - delegated connection could be prematurely recycled. - To enable connection delegation, set ``mitogen_via=`` on the command line, or as host and group variables. From ff7fb00569155bc53b00b65682c3812680c46ff9 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 29 Apr 2018 10:11:13 +0100 Subject: [PATCH 2/8] parent: return latch to wait() caller to allow graceful timeout --- docs/api.rst | 5 +++++ mitogen/parent.py | 3 ++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/api.rst b/docs/api.rst index 37640464..17b1b38d 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -917,6 +917,11 @@ Context Class :param bool wait: If :py:data:`True`, block the calling thread until the context has completely terminated. + :returns: + If `wait` is :data:`False`, returns a :class:`mitogen.core.Latch` + whose :meth:`get() <mitogen.core.Latch.get>` method returns + :data:`None` when shutdown completes. The `timeout` parameter may + be used to implement graceful timeouts. .. method:: call_async (fn, \*args, \*\*kwargs) diff --git a/mitogen/parent.py b/mitogen/parent.py index 497ccd0a..2203ef9c 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -806,7 +806,8 @@ class Context(mitogen.core.Context): if wait: latch.get() - return latch + else: + return latch class RouteMonitor(object): From bf6c2fa97c771ce4e2105a5ab444a9f8a3105ecc Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 29 Apr 2018 19:14:27 +0100 Subject: [PATCH 3/8] issue #212: service: more concise repr --- mitogen/service.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/mitogen/service.py b/mitogen/service.py index 3351728e..efab09cd 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -138,10 +138,7 @@ class Service(object): self.running = True def __repr__(self): - return '%s.%s()' % ( - self.__class__.__module__, - self.__class__.__name__, - ) + return '%s()' % (self.__class__.__name__,) def on_shutdown(self): """ From 
afe983d6c907097c7fafab2ae8dac4bf8da5f775 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 29 Apr 2018 19:14:40 +0100 Subject: [PATCH 4/8] issue #212: service: support no_reply decorator. --- mitogen/service.py | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/mitogen/service.py b/mitogen/service.py index efab09cd..69774299 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -120,6 +120,18 @@ def expose(policy): return wrapper +def no_reply(): + """ + Annotate a method as one that does not generate a response. Messages sent + by the method are done so explicitly. This can be used for fire-and-forget + endpoints where the requestee never receives a reply. + """ + def wrapper(func): + func.mitogen_service__no_reply = True + return func + return wrapper + + class Service(object): #: Sentinel object to suppress reply generation, since returning ``None`` #: will trigger a response message containing the pickled ``None``. @@ -179,7 +191,20 @@ class Service(object): method = getattr(self, method_name) if 'msg' in method.func_code.co_varnames: kwargs['msg'] = msg # TODO: hack - return method(**kwargs) + + no_reply = getattr(method, 'mitogen_service__no_reply', False) + ret = None + try: + ret = method(**kwargs) + if no_reply: + return self.NO_REPLY + return ret + except Exception: + if no_reply: + LOG.exception('While calling no-reply method %s.%s', + type(self).__name__, method.func_name) + else: + raise def on_receive_message(self, msg): try: From 5e6e56f0c51310673c3f0f032fa678beff078b5a Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 29 Apr 2018 19:14:56 +0100 Subject: [PATCH 5/8] issue #212: service: make call_async kwargs optional. 
--- mitogen/service.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mitogen/service.py b/mitogen/service.py index 69774299..53c5ff46 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -369,10 +369,10 @@ class Pool(object): ) -def call_async(context, handle, method, kwargs): +def call_async(context, handle, method, kwargs=None): LOG.debug('service.call_async(%r, %r, %r, %r)', context, handle, method, kwargs) - pair = (method, kwargs) + pair = (method, kwargs or {}) msg = mitogen.core.Message.pickled(pair, handle=handle) return context.send_async(msg) From 69e5902e61235d3ee18bd0ea1c462ee68e50b2b2 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 29 Apr 2018 19:16:41 +0100 Subject: [PATCH 6/8] issue #212: support explicit acknowledgements in FileService. --- ansible_mitogen/services.py | 230 ++++++++++++++++-------------------- ansible_mitogen/target.py | 8 ++ 2 files changed, 112 insertions(+), 126 deletions(-) diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index 13b8d044..e49b5b69 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -354,6 +354,17 @@ class ContextService(mitogen.service.Service): return result +class StreamState(object): + def __init__(self): + #: List of [(Sender, file object)] + self.jobs = [] + self.completing = {} + #: In-flight byte count. + self.unacked = 0 + #: Lock. + self.lock = threading.Lock() + + class FileService(mitogen.service.Service): """ Streaming file server, used to serve both small files like Ansible module @@ -420,131 +431,20 @@ class FileService(mitogen.service.Service): handle = 501 max_message_size = 1000 unregistered_msg = 'Path is not registered with FileService.' + context_mismatch_msg = 'sender= kwarg context must match requestee context' - #: Maximum size of any stream's output queue before we temporarily stop - #: pumping more file chunks on that stream. 
The queue may overspill by up - #: to mitogen.core.CHUNK_SIZE-1 bytes (128KiB-1). + #: Maximum size of any stream's output queue before we stop pumping more + #: file chunks. The queue may overspill by up to mitogen.core.CHUNK_SIZE-1 + #: bytes (128KiB-1). With max_queue_size=1MiB and a RTT of 10ms, maximum + #: throughput is 112MiB/sec, which is >5x what SSH can handle on my laptop. max_queue_size = 1048576 - #: Time spent by the scheduler thread asleep when it has no more data to - #: pump, but while at least one transfer remains active. With - #: max_queue_size=1MiB and a sleep of 10ms, maximum throughput on any - #: single stream is 112MiB/sec, which is >5x what SSH can handle on my - #: laptop. - sleep_delay_secs = 0.01 - def __init__(self, router): super(FileService, self).__init__(router) #: Mapping of registered path -> file size. self._metadata_by_path = {} - #: Queue used to communicate from service to scheduler thread. - self._queue = mitogen.core.Latch() - #: Mapping of Stream->[(Sender, file object)]. - self._pending_by_stream = {} - self._thread = threading.Thread(target=self._scheduler_main) - self._thread.start() - - def on_shutdown(self): - """ - Respond to shutdown of the service pool by marking our queue closed. - This causes :meth:`_sleep_on_queue` to wake immediately and return - :data:`False`, causing the scheduler main thread to exit. - """ - self._queue.close() - - def _pending_bytes(self, stream): - """ - Defer a function call to the Broker thread in order to accurately - measure the bytes pending in `stream`'s queue. - - This must be done synchronized with the Broker, as OS scheduler - uncertainty could cause Sender.send()'s deferred enqueues to be - processed very late, making the output queue look much emptier than it - really is (or is about to become). 
- """ - latch = mitogen.core.Latch() - self.router.broker.defer(lambda: latch.put(stream.pending_bytes())) - return latch.get() - - def _schedule_pending(self, stream, pending): - """ - Consider the pending file transfers for a single stream, pumping new - file chunks into the stream's queue while its size is below the - configured limit. - - :param mitogen.core.Stream stream: - Stream to pump chunks for. - :param pending: - Corresponding list from :attr:`_pending_by_stream`. - """ - while pending and self._pending_bytes(stream) < self.max_queue_size: - sender, fp = pending[0] - s = fp.read(mitogen.core.CHUNK_SIZE) - if s: - sender.send(s) - continue - - # Empty read, indicating this file is fully transferred. Mark the - # sender closed (causing the corresponding Receiver loop in the - # target to exit), close the file handle, remove our entry from the - # pending list, and delete the stream's entry in the pending map if - # no more sends remain. - sender.close() - fp.close() - pending.pop(0) - if not pending: - del self._pending_by_stream[stream] - - def _sleep_on_queue(self): - """ - Sleep indefinitely (no active transfers) or for - :attr:`sleep_delay_secs` (active transfers) waiting for a new transfer - request to arrive from the :meth:`fetch` method. - - If a new request arrives, add it to the appropriate list in - :attr:`_pending_by_stream`. - - :returns: - :data:`True` the scheduler's queue is still open, - :meth:`on_shutdown` hasn't been called yet, otherwise - :data:`False`. 
- """ - if self._pending_by_stream: - timeout = self.sleep_delay_secs - else: - timeout = None - - try: - sender, fp = self._queue.get(timeout=timeout) - except mitogen.core.LatchError: - return False - except mitogen.core.TimeoutError: - return True - - LOG.debug('%r._sleep_on_queue(): setting up %r for %r', - self, fp.name, sender) - stream = self.router.stream_by_id(sender.context.context_id) - pending = self._pending_by_stream.setdefault(stream, []) - pending.append((sender, fp)) - return True - - def _scheduler_main(self): - """ - Scheduler thread's main function. Sleep until - :meth:`_sleep_on_queue` indicates the queue has been shut down, - pumping pending file chunks each time we wake. - """ - while self._sleep_on_queue(): - for stream, pending in list(self._pending_by_stream.items()): - self._schedule_pending(stream, pending) - - # on_shutdown() has been called. Send close() on every sender to give - # targets a chance to shut down gracefully. - LOG.debug('%r._scheduler_main() shutting down', self) - for _, pending in self._pending_by_stream.items(): - for sender, fp in pending: - sender.close() - fp.close() + #: Mapping of Stream->StreamState. + self._state_by_stream = {} def _name_or_none(self, func, n, attr): try: @@ -558,8 +458,8 @@ class FileService(mitogen.service.Service): }) def register(self, path): """ - Authorize a path for access by child contexts. Calling this repeatedly - with the same path is harmless. + Authorize a path for access by children. Repeat calls with the same + path are harmless. :param str path: File path. @@ -581,12 +481,51 @@ class FileService(mitogen.service.Service): 'atime': st.st_atime, } + def on_shutdown(self): + """ + Respond to shutdown by sending close() to every target, allowing their + receive loop to exit and clean up gracefully. 
+ """ + LOG.debug('%r.on_shutdown()', self) + for stream, state in self._state_by_stream.items(): + state.lock.acquire() + try: + for sender, fp in reversed(state.jobs): + sender.close() + fp.close() + state.jobs.pop() + finally: + state.lock.release() + + def _schedule_pending_unlocked(self, state): + """ + Consider the pending transfers for a stream, pumping new chunks while + the unacknowledged byte count is below :attr:`max_queue_size`. Must be + called with the StreamState lock held. + + :param StreamState state: + Stream to schedule chunks for. + """ + while state.jobs and state.unacked < self.max_queue_size: + sender, fp = state.jobs[0] + s = fp.read(mitogen.core.CHUNK_SIZE) + state.unacked += len(s) + sender.send(s) + + if not s: + # File is done. Cause the target's receive loop to exit by + # closing the sender, close the file, and remove the job entry. + sender.close() + fp.close() + state.jobs.pop(0) + @mitogen.service.expose(policy=mitogen.service.AllowAny()) + @mitogen.service.no_reply() @mitogen.service.arg_spec({ 'path': basestring, 'sender': mitogen.core.Sender, }) - def fetch(self, path, sender): + def fetch(self, path, sender, msg): """ Fetch a file's data. @@ -603,13 +542,52 @@ class FileService(mitogen.service.Service): * ``group``: Owner group name on host machine. * ``mtime``: Floating point modification time. * ``ctime``: Floating point change time. - :raises mitogen.core.CallError: - The path was not registered. + :raises Error: + Unregistered path, or attempt to send to context that was not the + requestee context. 
""" if path not in self._metadata_by_path: - raise mitogen.core.CallError(self.unregistered_msg) + raise Error(self.unregistered_msg) + if msg.src_id != sender.context.context_id: + raise Error(self.context_mismatch_msg) LOG.debug('Serving %r', path) fp = open(path, 'rb', mitogen.core.CHUNK_SIZE) - self._queue.put((sender, fp)) - return self._metadata_by_path[path] + # Response must arrive first so requestee can begin receive loop, + # otherwise first ack won't arrive until all pending chunks were + # delivered. In that case max BDP would always be 128KiB, aka. max + # ~10Mbit/sec over a 100ms link. + msg.reply(self._metadata_by_path[path]) + + stream = self.router.stream_by_id(sender.context.context_id) + state = self._state_by_stream.setdefault(stream, StreamState()) + state.lock.acquire() + try: + state.jobs.append((sender, fp)) + self._schedule_pending_unlocked(state) + finally: + state.lock.release() + + @mitogen.service.expose(policy=mitogen.service.AllowAny()) + @mitogen.service.no_reply() + @mitogen.service.arg_spec({ + 'size': int, + }) + @mitogen.service.no_reply() + def acknowledge(self, size, msg): + """ + Acknowledgement bytes received by a transfer target, scheduling new + chunks to keep the window full. This should be called for every chunk + received by the target. 
+ """ + stream = self.router.stream_by_id(msg.src_id) + state = self._state_by_stream[stream] + state.lock.acquire() + try: + if state.unacked < size: + LOG.error('%r.acknowledge(src_id %d): unacked=%d < size %d', + self, msg.src_id, state.unacked, size) + state.unacked -= min(state.unacked, size) + self._schedule_pending_unlocked(state) + finally: + state.lock.release() diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index 425ed4bf..3020595e 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -98,6 +98,14 @@ def _get_file(context, path, out_fp): for chunk in recv: s = chunk.unpickle() LOG.debug('_get_file(%r): received %d bytes', path, len(s)) + mitogen.service.call_async( + context=context, + handle=ansible_mitogen.services.FileService.handle, + method='acknowledge', + kwargs={ + 'size': len(s), + } + ) out_fp.write(s) ok = out_fp.tell() == metadata['size'] From 58eb9828b0a663e721e5db94748e4a3a7fb158e0 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 29 Apr 2018 19:35:43 +0100 Subject: [PATCH 7/8] docs: remove limitation. --- docs/ansible.rst | 4 ---- 1 file changed, 4 deletions(-) diff --git a/docs/ansible.rst b/docs/ansible.rst index d28c0de0..aad5966c 100644 --- a/docs/ansible.rst +++ b/docs/ansible.rst @@ -205,10 +205,6 @@ container. * Delegated connection setup is single-threaded; only one connection can be constructed in parallel per intermediary. - * Unbounded queue RAM growth may occur in an intermediary during large file - transfers if the link between any two hops is slower than the link - between the controller and the first hop. - * Inferring the configuration of intermediaries may be buggy, manifesting as duplicate connections between hops, due to not perfectly replicating the configuration Ansible would normally use for the intermediary. 
From f5d22a3ca19bd8cfb2aec09aa2d1f0c2d5162149 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 29 Apr 2018 19:41:23 +0100 Subject: [PATCH 8/8] core: support deleting handlers, make Receiver.close() unregister --- ansible_mitogen/target.py | 2 +- docs/api.rst | 7 +++++++ mitogen/core.py | 6 ++++++ 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index 3020595e..b42ecf31 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -105,7 +105,7 @@ def _get_file(context, path, out_fp): kwargs={ 'size': len(s), } - ) + ).close() out_fp.write(s) ok = out_fp.tell() == metadata['size'] diff --git a/docs/api.rst b/docs/api.rst index 17b1b38d..923c368d 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -470,6 +470,13 @@ Router Class :return: `handle`, or if `handle` was ``None``, the newly allocated handle. + .. method:: del_handler (handle) + + Remove the handler registered for `handle`. + + :raises KeyError: + The handle wasn't registered. + .. method:: _async_route(msg, stream=None) Arrange for `msg` to be forwarded towards its destination. If its diff --git a/mitogen/core.py b/mitogen/core.py index 2e43ed65..d87baf36 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -436,6 +436,9 @@ class Receiver(object): self.notify(self) def close(self): + if self.handle: + self.router.del_handler(self.handle) + self.handle = None self._latch.put(Message.dead()) def empty(self): @@ -1273,6 +1276,9 @@ class Router(object): self.broker.start_receive(stream) listen(stream, 'disconnect', lambda: self.on_stream_disconnect(stream)) + def del_handler(self, handle): + del self._handle_map[handle] + def add_handler(self, fn, handle=None, persist=True, policy=None, respondent=None): handle = handle or self._last_handle.next()