From 2a56c672ca645a62b55563f1cf5d2f1b89391325 Mon Sep 17 00:00:00 2001
From: David Wilson
Date: Sun, 29 Apr 2018 21:38:43 +0100
Subject: [PATCH] ansible: FileService docstring updates.

---
 ansible_mitogen/services.py | 107 ++++++++++++++++--------------------
 1 file changed, 48 insertions(+), 59 deletions(-)

diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py
index e49b5b69..a0c08b98 100644
--- a/ansible_mitogen/services.py
+++ b/ansible_mitogen/services.py
@@ -367,77 +367,66 @@ class StreamState(object):
 
 class FileService(mitogen.service.Service):
     """
-    Streaming file server, used to serve both small files like Ansible module
-    sources, and huge files like ISO images. Paths must be explicitly added to
-    the service by a trusted context before they will be served to an untrusted
-    context.
+    Streaming file server, used to serve small files like Ansible modules and
+    huge files like ISO images. Paths must be registered by a trusted context
+    before they will be served to a child.
 
-    The file service nominally lives on the mitogen.service.Pool() threads
-    shared with ContextService above, however for simplicity it also maintains
-    a dedicated thread from where file chunks are scheduled.
-
-    The scheduler thread is responsible for dividing transfer requests up among
-    the physical streams that connect to those contexts, and ensure each stream
-    never has an excessive amount of data buffered in RAM at any time.
+    Transfers are divided among the physical streams that connect external
+    contexts, ensuring each stream never has excessive data buffered in RAM,
+    while still maintaining enough to fully utilize available bandwidth. This
+    is achieved by making an initial bandwidth assumption, enqueueing enough
+    chunks to fill that assumed pipe, then responding to delivery
+    acknowledgements from the receiver by scheduling new chunks.
 
     Transfers proceed one-at-a-time per stream. When multiple contexts exist
-    reachable over the same stream (e.g. one is the SSH account, another is a
-    sudo account, and a third is a proxied SSH connection), each request is
-    satisfied in turn before chunks for subsequent requests start flowing. This
-    ensures when a connection is contended, that preference is given to
-    completing individual transfers, rather than potentially aborting many
-    partially complete transfers, causing all the bandwidth used to be wasted.
+    on a stream (e.g. one is the SSH account, another is a sudo account, and a
+    third is a proxied SSH connection), each request is satisfied in turn
+    before subsequent requests start flowing. This ensures when a stream is
+    contended, priority is given to completing individual transfers rather than
+    potentially aborting many partial transfers, causing the bandwidth to be
+    wasted.
 
     Theory of operation:
-        1. Trusted context (i.e. a WorkerProcess) calls register(), making a
+        1. Trusted context (i.e. WorkerProcess) calls register(), making a
            file available to any untrusted context.
-        2. Untrusted context creates a mitogen.core.Receiver() to receive
-           file chunks. It then calls fetch(path, recv.to_sender()), which sets
-           up the transfer. The fetch() method returns the final file size and
-           notifies the dedicated thread of the transfer request.
-        3. The dedicated thread wakes from perpetual sleep, looks up the stream
-           used to communicate with the untrusted context, and begins pumping
-           128KiB-sized chunks until that stream's output queue reaches a
-           limit (1MiB).
-        4. The thread sleeps for 10ms, wakes, and pumps new chunks as necessary
-           to refill any drained output queue, which are being asynchronously
-           drained by the Stream implementation running on the Broker thread.
-        5. Once the last chunk has been pumped for a single transfer,
+        2. Requestee context creates a mitogen.core.Receiver() to receive
+           chunks, then calls fetch(path, recv.to_sender()) to set up the
+           transfer.
+        3. fetch() replies to the call with the file's metadata, then
+           schedules an initial burst up to the window size limit (1MiB).
+        4. Chunks begin to arrive in the requestee, which calls acknowledge()
+           for each 128KiB received.
+        5. The acknowledge() call arrives at FileService, which schedules a new
+           chunk to refill the drained window back to the size limit.
+        6. When the last chunk has been pumped for a single transfer,
            Sender.close() is called causing the receive loop in
-           target.py::_get_file() to exit, and allows that code to compare the
-           transferred size with the total file size indicated by the return
-           value of the fetch() method.
-        6. If the sizes mismatch, the caller is informed, which will discard
-           the result and log an error.
-        7. Once all chunks have been pumped for all transfers, the dedicated
-           thread stops waking at 10ms intervals and resumes perpetual sleep.
+           target.py::_get_file() to exit, allowing that code to compare the
+           transferred size with the total file size from the metadata.
+        7. If the sizes mismatch, _get_file()'s caller is informed, which will
+           discard the result and log/raise an error.
 
     Shutdown:
-        1. process.py calls service.Pool.shutdown(), which arranges for all the
+        1. process.py calls service.Pool.shutdown(), which arranges for the
            service pool threads to exit and be joined, guaranteeing no new
           requests can arrive, before calling Service.on_shutdown() for each
           registered service.
-        2. FileService.on_shutdown() marks the dedicated thread's queue as
-           closed, causing the dedicated thread to wake immediately. It will
-           throw an exception that begins shutdown of the main loop.
-        3. The main loop calls Sender.close() prematurely for every pending
-           transfer, causing any Receiver loops in the target contexts to exit
-           early. The file size check fails, and the partially downloaded file
-           is discarded, and an error is logged.
-        4. Control exits the file transfer function in every target, and
-           graceful target shutdown can proceed normally, without the
-           associated thread needing to be forcefully killed.
+        2. FileService.on_shutdown() walks every in-progress transfer and calls
+           Sender.close(), causing Receiver loops in the requestees to exit
+           early. The size check fails and any partially downloaded file is
+           discarded.
+        3. Control exits _get_file() in every target, and graceful shutdown can
+           proceed normally, without the associated thread needing to be
+           forcefully killed.
     """
     handle = 501
     max_message_size = 1000
     unregistered_msg = 'Path is not registered with FileService.'
     context_mismatch_msg = 'sender= kwarg context must match requestee context'
 
-    #: Maximum size of any stream's output queue before we stop pumping more
-    #: file chunks. The queue may overspill by up to mitogen.core.CHUNK_SIZE-1
-    #: bytes (128KiB-1). With max_queue_size=1MiB and a RTT of 10ms, maximum
-    #: throughput is 112MiB/sec, which is >5x what SSH can handle on my laptop.
-    max_queue_size = 1048576
+    #: Initial burst size. With 1MiB and a RTT of 10ms, maximum throughput is
+    #: 112MiB/sec, which is 5x what SSH can handle on a 2011-era 2.4GHz Core
+    #: i5.
+    window_size_bytes = 1048576
 
     def __init__(self, router):
         super(FileService, self).__init__(router)
@@ -500,13 +489,13 @@ class FileService(mitogen.service.Service):
     def _schedule_pending_unlocked(self, state):
         """
         Consider the pending transfers for a stream, pumping new chunks while
-        the unacknowledged byte count is below :attr:`max_queue_size`. Must be
-        called with the StreamState lock held.
+        the unacknowledged byte count is below :attr:`window_size_bytes`. Must
+        be called with the StreamState lock held.
 
         :param StreamState state:
             Stream to schedule chunks for.
         """
-        while state.jobs and state.unacked < self.max_queue_size:
+        while state.jobs and state.unacked < self.window_size_bytes:
             sender, fp = state.jobs[0]
             s = fp.read(mitogen.core.CHUNK_SIZE)
             state.unacked += len(s)
@@ -576,9 +565,9 @@ class FileService(mitogen.service.Service):
     @mitogen.service.no_reply()
     def acknowledge(self, size, msg):
         """
-        Acknowledgement bytes received by a transfer target, scheduling new
-        chunks to keep the window full. This should be called for every chunk
-        received by the target.
+        Acknowledge bytes received by a transfer target, scheduling new chunks
+        to keep the window full. This should be called for every chunk received
+        by the target.
         """
         stream = self.router.stream_by_id(msg.src_id)
         state = self._state_by_stream[stream]
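
The windowed scheme the new docstring describes can be reduced to a few lines
of plain Python. The following is a minimal, self-contained sketch under
stated assumptions, not Mitogen's implementation: CHUNK_SIZE and
WINDOW_SIZE_BYTES mirror mitogen.core.CHUNK_SIZE and
FileService.window_size_bytes, while Transfer, pump(), and the in-memory
`sent` list are illustrative stand-ins for the Sender/stream plumbing:

    import io

    CHUNK_SIZE = 128 * 1024          # mirrors mitogen.core.CHUNK_SIZE
    WINDOW_SIZE_BYTES = 1048576      # mirrors FileService.window_size_bytes

    class Transfer(object):
        def __init__(self, fp):
            self.fp = fp
            self.unacked = 0         # bytes in flight, not yet acknowledged
            self.sent = []           # stands in for chunks handed to the stream

        def pump(self):
            # Enqueue chunks while the unacknowledged byte count is below the
            # window limit; compare _schedule_pending_unlocked() above.
            while self.unacked < WINDOW_SIZE_BYTES:
                s = self.fp.read(CHUNK_SIZE)
                if not s:
                    break            # EOF; the real service closes the Sender
                self.unacked += len(s)
                self.sent.append(s)

        def acknowledge(self, size):
            # The receiver acknowledged `size` bytes: refill the window.
            self.unacked -= size
            self.pump()

    data = b'x' * (3 * WINDOW_SIZE_BYTES)    # pretend 3MiB file
    t = Transfer(io.BytesIO(data))
    t.pump()                                 # initial burst: 8 x 128KiB chunks
    assert t.unacked == WINDOW_SIZE_BYTES

    received = 0
    while t.sent:
        chunk = t.sent.pop(0)                # requestee drains one chunk...
        received += len(chunk)
        t.acknowledge(len(chunk))            # ...and acks, refilling the window

    assert received == len(data)             # the size check from steps 6-7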
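
The one-at-a-time ordering promised for contended streams follows from the
same loop only ever pumping the head of the job queue. Another hedged sketch:
StreamState and `jobs` mirror names in services.py, but schedule_pending()
simplifies _schedule_pending_unlocked() (the real code also closes the Sender
at EOF and requires the state lock held), and the two file names are made up:

    import io

    CHUNK_SIZE = 128 * 1024
    WINDOW_SIZE_BYTES = 1048576

    class StreamState(object):
        def __init__(self):
            self.jobs = []           # (path, fp) pairs in FIFO order
            self.unacked = 0

    def schedule_pending(state, log):
        # Only state.jobs[0] is pumped; later jobs wait until it completes,
        # so a contended stream finishes transfers one at a time.
        while state.jobs and state.unacked < WINDOW_SIZE_BYTES:
            path, fp = state.jobs[0]
            s = fp.read(CHUNK_SIZE)
            if not s:
                state.jobs.pop(0)    # transfer complete; next job starts
                continue
            state.unacked += len(s)
            log.append(path)

    state = StreamState()
    state.jobs.append(('modules.zip', io.BytesIO(b'a' * (2 * CHUNK_SIZE))))
    state.jobs.append(('disk.iso', io.BytesIO(b'b' * (2 * CHUNK_SIZE))))
    log = []
    schedule_pending(state, log)
    # Every chunk of modules.zip is scheduled before any chunk of disk.iso:
    assert log == ['modules.zip'] * 2 + ['disk.iso'] * 2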