diff --git a/ansible_mitogen/connection.py b/ansible_mitogen/connection.py
index 35643d7d..ee34c22b 100644
--- a/ansible_mitogen/connection.py
+++ b/ansible_mitogen/connection.py
@@ -613,7 +613,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
                                      utimes=(st.st_atime, st.st_mtime))
 
         self.parent.call_service(
-            service_name='ansible_mitogen.services.FileService',
+            service_name='mitogen.service.FileService',
             method_name='register',
             path=mitogen.utils.cast(in_path)
         )
diff --git a/ansible_mitogen/planner.py b/ansible_mitogen/planner.py
index 29a1670a..825e5897 100644
--- a/ansible_mitogen/planner.py
+++ b/ansible_mitogen/planner.py
@@ -182,7 +182,7 @@ class BinaryPlanner(Planner):
 
     def _grant_file_service_access(self, invocation):
         invocation.connection.parent.call_service(
-            service_name='ansible_mitogen.services.FileService',
+            service_name='mitogen.service.FileService',
             method_name='register',
             path=invocation.module_path,
         )
diff --git a/ansible_mitogen/process.py b/ansible_mitogen/process.py
index 4946aa29..236e8ab7 100644
--- a/ansible_mitogen/process.py
+++ b/ansible_mitogen/process.py
@@ -151,7 +151,7 @@ class MuxProcess(object):
         Construct a ContextService and a thread to service requests for it
         arriving from worker processes.
         """
-        file_service = ansible_mitogen.services.FileService(router=self.router)
+        file_service = mitogen.service.FileService(router=self.router)
         self.pool = mitogen.service.Pool(
             router=self.router,
             services=[
diff --git a/ansible_mitogen/runner.py b/ansible_mitogen/runner.py
index c9e6d53b..fe14365f 100644
--- a/ansible_mitogen/runner.py
+++ b/ansible_mitogen/runner.py
@@ -272,7 +272,7 @@ class ProgramRunner(Runner):
 
     :param str path:
         Absolute path to the program file on the master, as it can be retrieved
-        via :class:`ansible_mitogen.services.FileService`.
+        via :class:`mitogen.service.FileService`.
     :param bool emulate_tty:
         If :data:`True`, execute the program with `stdout` and `stderr` merged
         into a single pipe, emulating Ansible behaviour when an SSH TTY is in
diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py
index 0add61c5..c99d2840 100644
--- a/ansible_mitogen/services.py
+++ b/ansible_mitogen/services.py
@@ -38,15 +38,11 @@ when a child has completed a job.
 """
 
 from __future__ import absolute_import
-import grp
 import logging
 import os
 import os.path
-import pwd
-import stat
 import sys
 import threading
-import zlib
 
 import mitogen
 import mitogen.service
@@ -367,252 +363,6 @@ class ContextService(mitogen.service.Service):
 
         return result
 
-
-class StreamState(object):
-    def __init__(self):
-        #: List of [(Sender, file object)]
-        self.jobs = []
-        self.completing = {}
-        #: In-flight byte count.
-        self.unacked = 0
-        #: Lock.
-        self.lock = threading.Lock()
-
-
-class FileService(mitogen.service.Service):
-    """
-    Streaming file server, used to serve small files like Ansible modules and
-    huge files like ISO images. Paths must be registered by a trusted context
-    before they will be served to a child.
-
-    Transfers are divided among the physical streams that connect external
-    contexts, ensuring each stream never has excessive data buffered in RAM,
-    while still maintaining enough to fully utilize available bandwidth. This
-    is achieved by making an initial bandwidth assumption, enqueueing enough
-    chunks to fill that assumed pipe, then responding to delivery
-    acknowledgements from the receiver by scheduling new chunks.
-
-    Transfers proceed one-at-a-time per stream. When multiple contexts exist on
-    a stream (e.g. one is the SSH account, another is a sudo account, and a
-    third is a proxied SSH connection), each request is satisfied in turn
-    before subsequent requests start flowing. This ensures when a stream is
-    contended, priority is given to completing individual transfers rather than
-    potentially aborting many partial transfers, causing the bandwidth to be
-    wasted.
-
-    Theory of operation:
-        1. Trusted context (i.e. WorkerProcess) calls register(), making a
-           file available to any untrusted context.
-        2. Requestee context creates a mitogen.core.Receiver() to receive
-           chunks, then calls fetch(path, recv.to_sender()), to set up the
-           transfer.
-        3. fetch() replies to the call with the file's metadata, then
-           schedules an initial burst up to the window size limit (1MiB).
-        4. Chunks begin to arrive in the requestee, which calls acknowledge()
-           for each 128KiB received.
-        5. The acknowledge() call arrives at FileService, which scheduled a new
-           chunk to refill the drained window back to the size limit.
-        6. When the last chunk has been pumped for a single transfer,
-           Sender.close() is called causing the receive loop in
-           target.py::_get_file() to exit, allowing that code to compare the
-           transferred size with the total file size from the metadata.
-        7. If the sizes mismatch, _get_file()'s caller is informed which will
-           discard the result and log/raise an error.
-
-    Shutdown:
-        1. process.py calls service.Pool.shutdown(), which arranges for the
-           service pool threads to exit and be joined, guranteeing no new
-           requests can arrive, before calling Service.on_shutdown() for each
-           registered service.
-        2. FileService.on_shutdown() walks every in-progress transfer and calls
-           Sender.close(), causing Receiver loops in the requestees to exit
-           early. The size check fails and any partially downloaded file is
-           discarded.
-        3. Control exits _get_file() in every target, and graceful shutdown can
-           proceed normally, without the associated thread needing to be
-           forcefully killed.
-    """
-    unregistered_msg = 'Path is not registered with FileService.'
-    context_mismatch_msg = 'sender= kwarg context must match requestee context'
-
-    #: Burst size. With 1MiB and 10ms RTT max throughput is 100MiB/sec, which
-    #: is 5x what SSH can handle on a 2011 era 2.4Ghz Core i5.
-    window_size_bytes = 1048576
-
-    def __init__(self, router):
-        super(FileService, self).__init__(router)
-        #: Mapping of registered path -> file size.
-        self._metadata_by_path = {}
-        #: Mapping of Stream->StreamState.
-        self._state_by_stream = {}
-
-    def _name_or_none(self, func, n, attr):
-        try:
-            return getattr(func(n), attr)
-        except KeyError:
-            return None
-
-    @mitogen.service.expose(policy=mitogen.service.AllowParents())
-    @mitogen.service.arg_spec({
-        'paths': list
-    })
-    def register_many(self, paths):
-        """
-        Batch version of register().
-        """
-        for path in paths:
-            self.register(path)
-
-    @mitogen.service.expose(policy=mitogen.service.AllowParents())
-    @mitogen.service.arg_spec({
-        'path': basestring
-    })
-    def register(self, path):
-        """
-        Authorize a path for access by children. Repeat calls with the same
-        path is harmless.
-
-        :param str path:
-            File path.
-        """
-        if path in self._metadata_by_path:
-            return
-
-        st = os.stat(path)
-        if not stat.S_ISREG(st.st_mode):
-            raise IOError('%r is not a regular file.' % (in_path,))
-
-        LOG.debug('%r: registering %r', self, path)
-        self._metadata_by_path[path] = {
-            'size': st.st_size,
-            'mode': st.st_mode,
-            'owner': self._name_or_none(pwd.getpwuid, 0, 'pw_name'),
-            'group': self._name_or_none(grp.getgrgid, 0, 'gr_name'),
-            'mtime': st.st_mtime,
-            'atime': st.st_atime,
-        }
-
-    def on_shutdown(self):
-        """
-        Respond to shutdown by sending close() to every target, allowing their
-        receive loop to exit and clean up gracefully.
-        """
-        LOG.debug('%r.on_shutdown()', self)
-        for stream, state in self._state_by_stream.items():
-            state.lock.acquire()
-            try:
-                for sender, fp in reversed(state.jobs):
-                    sender.close()
-                    fp.close()
-                    state.jobs.pop()
-            finally:
-                state.lock.release()
-
-    # The IO loop pumps 128KiB chunks. An ideal message is a multiple of this,
-    # odd-sized messages waste one tiny write() per message on the trailer.
-    # Therefore subtract 10 bytes pickle overhead + 24 bytes header.
-    IO_SIZE = mitogen.core.CHUNK_SIZE - (mitogen.core.Stream.HEADER_LEN + (
-        len(
-            mitogen.core.Message.pickled(
-                mitogen.core.Blob(' ' * mitogen.core.CHUNK_SIZE)
-            ).data
-        ) - mitogen.core.CHUNK_SIZE
-    ))
-
-    def _schedule_pending_unlocked(self, state):
-        """
-        Consider the pending transfers for a stream, pumping new chunks while
-        the unacknowledged byte count is below :attr:`window_size_bytes`. Must
-        be called with the StreamState lock held.
-
-        :param StreamState state:
-            Stream to schedule chunks for.
-        """
-        while state.jobs and state.unacked < self.window_size_bytes:
-            sender, fp = state.jobs[0]
-            s = fp.read(self.IO_SIZE)
-            if s:
-                state.unacked += len(s)
-                sender.send(mitogen.core.Blob(s))
-            else:
-                # File is done. Cause the target's receive loop to exit by
-                # closing the sender, close the file, and remove the job entry.
-                sender.close()
-                fp.close()
-                state.jobs.pop(0)
-
-    @mitogen.service.expose(policy=mitogen.service.AllowAny())
-    @mitogen.service.no_reply()
-    @mitogen.service.arg_spec({
-        'path': basestring,
-        'sender': mitogen.core.Sender,
-    })
-    def fetch(self, path, sender, msg):
-        """
-        Start a transfer for a registered path.
-
-        :param str path:
-            File path.
-        :param mitogen.core.Sender sender:
-            Sender to receive file data.
-        :returns:
-            Dict containing the file metadata:
-
-            * ``size``: File size in bytes.
-            * ``mode``: Integer file mode.
-            * ``owner``: Owner account name on host machine.
-            * ``group``: Owner group name on host machine.
-            * ``mtime``: Floating point modification time.
-            * ``ctime``: Floating point change time.
-        :raises Error:
-            Unregistered path, or Sender did not match requestee context.
-        """
-        if path not in self._metadata_by_path:
-            raise Error(self.unregistered_msg)
-        if msg.src_id != sender.context.context_id:
-            raise Error(self.context_mismatch_msg)
-
-        LOG.debug('Serving %r', path)
-        fp = open(path, 'rb', self.IO_SIZE)
-        # Response must arrive first so requestee can begin receive loop,
-        # otherwise first ack won't arrive until all pending chunks were
-        # delivered. In that case max BDP would always be 128KiB, aka. max
-        # ~10Mbit/sec over a 100ms link.
-        msg.reply(self._metadata_by_path[path])
-
-        stream = self.router.stream_by_id(sender.context.context_id)
-        state = self._state_by_stream.setdefault(stream, StreamState())
-        state.lock.acquire()
-        try:
-            state.jobs.append((sender, fp))
-            self._schedule_pending_unlocked(state)
-        finally:
-            state.lock.release()
-
-    @mitogen.service.expose(policy=mitogen.service.AllowAny())
-    @mitogen.service.no_reply()
-    @mitogen.service.arg_spec({
-        'size': int,
-    })
-    @mitogen.service.no_reply()
-    def acknowledge(self, size, msg):
-        """
-        Acknowledge bytes received by a transfer target, scheduling new chunks
-        to keep the window full. This should be called for every chunk received
-        by the target.
-        """
-        stream = self.router.stream_by_id(msg.src_id)
-        state = self._state_by_stream[stream]
-        state.lock.acquire()
-        try:
-            if state.unacked < size:
-                LOG.error('%r.acknowledge(src_id %d): unacked=%d < size %d',
-                          self, msg.src_id, state.unacked, size)
-            state.unacked -= min(state.unacked, size)
-            self._schedule_pending_unlocked(state)
-        finally:
-            state.lock.release()
-
-
 class ModuleDepService(mitogen.service.Service):
     """
     Scan a new-style module and produce a cached mapping of module_utils names
diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py
index 089e5167..7aaf50fb 100644
--- a/ansible_mitogen/target.py
+++ b/ansible_mitogen/target.py
@@ -89,7 +89,7 @@ def _get_file(context, path, out_fp):
     t0 = time.time()
     recv = mitogen.core.Receiver(router=context.router)
     metadata = context.call_service(
-        service_name='ansible_mitogen.services.FileService',
+        service_name='mitogen.service.FileService',
         method_name='fetch',
         path=path,
         sender=recv.to_sender(),
@@ -99,7 +99,7 @@ def _get_file(context, path, out_fp):
         s = chunk.unpickle()
         LOG.debug('_get_file(%r): received %d bytes', path, len(s))
         context.call_service_async(
-            service_name='ansible_mitogen.services.FileService',
+            service_name='mitogen.service.FileService',
             method_name='acknowledge',
             size=len(s),
         ).close()
diff --git a/mitogen/service.py b/mitogen/service.py
index 4d824f3d..842eac1e 100644
--- a/mitogen/service.py
+++ b/mitogen/service.py
@@ -26,7 +26,12 @@
 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 # POSSIBILITY OF SUCH DAMAGE.
 
+import grp
+import os
+import os.path
 import pprint
+import pwd
+import stat
 import sys
 import threading
 
@@ -489,3 +494,238 @@ class Pool(object):
             len(self._threads),
             th.name,
         )
+
+
+class FileStreamState(object):
+    def __init__(self):
+        #: List of [(Sender, file object)]
+        self.jobs = []
+        self.completing = {}
+        #: In-flight byte count.
+        self.unacked = 0
+        #: Lock.
+        self.lock = threading.Lock()
+
+
+class FileService(Service):
+    """
+    Streaming file server, used to serve small files and huge files alike.
+    Paths must be registered by a trusted context before they will be served to
+    a child.
+
+    Transfers are divided among the physical streams that connect external
+    contexts, ensuring each stream never has excessive data buffered in RAM,
+    while still maintaining enough to fully utilize available bandwidth. This
+    is achieved by making an initial bandwidth assumption, enqueueing enough
+    chunks to fill that assumed pipe, then responding to delivery
+    acknowledgements from the receiver by scheduling new chunks.
+
+    Transfers proceed one-at-a-time per stream. When multiple contexts exist on
+    a stream (e.g. one is the SSH account, another is a sudo account, and a
+    third is a proxied SSH connection), each request is satisfied in turn
+    before subsequent requests start flowing. This ensures when a stream is
+    contended, priority is given to completing individual transfers rather than
+    potentially aborting many partial transfers, causing the bandwidth to be
+    wasted.
+
+    Theory of operation:
+        1. Trusted context (i.e. WorkerProcess) calls register(), making a
+           file available to any untrusted context.
+        2. Requestee context creates a mitogen.core.Receiver() to receive
+           chunks, then calls fetch(path, recv.to_sender()), to set up the
+           transfer.
+        3. fetch() replies to the call with the file's metadata, then
+           schedules an initial burst up to the window size limit (1MiB).
+        4. Chunks begin to arrive in the requestee, which calls acknowledge()
+           for each 128KiB received.
+        5. The acknowledge() call arrives at FileService, which schedules a new
+           chunk to refill the drained window back to the size limit.
+        6. When the last chunk has been pumped for a single transfer,
+           Sender.close() is called causing the receive loop in
+           target.py::_get_file() to exit, allowing that code to compare the
+           transferred size with the total file size from the metadata.
+        7. If the sizes mismatch, _get_file()'s caller is informed which will
+           discard the result and log/raise an error.
+
+    Shutdown:
+        1. process.py calls service.Pool.shutdown(), which arranges for the
+           service pool threads to exit and be joined, guaranteeing no new
+           requests can arrive, before calling Service.on_shutdown() for each
+           registered service.
+        2. FileService.on_shutdown() walks every in-progress transfer and calls
+           Sender.close(), causing Receiver loops in the requestees to exit
+           early. The size check fails and any partially downloaded file is
+           discarded.
+        3. Control exits _get_file() in every target, and graceful shutdown can
+           proceed normally, without the associated thread needing to be
+           forcefully killed.
+    """
+    unregistered_msg = 'Path is not registered with FileService.'
+    context_mismatch_msg = 'sender= kwarg context must match requestee context'
+
+    #: Burst size. With 1MiB and 10ms RTT max throughput is 100MiB/sec, which
+    #: is 5x what SSH can handle on a 2011 era 2.4Ghz Core i5.
+    window_size_bytes = 1048576
+
+    def __init__(self, router):
+        super(FileService, self).__init__(router)
+        #: Mapping of registered path -> file size.
+        self._metadata_by_path = {}
+        #: Mapping of Stream->FileStreamState.
+        self._state_by_stream = {}
+
+    def _name_or_none(self, func, n, attr):
+        try:
+            return getattr(func(n), attr)
+        except KeyError:
+            return None
+
+    @expose(policy=AllowParents())
+    @arg_spec({
+        'path': basestring,
+    })
+    def register(self, path):
+        """
+        Authorize a path for access by children. Repeat calls with the same
+        path are harmless.
+
+        :param str path:
+            File path.
+        """
+        if path in self._metadata_by_path:
+            return
+
+        st = os.stat(path)
+        if not stat.S_ISREG(st.st_mode):
+            raise IOError('%r is not a regular file.' % (path,))
+
+        LOG.debug('%r: registering %r', self, path)
+        self._metadata_by_path[path] = {
+            'size': st.st_size,
+            'mode': st.st_mode,
+            'owner': self._name_or_none(pwd.getpwuid, 0, 'pw_name'),
+            'group': self._name_or_none(grp.getgrgid, 0, 'gr_name'),
+            'mtime': st.st_mtime,
+            'atime': st.st_atime,
+        }
+
+    def on_shutdown(self):
+        """
+        Respond to shutdown by sending close() to every target, allowing their
+        receive loop to exit and clean up gracefully.
+ """ + LOG.debug('%r.on_shutdown()', self) + for stream, state in self._state_by_stream.items(): + state.lock.acquire() + try: + for sender, fp in reversed(state.jobs): + sender.close() + fp.close() + state.jobs.pop() + finally: + state.lock.release() + + # The IO loop pumps 128KiB chunks. An ideal message is a multiple of this, + # odd-sized messages waste one tiny write() per message on the trailer. + # Therefore subtract 10 bytes pickle overhead + 24 bytes header. + IO_SIZE = mitogen.core.CHUNK_SIZE - (mitogen.core.Stream.HEADER_LEN + ( + len( + mitogen.core.Message.pickled( + mitogen.core.Blob(' ' * mitogen.core.CHUNK_SIZE) + ).data + ) - mitogen.core.CHUNK_SIZE + )) + + def _schedule_pending_unlocked(self, state): + """ + Consider the pending transfers for a stream, pumping new chunks while + the unacknowledged byte count is below :attr:`window_size_bytes`. Must + be called with the FileStreamState lock held. + + :param FileStreamState state: + Stream to schedule chunks for. + """ + while state.jobs and state.unacked < self.window_size_bytes: + sender, fp = state.jobs[0] + s = fp.read(self.IO_SIZE) + if s: + state.unacked += len(s) + sender.send(mitogen.core.Blob(s)) + else: + # File is done. Cause the target's receive loop to exit by + # closing the sender, close the file, and remove the job entry. + sender.close() + fp.close() + state.jobs.pop(0) + + @expose(policy=AllowAny()) + @no_reply() + @arg_spec({ + 'path': basestring, + 'sender': mitogen.core.Sender, + }) + def fetch(self, path, sender, msg): + """ + Start a transfer for a registered path. + + :param str path: + File path. + :param mitogen.core.Sender sender: + Sender to receive file data. + :returns: + Dict containing the file metadata: + + * ``size``: File size in bytes. + * ``mode``: Integer file mode. + * ``owner``: Owner account name on host machine. + * ``group``: Owner group name on host machine. + * ``mtime``: Floating point modification time. + * ``ctime``: Floating point change time. + :raises Error: + Unregistered path, or Sender did not match requestee context. + """ + if path not in self._metadata_by_path: + raise Error(self.unregistered_msg) + if msg.src_id != sender.context.context_id: + raise Error(self.context_mismatch_msg) + + LOG.debug('Serving %r', path) + fp = open(path, 'rb', self.IO_SIZE) + # Response must arrive first so requestee can begin receive loop, + # otherwise first ack won't arrive until all pending chunks were + # delivered. In that case max BDP would always be 128KiB, aka. max + # ~10Mbit/sec over a 100ms link. + msg.reply(self._metadata_by_path[path]) + + stream = self.router.stream_by_id(sender.context.context_id) + state = self._state_by_stream.setdefault(stream, FileStreamState()) + state.lock.acquire() + try: + state.jobs.append((sender, fp)) + self._schedule_pending_unlocked(state) + finally: + state.lock.release() + + @expose(policy=AllowAny()) + @no_reply() + @arg_spec({ + 'size': int, + }) + @no_reply() + def acknowledge(self, size, msg): + """ + Acknowledge bytes received by a transfer target, scheduling new chunks + to keep the window full. This should be called for every chunk received + by the target. + """ + stream = self.router.stream_by_id(msg.src_id) + state = self._state_by_stream[stream] + state.lock.acquire() + try: + if state.unacked < size: + LOG.error('%r.acknowledge(src_id %d): unacked=%d < size %d', + self, msg.src_id, state.unacked, size) + state.unacked -= min(state.unacked, size) + self._schedule_pending_unlocked(state) + finally: + state.lock.release()