issue #212: support explicit acknowledgements in FileService.

pull/237/head
David Wilson 7 years ago
parent 5e6e56f0c5
commit 69e5902e61

@ -354,6 +354,17 @@ class ContextService(mitogen.service.Service):
return result return result
class StreamState(object):
def __init__(self):
#: List of [(Sender, file object)]
self.jobs = []
self.completing = {}
#: In-flight byte count.
self.unacked = 0
#: Lock.
self.lock = threading.Lock()
class FileService(mitogen.service.Service): class FileService(mitogen.service.Service):
""" """
Streaming file server, used to serve both small files like Ansible module Streaming file server, used to serve both small files like Ansible module
@ -420,131 +431,20 @@ class FileService(mitogen.service.Service):
handle = 501 handle = 501
max_message_size = 1000 max_message_size = 1000
unregistered_msg = 'Path is not registered with FileService.' unregistered_msg = 'Path is not registered with FileService.'
context_mismatch_msg = 'sender= kwarg context must match requestee context'
#: Maximum size of any stream's output queue before we temporarily stop #: Maximum size of any stream's output queue before we stop pumping more
#: pumping more file chunks on that stream. The queue may overspill by up #: file chunks. The queue may overspill by up to mitogen.core.CHUNK_SIZE-1
#: to mitogen.core.CHUNK_SIZE-1 bytes (128KiB-1). #: bytes (128KiB-1). With max_queue_size=1MiB and a RTT of 10ms, maximum
#: throughput is 112MiB/sec, which is >5x what SSH can handle on my laptop.
max_queue_size = 1048576 max_queue_size = 1048576
#: Time spent by the scheduler thread asleep when it has no more data to
#: pump, but while at least one transfer remains active. With
#: max_queue_size=1MiB and a sleep of 10ms, maximum throughput on any
#: single stream is 112MiB/sec, which is >5x what SSH can handle on my
#: laptop.
sleep_delay_secs = 0.01
def __init__(self, router): def __init__(self, router):
super(FileService, self).__init__(router) super(FileService, self).__init__(router)
#: Mapping of registered path -> file size. #: Mapping of registered path -> file size.
self._metadata_by_path = {} self._metadata_by_path = {}
#: Queue used to communicate from service to scheduler thread. #: Mapping of Stream->StreamState.
self._queue = mitogen.core.Latch() self._state_by_stream = {}
#: Mapping of Stream->[(Sender, file object)].
self._pending_by_stream = {}
self._thread = threading.Thread(target=self._scheduler_main)
self._thread.start()
def on_shutdown(self):
"""
Respond to shutdown of the service pool by marking our queue closed.
This causes :meth:`_sleep_on_queue` to wake immediately and return
:data:`False`, causing the scheduler main thread to exit.
"""
self._queue.close()
def _pending_bytes(self, stream):
"""
Defer a function call to the Broker thread in order to accurately
measure the bytes pending in `stream`'s queue.
This must be done synchronized with the Broker, as OS scheduler
uncertainty could cause Sender.send()'s deferred enqueues to be
processed very late, making the output queue look much emptier than it
really is (or is about to become).
"""
latch = mitogen.core.Latch()
self.router.broker.defer(lambda: latch.put(stream.pending_bytes()))
return latch.get()
def _schedule_pending(self, stream, pending):
"""
Consider the pending file transfers for a single stream, pumping new
file chunks into the stream's queue while its size is below the
configured limit.
:param mitogen.core.Stream stream:
Stream to pump chunks for.
:param pending:
Corresponding list from :attr:`_pending_by_stream`.
"""
while pending and self._pending_bytes(stream) < self.max_queue_size:
sender, fp = pending[0]
s = fp.read(mitogen.core.CHUNK_SIZE)
if s:
sender.send(s)
continue
# Empty read, indicating this file is fully transferred. Mark the
# sender closed (causing the corresponding Receiver loop in the
# target to exit), close the file handle, remove our entry from the
# pending list, and delete the stream's entry in the pending map if
# no more sends remain.
sender.close()
fp.close()
pending.pop(0)
if not pending:
del self._pending_by_stream[stream]
def _sleep_on_queue(self):
"""
Sleep indefinitely (no active transfers) or for
:attr:`sleep_delay_secs` (active transfers) waiting for a new transfer
request to arrive from the :meth:`fetch` method.
If a new request arrives, add it to the appropriate list in
:attr:`_pending_by_stream`.
:returns:
:data:`True` the scheduler's queue is still open,
:meth:`on_shutdown` hasn't been called yet, otherwise
:data:`False`.
"""
if self._pending_by_stream:
timeout = self.sleep_delay_secs
else:
timeout = None
try:
sender, fp = self._queue.get(timeout=timeout)
except mitogen.core.LatchError:
return False
except mitogen.core.TimeoutError:
return True
LOG.debug('%r._sleep_on_queue(): setting up %r for %r',
self, fp.name, sender)
stream = self.router.stream_by_id(sender.context.context_id)
pending = self._pending_by_stream.setdefault(stream, [])
pending.append((sender, fp))
return True
def _scheduler_main(self):
"""
Scheduler thread's main function. Sleep until
:meth:`_sleep_on_queue` indicates the queue has been shut down,
pumping pending file chunks each time we wake.
"""
while self._sleep_on_queue():
for stream, pending in list(self._pending_by_stream.items()):
self._schedule_pending(stream, pending)
# on_shutdown() has been called. Send close() on every sender to give
# targets a chance to shut down gracefully.
LOG.debug('%r._scheduler_main() shutting down', self)
for _, pending in self._pending_by_stream.items():
for sender, fp in pending:
sender.close()
fp.close()
def _name_or_none(self, func, n, attr): def _name_or_none(self, func, n, attr):
try: try:
@ -558,8 +458,8 @@ class FileService(mitogen.service.Service):
}) })
def register(self, path): def register(self, path):
""" """
Authorize a path for access by child contexts. Calling this repeatedly Authorize a path for access by children. Repeat calls with the same
with the same path is harmless. path is harmless.
:param str path: :param str path:
File path. File path.
@ -581,12 +481,51 @@ class FileService(mitogen.service.Service):
'atime': st.st_atime, 'atime': st.st_atime,
} }
def on_shutdown(self):
"""
Respond to shutdown by sending close() to every target, allowing their
receive loop to exit and clean up gracefully.
"""
LOG.debug('%r.on_shutdown()', self)
for stream, state in self._state_by_stream.items():
state.lock.acquire()
try:
for sender, fp in reversed(state.jobs):
sender.close()
fp.close()
state.jobs.pop()
finally:
state.lock.release()
def _schedule_pending_unlocked(self, state):
"""
Consider the pending transfers for a stream, pumping new chunks while
the unacknowledged byte count is below :attr:`max_queue_size`. Must be
called with the StreamState lock held.
:param StreamState state:
Stream to schedule chunks for.
"""
while state.jobs and state.unacked < self.max_queue_size:
sender, fp = state.jobs[0]
s = fp.read(mitogen.core.CHUNK_SIZE)
state.unacked += len(s)
sender.send(s)
if not s:
# File is done. Cause the target's receive loop to exit by
# closing the sender, close the file, and remove the job entry.
sender.close()
fp.close()
state.jobs.pop(0)
@mitogen.service.expose(policy=mitogen.service.AllowAny()) @mitogen.service.expose(policy=mitogen.service.AllowAny())
@mitogen.service.no_reply()
@mitogen.service.arg_spec({ @mitogen.service.arg_spec({
'path': basestring, 'path': basestring,
'sender': mitogen.core.Sender, 'sender': mitogen.core.Sender,
}) })
def fetch(self, path, sender): def fetch(self, path, sender, msg):
""" """
Fetch a file's data. Fetch a file's data.
@ -603,13 +542,52 @@ class FileService(mitogen.service.Service):
* ``group``: Owner group name on host machine. * ``group``: Owner group name on host machine.
* ``mtime``: Floating point modification time. * ``mtime``: Floating point modification time.
* ``ctime``: Floating point change time. * ``ctime``: Floating point change time.
:raises mitogen.core.CallError: :raises Error:
The path was not registered. Unregistered path, or attempt to send to context that was not the
requestee context.
""" """
if path not in self._metadata_by_path: if path not in self._metadata_by_path:
raise mitogen.core.CallError(self.unregistered_msg) raise Error(self.unregistered_msg)
if msg.src_id != sender.context.context_id:
raise Error(self.context_mismatch_msg)
LOG.debug('Serving %r', path) LOG.debug('Serving %r', path)
fp = open(path, 'rb', mitogen.core.CHUNK_SIZE) fp = open(path, 'rb', mitogen.core.CHUNK_SIZE)
self._queue.put((sender, fp)) # Response must arrive first so requestee can begin receive loop,
return self._metadata_by_path[path] # otherwise first ack won't arrive until all pending chunks were
# delivered. In that case max BDP would always be 128KiB, aka. max
# ~10Mbit/sec over a 100ms link.
msg.reply(self._metadata_by_path[path])
stream = self.router.stream_by_id(sender.context.context_id)
state = self._state_by_stream.setdefault(stream, StreamState())
state.lock.acquire()
try:
state.jobs.append((sender, fp))
self._schedule_pending_unlocked(state)
finally:
state.lock.release()
@mitogen.service.expose(policy=mitogen.service.AllowAny())
@mitogen.service.no_reply()
@mitogen.service.arg_spec({
'size': int,
})
@mitogen.service.no_reply()
def acknowledge(self, size, msg):
"""
Acknowledgement bytes received by a transfer target, scheduling new
chunks to keep the window full. This should be called for every chunk
received by the target.
"""
stream = self.router.stream_by_id(msg.src_id)
state = self._state_by_stream[stream]
state.lock.acquire()
try:
if state.unacked < size:
LOG.error('%r.acknowledge(src_id %d): unacked=%d < size %d',
self, msg.src_id, state.unacked, size)
state.unacked -= min(state.unacked, size)
self._schedule_pending_unlocked(state)
finally:
state.lock.release()

@ -98,6 +98,14 @@ def _get_file(context, path, out_fp):
for chunk in recv: for chunk in recv:
s = chunk.unpickle() s = chunk.unpickle()
LOG.debug('_get_file(%r): received %d bytes', path, len(s)) LOG.debug('_get_file(%r): received %d bytes', path, len(s))
mitogen.service.call_async(
context=context,
handle=ansible_mitogen.services.FileService.handle,
method='acknowledge',
kwargs={
'size': len(s),
}
)
out_fp.write(s) out_fp.write(s)
ok = out_fp.tell() == metadata['size'] ok = out_fp.tell() == metadata['size']

Loading…
Cancel
Save