From 29087018c7ba0c9a2458cad9f785595e8a6211fb Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 22 Apr 2018 02:40:05 +0100 Subject: [PATCH] ansible: implement streaming in FileService. This commit only uses it for the target.get_file() helper, which is only used for transferring modules. The next commit wires it into the Connection.transfer_file() API, which is the method the copy module uses. --- ansible_mitogen/services.py | 138 +++++++++++++++++++++++++++++++++--- ansible_mitogen/target.py | 45 ++++++++---- 2 files changed, 159 insertions(+), 24 deletions(-) diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index 3665e475..2a647f9a 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -359,9 +359,120 @@ class FileService(mitogen.service.Service): max_message_size = 1000 unregistered_msg = 'Path is not registered with FileService.' + #: Maximum size of any stream's output queue before we temporarily stop + #: pumping more file chunks. The queue may overspill by up to + #: mitogen.core.CHUNK_SIZE-1 bytes (128KiB-1). + max_queue_size = 1048576 + + #: Time spent by the scheduler thread asleep when it has no more queues to + #: pump. With max_queue_size=1MiB and a sleep of 10ms, maximum throughput + #: on any single stream is 100MiB/sec, which is 5x what SSH can handle on + #: my laptop. + sleep_delay = 0.01 + def __init__(self, router): super(FileService, self).__init__(router) - self._paths = {} + #: Mapping of registered path -> file size. + self._size_by_path = {} + #: Queue used to communicate from service to scheduler thread. + self._queue = mitogen.core.Latch() + #: Mapping of Stream->[(sender, fp)]. + self._pending_by_stream = {} + self._thread = threading.Thread(target=self._scheduler_main) + self._thread.start() + + def _pending_bytes(self, stream): + """ + Defer a function call to the Broker thread in order to accurately + measure the bytes pending in `stream`'s queue. 
+ + This must be done synchronized with the Broker, as scheduler + uncertainty could cause Sender.send()'s deferred enqueues to be + processed very late, making the output queue look much emptier than it + really is (or is about to become). + """ + latch = mitogen.core.Latch() + self.router.broker.defer(lambda: latch.put(stream.pending_bytes())) + return latch.get() + + def _schedule_pending(self, stream, pending): + """ + Consider the pending file transfers for a single stream, pumping new + file chunks into the stream's queue while its size is below the + configured limit. + + :param mitogen.core.Stream stream: + Stream to pump chunks for. + :param pending: + Corresponding list from :attr:`_pending_by_stream`. + """ + while pending and self._pending_bytes(stream) < self.max_queue_size: + sender, fp = pending[0] + s = fp.read(mitogen.core.CHUNK_SIZE) + if s: + sender.send(s) + continue + + # Empty read, indicating this file is fully transferred. Mark the + # sender closed (causing the corresponding Receiver loop in the + # target to exit), close the file handle, remove our entry from the + # pending list, and delete the stream's entry in the pending map if + # no more sends remain. + sender.close() + fp.close() + pending.pop(0) + if not pending: + del self._pending_by_stream[stream] + + def _sleep_on_queue(self): + """ + Sleep indefinitely (no active transfers) or for :attr:`sleep_delay_ms` + (active transfers) waiting for a new transfer request to arrive from + the :meth:`fetch` method. + + If a new request arrives, add it to the appropriate list in + :attr:`_pending_by_stream`. + + :returns: + :data:`True` the scheduler's queue is still open, + :meth:`on_shutdown` hasn't been called yet, otherwise + :data:`False`. 
try: + if self._pending_by_stream: + timeout = self.sleep_delay + else: + timeout = None
The target can decide whether to keep the file in RAM or + on disk based on the return value.
""" if path not in _file_cache: - LOG.debug('target.get_file(): fetching %r from %r', path, context) - _file_cache[path] = zlib.decompress( - mitogen.service.call( - context=context, - handle=ansible_mitogen.services.FileService.handle, - method='fetch', - kwargs={ - 'path': path - } - ) - ) - LOG.debug('target.get_file(): fetched %r from %r', path, context) - + io = cStringIO.StringIO() + _get_file(context, path, io) + _file_cache[path] = io.getvalue() return _file_cache[path]