Merge pull request #209 from dw/dmw

Streaming file transfer :D
dw committed 7 years ago (via GitHub)
commit 9ec20086c2

ansible_mitogen/connection.py

@@ -41,7 +41,7 @@ import mitogen.utils
 import ansible_mitogen.target
 import ansible_mitogen.process
-from ansible_mitogen.services import ContextService
+import ansible_mitogen.services

 LOG = logging.getLogger(__name__)
@@ -160,7 +160,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
     def _wrap_connect(self, on_error, kwargs):
         dct = mitogen.service.call(
             context=self.parent,
-            handle=ContextService.handle,
+            handle=ansible_mitogen.services.ContextService.handle,
             method='get',
             kwargs=mitogen.utils.cast(kwargs),
         )
@@ -300,7 +300,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
         if context:
             mitogen.service.call(
                 context=self.parent,
-                handle=ContextService.handle,
+                handle=ansible_mitogen.services.ContextService.handle,
                 method='put',
                 kwargs={
                     'context': context
@@ -398,12 +398,25 @@ class Connection(ansible.plugins.connection.ConnectionBase):
     def put_file(self, in_path, out_path):
         """
-        Implement put_file() by caling the corresponding
-        ansible_mitogen.target function in the target.
+        Implement put_file() by streamily transferring the file via
+        FileService.

         :param str in_path:
             Local filesystem path to read.
         :param str out_path:
             Remote filesystem path to write.
         """
-        self.put_data(out_path, ansible_mitogen.target.read_path(in_path))
+        mitogen.service.call(
+            context=self.parent,
+            handle=ansible_mitogen.services.FileService.handle,
+            method='register',
+            kwargs={
+                'path': mitogen.utils.cast(in_path)
+            }
+        )
+        self.call(
+            ansible_mitogen.target.transfer_file,
+            context=self.parent,
+            in_path=in_path,
+            out_path=out_path
+        )

ansible_mitogen/services.py

@@ -359,9 +359,120 @@ class FileService(mitogen.service.Service):
     max_message_size = 1000
     unregistered_msg = 'Path is not registered with FileService.'

+    #: Maximum size of any stream's output queue before we temporarily stop
+    #: pumping more file chunks. The queue may overspill by up to
+    #: mitogen.core.CHUNK_SIZE-1 bytes (128KiB-1).
+    max_queue_size = 1048576
+
+    #: Time (in seconds) spent by the scheduler thread asleep when it has no
+    #: more queues to pump. With max_queue_size=1MiB and a sleep of 10ms,
+    #: maximum throughput on any single stream is 100MiB/sec, which is 5x
+    #: what SSH can handle on my laptop.
+    sleep_delay_ms = 0.01
+
     def __init__(self, router):
         super(FileService, self).__init__(router)
-        self._paths = {}
+        #: Mapping of registered path -> file size.
+        self._size_by_path = {}
+        #: Queue used to communicate from service to scheduler thread.
+        self._queue = mitogen.core.Latch()
+        #: Mapping of Stream -> [(sender, fp)].
+        self._pending_by_stream = {}
+        self._thread = threading.Thread(target=self._scheduler_main)
+        self._thread.start()
+
+    def _pending_bytes(self, stream):
+        """
+        Defer a function call to the broker thread in order to accurately
+        measure the bytes pending in `stream`'s output queue.
+
+        This must be done synchronized with the broker, as scheduling
+        uncertainty could cause Sender.send()'s deferred enqueues to be
+        processed very late, making the output queue look much emptier than
+        it really is (or is about to become).
+        """
+        latch = mitogen.core.Latch()
+        self.router.broker.defer(lambda: latch.put(stream.pending_bytes()))
+        return latch.get()
+
+    def _schedule_pending(self, stream, pending):
+        """
+        Consider the pending file transfers for a single stream, pumping new
+        file chunks into the stream's queue while its size remains below the
+        configured limit.
+
+        :param mitogen.core.Stream stream:
+            Stream to pump chunks for.
+        :param list pending:
+            Corresponding list from :attr:`_pending_by_stream`.
+        """
+        while pending and self._pending_bytes(stream) < self.max_queue_size:
+            sender, fp = pending[0]
+            s = fp.read(mitogen.core.CHUNK_SIZE)
+            if s:
+                sender.send(s)
+                continue
+
+            # Empty read, indicating this file is fully transferred. Mark the
+            # sender closed (causing the corresponding Receiver loop in the
+            # target to exit), close the file handle, remove our entry from
+            # the pending list, and delete the stream's entry in the pending
+            # map if no more sends remain.
+            sender.close()
+            fp.close()
+            pending.pop(0)
+            if not pending:
+                del self._pending_by_stream[stream]
+
+    def _sleep_on_queue(self):
+        """
+        Sleep indefinitely (no active transfers) or for :attr:`sleep_delay_ms`
+        (active transfers) waiting for a new transfer request to arrive from
+        the :meth:`fetch` method.
+
+        If a new request arrives, add it to the appropriate list in
+        :attr:`_pending_by_stream`.
+
+        :returns:
+            :data:`True` if the scheduler's queue is still open, i.e.
+            :meth:`on_shutdown` hasn't been called yet, otherwise
+            :data:`False`.
+        """
+        try:
+            if self._pending_by_stream:
+                timeout = self.sleep_delay_ms
+            else:
+                timeout = None
+            sender, fp = self._queue.get(timeout=timeout)
+        except mitogen.core.LatchError:
+            return False
+        except mitogen.core.TimeoutError:
+            return True
+
+        LOG.debug('%r._sleep_on_queue(): setting up %r for %r',
+                  self, fp.name, sender)
+        stream = self.router.stream_by_id(sender.context.context_id)
+        pending = self._pending_by_stream.setdefault(stream, [])
+        pending.append((sender, fp))
+        return True
+
+    def _scheduler_main(self):
+        """
+        Scheduler thread's main function. Sleep until :meth:`_sleep_on_queue`
+        indicates the queue has been shut down, pumping pending file chunks
+        each time we wake.
+        """
+        while self._sleep_on_queue():
+            for stream, pending in list(self._pending_by_stream.items()):
+                self._schedule_pending(stream, pending)
+
+        # on_shutdown() has been called. Send close() on every sender to give
+        # targets a chance to shut down gracefully.
+        LOG.debug('%r._scheduler_main() shutting down', self)
+        for _, pending in self._pending_by_stream.items():
+            for sender, fp in pending:
+                sender.close()
+                fp.close()
+
     @mitogen.service.expose(policy=mitogen.service.AllowParents())
     @mitogen.service.arg_spec({
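Taken together, `max_queue_size` and `sleep_delay_ms` form a simple credit scheme: each wakeup tops every active stream's queue back up to at most 1MiB of chunks, so the per-stream throughput ceiling is `max_queue_size` divided by the sleep interval. A back-of-envelope check of the figure quoted in the comment above (plain Python, illustrative only):

```python
# The scheduler can enqueue at most max_queue_size bytes per sleep interval.
max_queue_size = 1048576   # 1 MiB, as configured on FileService
sleep_interval = 0.01      # seconds (the value held by sleep_delay_ms)

ceiling = max_queue_size / sleep_interval        # bytes per second
print('%.0f MiB/sec' % (ceiling / 1048576.0,))   # -> 100 MiB/sec
```

The queue may also momentarily exceed the limit by up to CHUNK_SIZE-1 bytes, since the size test in `_schedule_pending()` happens before each read of up to CHUNK_SIZE bytes.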
@@ -375,30 +486,35 @@ class FileService(mitogen.service.Service):
         :param str path:
             File path.
         """
-        if path not in self._paths:
+        if path not in self._size_by_path:
             LOG.debug('%r: registering %r', self, path)
-            with open(path, 'rb') as fp:
-                self._paths[path] = zlib.compress(fp.read())
+            self._size_by_path[path] = os.path.getsize(path)

     @mitogen.service.expose(policy=mitogen.service.AllowAny())
     @mitogen.service.arg_spec({
-        'path': basestring
+        'path': basestring,
+        'sender': mitogen.core.Sender,
     })
-    def fetch(self, path):
+    def fetch(self, path, sender):
         """
         Fetch a file's data.

         :param str path:
             File path.
+        :param mitogen.core.Sender sender:
+            Sender to receive file data.
         :returns:
-            The file data.
+            File size. The target can decide whether to keep the file in RAM
+            or on disk based on the return value.
         :raises mitogen.core.CallError:
             The path was not registered.
         """
-        if path not in self._paths:
+        if path not in self._size_by_path:
             raise mitogen.core.CallError(self.unregistered_msg)

         LOG.debug('Serving %r', path)
-        return self._paths[path]
+        self._queue.put((
+            sender,
+            open(path, 'rb', mitogen.core.CHUNK_SIZE),
+        ))
+        return self._size_by_path[path]

ansible_mitogen/target.py

@@ -32,6 +32,7 @@ for file transfer, module execution and sundry bits like changing file modes.
 """

 from __future__ import absolute_import
+import cStringIO
 import json
 import logging
 import operator
@@ -64,6 +65,48 @@ _file_cache = {}
 _fork_parent = None


+def _get_file(context, path, out_fp):
+    """
+    Streamily download a file from the connection multiplexer process in the
+    controller.
+
+    :param mitogen.core.Context context:
+        Reference to the context hosting the FileService that will be used
+        to fetch the file.
+    :param bytes path:
+        FileService registered name of the input file.
+    :param file out_fp:
+        File-like object to which the file's bytes are written.
+    :returns:
+        :data:`True` on success, or :data:`False` if the transfer was
+        interrupted and the output should be discarded.
+    """
+    LOG.debug('_get_file(): fetching %r from %r', path, context)
+    recv = mitogen.core.Receiver(router=context.router)
+    size = mitogen.service.call(
+        context=context,
+        handle=ansible_mitogen.services.FileService.handle,
+        method='fetch',
+        kwargs={
+            'path': path,
+            'sender': recv.to_sender()
+        }
+    )
+
+    for chunk in recv:
+        s = chunk.unpickle()
+        LOG.debug('_get_file(%r): received %d bytes', path, len(s))
+        out_fp.write(s)
+
+    if out_fp.tell() != size:
+        LOG.error('_get_file(%r): receiver was closed early, controller '
+                  'is likely shutting down.', path)
+
+    LOG.debug('_get_file(): fetched %d bytes of %r from %r',
+              size, path, context)
+    return out_fp.tell() == size


 def get_file(context, path):
     """
     Basic in-memory caching module fetcher. This generates one roundtrip for
@@ -79,22 +122,40 @@ def get_file(context, path):
         Bytestring file data.
     """
     if path not in _file_cache:
-        LOG.debug('target.get_file(): fetching %r from %r', path, context)
-        _file_cache[path] = zlib.decompress(
-            mitogen.service.call(
-                context=context,
-                handle=ansible_mitogen.services.FileService.handle,
-                method='fetch',
-                kwargs={
-                    'path': path
-                }
-            )
-        )
-        LOG.debug('target.get_file(): fetched %r from %r', path, context)
+        io = cStringIO.StringIO()
+        if not _get_file(context, path, io):
+            raise IOError('transfer of %r was interrupted.' % (path,))
+        _file_cache[path] = io.getvalue()

     return _file_cache[path]


+def transfer_file(context, in_path, out_path):
+    """
+    Streamily download a file from the connection multiplexer process in the
+    controller, writing it to `out_path` on the local disk.
+
+    :param mitogen.core.Context context:
+        Reference to the context hosting the FileService that will be used
+        to fetch the file.
+    :param bytes in_path:
+        FileService registered name of the input file.
+    :param bytes out_path:
+        Name of the output path on the local disk.
+    """
+    fp = open(out_path + '.tmp', 'wb', mitogen.core.CHUNK_SIZE)
+    try:
+        try:
+            if not _get_file(context, in_path, fp):
+                raise IOError('transfer of %r was interrupted.' % (in_path,))
+        except Exception:
+            os.unlink(fp.name)
+            raise
+    finally:
+        fp.close()
+
+    os.rename(out_path + '.tmp', out_path)


 @mitogen.core.takes_econtext
 def start_fork_parent(econtext):
     """

docs/ansible.rst

@@ -5,15 +5,15 @@ Ansible Extension
 .. image:: images/ansible/cell_division.png
     :align: right

-An experimental extension to `Ansible`_ is included that implements host
-connections over Mitogen, replacing embedded shell invocations with pure-Python
-equivalents invoked via highly efficient remote procedure calls tunnelled over
-SSH. No changes are required to the target hosts.
+An extension to `Ansible`_ is included that implements host connections over
+Mitogen, replacing embedded shell invocations with pure-Python equivalents
+invoked via highly efficient remote procedure calls tunnelled over SSH. No
+changes are required to the target hosts.

-The extension isn't nearly in a generally dependable state yet, however it
-already works well enough for testing against real-world playbooks. `Bug
-reports`_ in this area are very welcome; Ansible is a huge beast, and only
-significant testing will prove the extension's soundness.
+The extension is approaching a generally dependable state, and works well for
+many real-world playbooks. `Bug reports`_ in this area are very welcome;
+Ansible is a huge beast, and only significant testing will prove the
+extension's soundness.

 Divergence from Ansible's normal behaviour is considered a bug, so please
 report anything you notice, regardless of how inconsequential it may seem.
@@ -98,8 +98,7 @@ Installation
 .. caution::

-    Thoroughly review the list of limitations before use, and **do not test
-    the prototype in a live environment until this notice is removed**.
+    Please review the behavioural differences documented below prior to use.

 1. Verify Ansible 2.4 and Python 2.7 are listed in the output of ``ansible
    --version``
@@ -123,22 +122,6 @@ Installation
 Limitations
 -----------

-This is a proof of concept: issues below are exclusively due to code
-immaturity.
-
-High Risk
-~~~~~~~~~
-
-* Transfer of large files using certain Ansible-internal APIs, such as
-  triggered via the ``copy`` module, will cause corresponding memory and CPU
-  spikes on both host and target machine, due to delivering the file as a
-  single message. If many machines are targetted, the controller could easily
-  exhaust available RAM. This will be fixed soon as it's likely to be tickled
-  by common playbooks.
-
-Low Risk
-~~~~~~~~
-
 * Only Ansible 2.4 is being used for development, with occasional tests under
   2.5, 2.3 and 2.2. It should be more than possible to fully support at least
   2.3, if not also 2.2.

docs/api.rst

@@ -204,6 +204,25 @@ Stream Classes
 .. autoclass:: Stream
    :members:

+.. method:: pending_bytes ()
+
+    Returns the number of bytes queued for transmission on this stream.
+    This can be used to limit the amount of data buffered in RAM by an
+    otherwise unlimited consumer.
+
+    For an accurate result, this method should be called from the Broker
+    thread, using a wrapper like:
+
+    ::
+
+        def get_pending_bytes(self, stream):
+            latch = mitogen.core.Latch()
+            self.broker.defer(
+                lambda: latch.put(stream.pending_bytes())
+            )
+            return latch.get()
+
 .. currentmodule:: mitogen.fork
 .. autoclass:: Stream

mitogen/core.py

@@ -795,8 +795,9 @@ class Stream(BasicStream):
         self.sent_modules = set()
         self.construct(**kwargs)
         self._input_buf = collections.deque()
-        self._input_buf_len = 0
         self._output_buf = collections.deque()
+        self._input_buf_len = 0
+        self._output_buf_len = 0

     def construct(self):
         pass
@@ -866,6 +867,9 @@ class Stream(BasicStream):
             self._router._async_route(msg, self)
         return True

+    def pending_bytes(self):
+        return self._output_buf_len
+
     def on_transmit(self, broker):
         """Transmit buffered messages."""
         _vv and IOLOG.debug('%r.on_transmit()', self)
@@ -881,6 +885,7 @@ class Stream(BasicStream):
                 self._output_buf.appendleft(buffer(buf, written))

             _vv and IOLOG.debug('%r.on_transmit() -> len %d', self, written)
+            self._output_buf_len -= written

         if not self._output_buf:
             broker._stop_transmit(self)
@@ -890,10 +895,10 @@ class Stream(BasicStream):
         pkt = struct.pack(self.HEADER_FMT, msg.dst_id, msg.src_id,
                           msg.auth_id, msg.handle, msg.reply_to or 0,
                           len(msg.data)) + msg.data
-        was_transmitting = len(self._output_buf)
-        self._output_buf.append(pkt)
-        if not was_transmitting:
+        if not self._output_buf_len:
             self._router.broker._start_transmit(self)
+        self._output_buf.append(pkt)
+        self._output_buf_len += len(pkt)

     def send(self, msg):
         """Send `data` to `handle`, and tell the broker we have output. May

mitogen/service.py

@@ -143,6 +143,11 @@ class Service(object):
             self.__class__.__name__,
         )

+    def on_shutdown(self):
+        """
+        Called by Pool.shutdown() once the last worker thread has exited.
+        """
+
     def dispatch(self, args, msg):
         raise NotImplementedError()
@@ -307,6 +312,8 @@ class Pool(object):
         self._select.close()
         for th in self._threads:
             th.join()
+        for service in self.services:
+            service.on_shutdown()

     def _worker_run(self):
         while True:
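One piece this excerpt doesn't show is how FileService's scheduler thread learns about shutdown: `_sleep_on_queue()` exits when its blocking `Latch.get()` raises `mitogen.core.LatchError`, which happens once the latch is closed. Presumably the service's `on_shutdown()` override closes `_queue`; a hypothetical sketch, assuming `Latch.close()` behaves as described:

```python
    # Hypothetical FileService override (not shown in this excerpt): closing
    # the Latch causes the scheduler thread's blocking self._queue.get() to
    # raise mitogen.core.LatchError, so _sleep_on_queue() returns False and
    # _scheduler_main() falls through to its sender/file cleanup loop.
    def on_shutdown(self):
        self._queue.close()
```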
