From 69e5902e61235d3ee18bd0ea1c462ee68e50b2b2 Mon Sep 17 00:00:00 2001
From: David Wilson
Date: Sun, 29 Apr 2018 19:16:41 +0100
Subject: [PATCH] issue #212: support explicit acknowledgements in FileService.

---
 ansible_mitogen/services.py | 230 ++++++++++++++++--------------------
 ansible_mitogen/target.py   |   8 ++
 2 files changed, 112 insertions(+), 126 deletions(-)

diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py
index 13b8d044..e49b5b69 100644
--- a/ansible_mitogen/services.py
+++ b/ansible_mitogen/services.py
@@ -354,6 +354,17 @@ class ContextService(mitogen.service.Service):
         return result
 
 
+class StreamState(object):
+    def __init__(self):
+        #: List of [(Sender, file object)]
+        self.jobs = []
+        self.completing = {}
+        #: In-flight byte count.
+        self.unacked = 0
+        #: Lock protecting this state.
+        self.lock = threading.Lock()
+
+
 class FileService(mitogen.service.Service):
     """
     Streaming file server, used to serve both small files like Ansible module
@@ -420,131 +431,20 @@ class FileService(mitogen.service.Service):
     handle = 501
     max_message_size = 1000
     unregistered_msg = 'Path is not registered with FileService.'
+    context_mismatch_msg = 'sender= kwarg context must match requestee context'
 
-    #: Maximum size of any stream's output queue before we temporarily stop
-    #: pumping more file chunks on that stream. The queue may overspill by up
-    #: to mitogen.core.CHUNK_SIZE-1 bytes (128KiB-1).
+    #: Maximum size of any stream's output queue before we stop pumping more
+    #: file chunks. The queue may overspill by up to mitogen.core.CHUNK_SIZE-1
+    #: bytes (128KiB-1). With max_queue_size=1MiB and an RTT of 10ms, maximum
+    #: throughput is 112MiB/sec, which is >5x what SSH can handle on my laptop.
     max_queue_size = 1048576
 
-    #: Time spent by the scheduler thread asleep when it has no more data to
-    #: pump, but while at least one transfer remains active. With
-    #: max_queue_size=1MiB and a sleep of 10ms, maximum throughput on any
-    #: single stream is 112MiB/sec, which is >5x what SSH can handle on my
-    #: laptop.
-    sleep_delay_secs = 0.01
-
     def __init__(self, router):
         super(FileService, self).__init__(router)
         #: Mapping of registered path -> file size.
         self._metadata_by_path = {}
-        #: Queue used to communicate from service to scheduler thread.
-        self._queue = mitogen.core.Latch()
-        #: Mapping of Stream->[(Sender, file object)].
-        self._pending_by_stream = {}
-        self._thread = threading.Thread(target=self._scheduler_main)
-        self._thread.start()
-
-    def on_shutdown(self):
-        """
-        Respond to shutdown of the service pool by marking our queue closed.
-        This causes :meth:`_sleep_on_queue` to wake immediately and return
-        :data:`False`, causing the scheduler main thread to exit.
-        """
-        self._queue.close()
-
-    def _pending_bytes(self, stream):
-        """
-        Defer a function call to the Broker thread in order to accurately
-        measure the bytes pending in `stream`'s queue.
-
-        This must be done synchronized with the Broker, as OS scheduler
-        uncertainty could cause Sender.send()'s deferred enqueues to be
-        processed very late, making the output queue look much emptier than it
-        really is (or is about to become).
-        """
-        latch = mitogen.core.Latch()
-        self.router.broker.defer(lambda: latch.put(stream.pending_bytes()))
-        return latch.get()
-
-    def _schedule_pending(self, stream, pending):
-        """
-        Consider the pending file transfers for a single stream, pumping new
-        file chunks into the stream's queue while its size is below the
-        configured limit.
-
-        :param mitogen.core.Stream stream:
-            Stream to pump chunks for.
-        :param pending:
-            Corresponding list from :attr:`_pending_by_stream`.
-        """
-        while pending and self._pending_bytes(stream) < self.max_queue_size:
-            sender, fp = pending[0]
-            s = fp.read(mitogen.core.CHUNK_SIZE)
-            if s:
-                sender.send(s)
-                continue
-
-            # Empty read, indicating this file is fully transferred. Mark the
-            # sender closed (causing the corresponding Receiver loop in the
-            # target to exit), close the file handle, remove our entry from the
-            # pending list, and delete the stream's entry in the pending map if
-            # no more sends remain.
-            sender.close()
-            fp.close()
-            pending.pop(0)
-            if not pending:
-                del self._pending_by_stream[stream]
-
-    def _sleep_on_queue(self):
-        """
-        Sleep indefinitely (no active transfers) or for
-        :attr:`sleep_delay_secs` (active transfers) waiting for a new transfer
-        request to arrive from the :meth:`fetch` method.
-
-        If a new request arrives, add it to the appropriate list in
-        :attr:`_pending_by_stream`.
-
-        :returns:
-            :data:`True` the scheduler's queue is still open,
-            :meth:`on_shutdown` hasn't been called yet, otherwise
-            :data:`False`.
-        """
-        if self._pending_by_stream:
-            timeout = self.sleep_delay_secs
-        else:
-            timeout = None
-
-        try:
-            sender, fp = self._queue.get(timeout=timeout)
-        except mitogen.core.LatchError:
-            return False
-        except mitogen.core.TimeoutError:
-            return True
-
-        LOG.debug('%r._sleep_on_queue(): setting up %r for %r',
-                  self, fp.name, sender)
-        stream = self.router.stream_by_id(sender.context.context_id)
-        pending = self._pending_by_stream.setdefault(stream, [])
-        pending.append((sender, fp))
-        return True
-
-    def _scheduler_main(self):
-        """
-        Scheduler thread's main function. Sleep until
-        :meth:`_sleep_on_queue` indicates the queue has been shut down,
-        pumping pending file chunks each time we wake.
-        """
-        while self._sleep_on_queue():
-            for stream, pending in list(self._pending_by_stream.items()):
-                self._schedule_pending(stream, pending)
-
-        # on_shutdown() has been called. Send close() on every sender to give
-        # targets a chance to shut down gracefully.
-        LOG.debug('%r._scheduler_main() shutting down', self)
-        for _, pending in self._pending_by_stream.items():
-            for sender, fp in pending:
-                sender.close()
-                fp.close()
+        #: Mapping of Stream->StreamState.
+        self._state_by_stream = {}
 
     def _name_or_none(self, func, n, attr):
         try:
@@ -558,8 +458,8 @@ class FileService(mitogen.service.Service):
     })
     def register(self, path):
         """
-        Authorize a path for access by child contexts. Calling this repeatedly
-        with the same path is harmless.
+        Authorize a path for access by children. Repeated calls with the
+        same path are harmless.
 
         :param str path:
             File path.
@@ -581,12 +481,51 @@ class FileService(mitogen.service.Service):
             'atime': st.st_atime,
         }
 
+    def on_shutdown(self):
+        """
+        Respond to shutdown by sending close() to every target, allowing
+        each receive loop to exit and clean up gracefully.
+        """
+        LOG.debug('%r.on_shutdown()', self)
+        for stream, state in self._state_by_stream.items():
+            state.lock.acquire()
+            try:
+                for sender, fp in reversed(state.jobs):
+                    sender.close()
+                    fp.close()
+                    state.jobs.pop()
+            finally:
+                state.lock.release()
+
+    def _schedule_pending_unlocked(self, state):
+        """
+        Consider the pending transfers for a stream, pumping new chunks while
+        the unacknowledged byte count is below :attr:`max_queue_size`. Must be
+        called with the StreamState lock held.
+
+        :param StreamState state:
+            State of the stream to schedule chunks for.
+ """ + while state.jobs and state.unacked < self.max_queue_size: + sender, fp = state.jobs[0] + s = fp.read(mitogen.core.CHUNK_SIZE) + state.unacked += len(s) + sender.send(s) + + if not s: + # File is done. Cause the target's receive loop to exit by + # closing the sender, close the file, and remove the job entry. + sender.close() + fp.close() + state.jobs.pop(0) + @mitogen.service.expose(policy=mitogen.service.AllowAny()) + @mitogen.service.no_reply() @mitogen.service.arg_spec({ 'path': basestring, 'sender': mitogen.core.Sender, }) - def fetch(self, path, sender): + def fetch(self, path, sender, msg): """ Fetch a file's data. @@ -603,13 +542,52 @@ class FileService(mitogen.service.Service): * ``group``: Owner group name on host machine. * ``mtime``: Floating point modification time. * ``ctime``: Floating point change time. - :raises mitogen.core.CallError: - The path was not registered. + :raises Error: + Unregistered path, or attempt to send to context that was not the + requestee context. """ if path not in self._metadata_by_path: - raise mitogen.core.CallError(self.unregistered_msg) + raise Error(self.unregistered_msg) + if msg.src_id != sender.context.context_id: + raise Error(self.context_mismatch_msg) LOG.debug('Serving %r', path) fp = open(path, 'rb', mitogen.core.CHUNK_SIZE) - self._queue.put((sender, fp)) - return self._metadata_by_path[path] + # Response must arrive first so requestee can begin receive loop, + # otherwise first ack won't arrive until all pending chunks were + # delivered. In that case max BDP would always be 128KiB, aka. max + # ~10Mbit/sec over a 100ms link. + msg.reply(self._metadata_by_path[path]) + + stream = self.router.stream_by_id(sender.context.context_id) + state = self._state_by_stream.setdefault(stream, StreamState()) + state.lock.acquire() + try: + state.jobs.append((sender, fp)) + self._schedule_pending_unlocked(state) + finally: + state.lock.release() + + @mitogen.service.expose(policy=mitogen.service.AllowAny()) + @mitogen.service.no_reply() + @mitogen.service.arg_spec({ + 'size': int, + }) + @mitogen.service.no_reply() + def acknowledge(self, size, msg): + """ + Acknowledgement bytes received by a transfer target, scheduling new + chunks to keep the window full. This should be called for every chunk + received by the target. + """ + stream = self.router.stream_by_id(msg.src_id) + state = self._state_by_stream[stream] + state.lock.acquire() + try: + if state.unacked < size: + LOG.error('%r.acknowledge(src_id %d): unacked=%d < size %d', + self, msg.src_id, state.unacked, size) + state.unacked -= min(state.unacked, size) + self._schedule_pending_unlocked(state) + finally: + state.lock.release() diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index 425ed4bf..3020595e 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -98,6 +98,14 @@ def _get_file(context, path, out_fp): for chunk in recv: s = chunk.unpickle() LOG.debug('_get_file(%r): received %d bytes', path, len(s)) + mitogen.service.call_async( + context=context, + handle=ansible_mitogen.services.FileService.handle, + method='acknowledge', + kwargs={ + 'size': len(s), + } + ) out_fp.write(s) ok = out_fp.tell() == metadata['size']