ansible: implement streaming in FileService.

This commit only uses it for the target.get_file() helper, which is only
used for transferring modules. The next commit wires it into the
Connection.transfer_file() API, which is the method the copy module
uses.
pull/209/head
David Wilson 6 years ago
parent 509ef14121
commit 29087018c7
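
For orientation, a condensed sketch of the streaming flow the diff below implements, using the names that appear in it; the direct register() call, the file_service variable, and the literal path are illustrative assumptions rather than code from the commit:

# Controller side: the path must be registered before the service will serve it.
file_service.register(path='/path/to/ansible_module.py')

# Target side: hand a Sender to fetch(), then drain chunks from the paired
# Receiver. fetch() returns the file size up front; iteration stops once the
# scheduler thread calls sender.close() after the final chunk.
recv = mitogen.core.Receiver(router=context.router)
size = mitogen.service.call(
    context=context,
    handle=ansible_mitogen.services.FileService.handle,
    method='fetch',
    kwargs={
        'path': '/path/to/ansible_module.py',
        'sender': recv.to_sender(),
    }
)
received = 0
for chunk in recv:
    received += len(chunk.unpickle())
assert received == size  # a short count means the controller shut down early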

@@ -359,9 +359,120 @@ class FileService(mitogen.service.Service):
max_message_size = 1000
unregistered_msg = 'Path is not registered with FileService.'
#: Maximum size of any stream's output queue before we temporarily stop
#: pumping more file chunks. The queue may overspill by up to
#: mitogen.core.CHUNK_SIZE-1 bytes (128KiB-1).
max_queue_size = 1048576
#: Time spent by the scheduler thread asleep when it has no more queues to
#: pump. With max_queue_size=1MiB and a sleep of 10ms, maximum throughput
#: on any single stream is 100MiB/sec, which is 5x what SSH can handle on
#: my laptop.
sleep_delay = 0.01
def __init__(self, router):
super(FileService, self).__init__(router)
self._paths = {}
#: Mapping of registered path -> file size.
self._size_by_path = {}
#: Queue used to communicate from service to scheduler thread.
self._queue = mitogen.core.Latch()
#: Mapping of Stream->[(sender, fp)].
self._pending_by_stream = {}
self._thread = threading.Thread(target=self._scheduler_main)
self._thread.start()
def _pending_bytes(self, stream):
"""
Defer a function call to the Broker thread in order to accurately
measure the bytes pending in `stream`'s queue.
This must be synchronized with the Broker, as scheduler
uncertainty could cause Sender.send()'s deferred enqueues to be
processed very late, making the output queue look much emptier than it
really is (or is about to become).
"""
latch = mitogen.core.Latch()
self.router.broker.defer(lambda: latch.put(stream.pending_bytes()))
return latch.get()
def _schedule_pending(self, stream, pending):
"""
Consider the pending file transfers for a single stream, pumping new
file chunks into the stream's queue while its size is below the
configured limit.
:param mitogen.core.Stream stream:
Stream to pump chunks for.
:param pending:
Corresponding list from :attr:`_pending_by_stream`.
"""
while pending and self._pending_bytes(stream) < self.max_queue_size:
sender, fp = pending[0]
s = fp.read(mitogen.core.CHUNK_SIZE)
if s:
sender.send(s)
continue
# Empty read, indicating this file is fully transferred. Mark the
# sender closed (causing the corresponding Receiver loop in the
# target to exit), close the file handle, remove our entry from the
# pending list, and delete the stream's entry in the pending map if
# no more sends remain.
sender.close()
fp.close()
pending.pop(0)
if not pending:
del self._pending_by_stream[stream]
def _sleep_on_queue(self):
"""
Sleep indefinitely (no active transfers) or for :attr:`sleep_delay`
(active transfers) waiting for a new transfer request to arrive from
the :meth:`fetch` method.
If a new request arrives, add it to the appropriate list in
:attr:`_pending_by_stream`.
:returns:
:data:`True` if the scheduler's queue is still open, i.e.
:meth:`on_shutdown` has not been called yet, otherwise
:data:`False`.
"""
try:
if self._pending_by_stream:
timeout = self.sleep_delay
else:
timeout = None
sender, fp = self._queue.get(timeout=timeout)
except mitogen.core.LatchError:
return False
except mitogen.core.TimeoutError:
return True
LOG.debug('%r._sleep_on_queue(): setting up %r for %r',
self, fp.name, sender)
stream = self.router.stream_by_id(sender.context.context_id)
pending = self._pending_by_stream.setdefault(stream, [])
pending.append((sender, fp))
return True
def _scheduler_main(self):
"""
Scheduler thread's main function. Sleep until
:meth:`_sleep_on_queue` indicates the queue has been shut down,
pumping pending file chunks each time we wake.
"""
while self._sleep_on_queue():
for stream, pending in list(self._pending_by_stream.items()):
self._schedule_pending(stream, pending)
# on_shutdown() has been called. Send close() on every sender to give
# targets a chance to shut down gracefully.
LOG.debug('%r._scheduler_main() shutting down', self)
for _, pending in self._pending_by_stream.items():
for sender, fp in pending:
sender.close()
fp.close()
@mitogen.service.expose(policy=mitogen.service.AllowParents())
@mitogen.service.arg_spec({
@@ -375,30 +486,35 @@ class FileService(mitogen.service.Service):
:param str path:
File path.
"""
if path not in self._paths:
if path not in self._size_by_path:
LOG.debug('%r: registering %r', self, path)
with open(path, 'rb') as fp:
self._paths[path] = zlib.compress(fp.read())
self._size_by_path[path] = os.path.getsize(path)
@mitogen.service.expose(policy=mitogen.service.AllowAny())
@mitogen.service.arg_spec({
'path': basestring
'path': basestring,
'sender': mitogen.core.Sender,
})
def fetch(self, path):
def fetch(self, path, sender):
"""
Fetch a file's data.
:param str path:
File path.
:param mitogen.core.Sender sender:
Sender to receive file data.
:returns:
The file data.
File size. The target can decide whether to keep the file in RAM or
on disk based on the return value.
:raises mitogen.core.CallError:
The path was not registered.
"""
if path not in self._paths:
if path not in self._size_by_path:
raise mitogen.core.CallError(self.unregistered_msg)
LOG.debug('Serving %r', path)
return self._paths[path]
self._queue.put((
sender,
open(path, 'rb', mitogen.core.CHUNK_SIZE),
))
return self._size_by_path[path]
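
The max_queue_size and sleep_delay comments above imply a per-stream throughput ceiling; as a back-of-the-envelope check of the 100MiB/sec figure (plain arithmetic, not code from the commit):

# Each wake-up can refill a stream's output queue to at most max_queue_size,
# so the per-stream ceiling is roughly max_queue_size bytes per sleep_delay.
max_queue_size = 1048576   # 1 MiB
sleep_delay = 0.01         # 10 ms
print(max_queue_size / sleep_delay / 1048576)  # -> 100.0 (MiB/sec)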

@@ -32,6 +32,7 @@ for file transfer, module execution and sundry bits like changing file modes.
"""
from __future__ import absolute_import
import cStringIO
import json
import logging
import operator
@@ -64,6 +65,34 @@ _file_cache = {}
_fork_parent = None
def _get_file(context, path, out_fp):
LOG.debug('get_file(): fetching %r from %r', path, context)
recv = mitogen.core.Receiver(router=context.router)
size = mitogen.service.call(
context=context,
handle=ansible_mitogen.services.FileService.handle,
method='fetch',
kwargs={
'path': path,
'sender': recv.to_sender()
}
)
for chunk in recv:
s = chunk.unpickle()
LOG.debug('_get_file(%r): received %d bytes', path, len(s))
out_fp.write(s)
if out_fp.tell() != size:
LOG.error('get_file(%r): receiver was closed early, controller '
'is likely shutting down. Truncating output file.', path)
out_fp.truncate(0)
LOG.debug('target.get_file(): fetched %d bytes of %r from %r',
size, path, context)
return size
def get_file(context, path):
"""
Basic in-memory caching module fetcher. This generates one roundtrip for
@@ -79,19 +108,9 @@ def get_file(context, path):
Bytestring file data.
"""
if path not in _file_cache:
LOG.debug('target.get_file(): fetching %r from %r', path, context)
_file_cache[path] = zlib.decompress(
mitogen.service.call(
context=context,
handle=ansible_mitogen.services.FileService.handle,
method='fetch',
kwargs={
'path': path
}
)
)
LOG.debug('target.get_file(): fetched %r from %r', path, context)
io = cStringIO.StringIO()
_get_file(context, path, io)
_file_cache[path] = io.getvalue()
return _file_cache[path]
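
Because _get_file() writes to any file-like object, the same helper can stream a registered file straight to disk instead of RAM, which is the behaviour the follow-up Connection.transfer_file() change mentioned in the commit message builds on. A sketch of that use; the destination and source paths here are made up for illustration:

# Stream a previously registered file directly to disk, bypassing the
# in-memory cache kept by get_file().
with open('/tmp/destination.bin', 'wb') as out_fp:
    size = _get_file(context, '/path/to/registered/file', out_fp)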
