Merge pull request #237 from dw/dmw

connection delegation fixes & issue #212
pull/246/head
dw 6 years ago committed by GitHub
commit 76102927c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -134,6 +134,7 @@ def _connect_setns(spec):
def _connect_sudo(spec):
return {
'method': 'sudo',
'enable_lru': True,
'kwargs': {
'username': spec['become_user'],
'password': spec['become_pass'],

@ -167,7 +167,7 @@ class ContextService(mitogen.service.Service):
del self._response_by_key[key]
del self._refs_by_context[context]
del self._key_by_context[context]
if lru:
if lru and context in lru:
lru.remove(context)
if new_context:
lru.append(new_context)
@ -256,7 +256,7 @@ class ContextService(mitogen.service.Service):
raise Error('unsupported method: %(transport)s' % spec)
context = method(via=via, **spec['kwargs'])
if via:
if via and spec.get('enable_lru'):
self._update_lru(context, spec, via)
else:
# For directly connected contexts, listen to the associated
@ -354,6 +354,17 @@ class ContextService(mitogen.service.Service):
return result
class StreamState(object):
    """
    Per-stream bookkeeping for file transfers: the queue of outstanding jobs,
    the number of bytes sent but not yet acknowledged, and the lock that
    serializes access to them.
    """
    def __init__(self):
        #: Lock.
        self.lock = threading.Lock()
        #: In-flight byte count.
        self.unacked = 0
        #: List of [(Sender, file object)]
        self.jobs = []
        # NOTE(review): nothing visible in this change reads or writes
        # `completing`; confirm its intended use.
        self.completing = {}
class FileService(mitogen.service.Service):
"""
Streaming file server, used to serve both small files like Ansible module
@ -420,131 +431,20 @@ class FileService(mitogen.service.Service):
handle = 501
max_message_size = 1000
unregistered_msg = 'Path is not registered with FileService.'
context_mismatch_msg = 'sender= kwarg context must match requestee context'
#: Maximum size of any stream's output queue before we temporarily stop
#: pumping more file chunks on that stream. The queue may overspill by up
#: to mitogen.core.CHUNK_SIZE-1 bytes (128KiB-1).
#: Maximum size of any stream's output queue before we stop pumping more
#: file chunks. The queue may overspill by up to mitogen.core.CHUNK_SIZE-1
#: bytes (128KiB-1). With max_queue_size=1MiB and a RTT of 10ms, maximum
#: throughput is 112MiB/sec, which is >5x what SSH can handle on my laptop.
max_queue_size = 1048576
#: Time spent by the scheduler thread asleep when it has no more data to
#: pump, but while at least one transfer remains active. With
#: max_queue_size=1MiB and a sleep of 10ms, maximum throughput on any
#: single stream is 112MiB/sec, which is >5x what SSH can handle on my
#: laptop.
sleep_delay_secs = 0.01
def __init__(self, router):
super(FileService, self).__init__(router)
#: Mapping of registered path -> file size.
self._metadata_by_path = {}
#: Queue used to communicate from service to scheduler thread.
self._queue = mitogen.core.Latch()
#: Mapping of Stream->[(Sender, file object)].
self._pending_by_stream = {}
self._thread = threading.Thread(target=self._scheduler_main)
self._thread.start()
def on_shutdown(self):
"""
Respond to shutdown of the service pool by marking our queue closed.
This causes :meth:`_sleep_on_queue` to wake immediately and return
:data:`False`, causing the scheduler main thread to exit.
"""
self._queue.close()
def _pending_bytes(self, stream):
"""
Defer a function call to the Broker thread in order to accurately
measure the bytes pending in `stream`'s queue.
This must be done synchronized with the Broker, as OS scheduler
uncertainty could cause Sender.send()'s deferred enqueues to be
processed very late, making the output queue look much emptier than it
really is (or is about to become).
"""
latch = mitogen.core.Latch()
self.router.broker.defer(lambda: latch.put(stream.pending_bytes()))
return latch.get()
def _schedule_pending(self, stream, pending):
"""
Consider the pending file transfers for a single stream, pumping new
file chunks into the stream's queue while its size is below the
configured limit.
:param mitogen.core.Stream stream:
Stream to pump chunks for.
:param pending:
Corresponding list from :attr:`_pending_by_stream`.
"""
while pending and self._pending_bytes(stream) < self.max_queue_size:
sender, fp = pending[0]
s = fp.read(mitogen.core.CHUNK_SIZE)
if s:
sender.send(s)
continue
# Empty read, indicating this file is fully transferred. Mark the
# sender closed (causing the corresponding Receiver loop in the
# target to exit), close the file handle, remove our entry from the
# pending list, and delete the stream's entry in the pending map if
# no more sends remain.
sender.close()
fp.close()
pending.pop(0)
if not pending:
del self._pending_by_stream[stream]
def _sleep_on_queue(self):
"""
Sleep indefinitely (no active transfers) or for
:attr:`sleep_delay_secs` (active transfers) waiting for a new transfer
request to arrive from the :meth:`fetch` method.
If a new request arrives, add it to the appropriate list in
:attr:`_pending_by_stream`.
:returns:
:data:`True` the scheduler's queue is still open,
:meth:`on_shutdown` hasn't been called yet, otherwise
:data:`False`.
"""
if self._pending_by_stream:
timeout = self.sleep_delay_secs
else:
timeout = None
try:
sender, fp = self._queue.get(timeout=timeout)
except mitogen.core.LatchError:
return False
except mitogen.core.TimeoutError:
return True
LOG.debug('%r._sleep_on_queue(): setting up %r for %r',
self, fp.name, sender)
stream = self.router.stream_by_id(sender.context.context_id)
pending = self._pending_by_stream.setdefault(stream, [])
pending.append((sender, fp))
return True
def _scheduler_main(self):
"""
Scheduler thread's main function. Sleep until
:meth:`_sleep_on_queue` indicates the queue has been shut down,
pumping pending file chunks each time we wake.
"""
while self._sleep_on_queue():
for stream, pending in list(self._pending_by_stream.items()):
self._schedule_pending(stream, pending)
# on_shutdown() has been called. Send close() on every sender to give
# targets a chance to shut down gracefully.
LOG.debug('%r._scheduler_main() shutting down', self)
for _, pending in self._pending_by_stream.items():
for sender, fp in pending:
sender.close()
fp.close()
#: Mapping of Stream->StreamState.
self._state_by_stream = {}
def _name_or_none(self, func, n, attr):
try:
@ -558,8 +458,8 @@ class FileService(mitogen.service.Service):
})
def register(self, path):
"""
Authorize a path for access by child contexts. Calling this repeatedly
with the same path is harmless.
Authorize a path for access by children. Repeat calls with the same
path is harmless.
:param str path:
File path.
@ -581,12 +481,51 @@ class FileService(mitogen.service.Service):
'atime': st.st_atime,
}
def on_shutdown(self):
    """
    Handle shutdown by calling close() on the sender of every queued job,
    allowing each target's receive loop to exit and clean up gracefully,
    and by closing the file object paired with it.
    """
    LOG.debug('%r.on_shutdown()', self)
    for state in self._state_by_stream.values():
        state.lock.acquire()
        try:
            # Drain from the tail; a job is only dropped from the list once
            # both its sender and its file have been closed.
            while state.jobs:
                sender, fp = state.jobs[-1]
                sender.close()
                fp.close()
                state.jobs.pop()
        finally:
            state.lock.release()
def _schedule_pending_unlocked(self, state):
    """
    Consider the pending transfers for a stream, pumping new chunks while
    the unacknowledged byte count is below :attr:`max_queue_size`. Must be
    called with the StreamState lock held.

    Only the job at the head of the list is serviced; the next job starts
    once the head job's file has been fully read.

    :param StreamState state:
        Stream to schedule chunks for.
    """
    while state.jobs and state.unacked < self.max_queue_size:
        sender, fp = state.jobs[0]
        s = fp.read(mitogen.core.CHUNK_SIZE)
        if s:
            state.unacked += len(s)
            sender.send(s)
            continue

        # Empty read: the file is done. Do not forward the empty chunk, as
        # it carries no data and would only cost the target a zero-byte
        # acknowledgement. Cause the target's receive loop to exit by
        # closing the sender, close the file, and remove the job entry.
        sender.close()
        fp.close()
        state.jobs.pop(0)
@mitogen.service.expose(policy=mitogen.service.AllowAny())
@mitogen.service.no_reply()
@mitogen.service.arg_spec({
'path': basestring,
'sender': mitogen.core.Sender,
})
def fetch(self, path, sender):
def fetch(self, path, sender, msg):
"""
Fetch a file's data.
@ -603,13 +542,52 @@ class FileService(mitogen.service.Service):
* ``group``: Owner group name on host machine.
* ``mtime``: Floating point modification time.
* ``ctime``: Floating point change time.
:raises mitogen.core.CallError:
The path was not registered.
:raises Error:
Unregistered path, or attempt to send to context that was not the
requestee context.
"""
if path not in self._metadata_by_path:
raise mitogen.core.CallError(self.unregistered_msg)
raise Error(self.unregistered_msg)
if msg.src_id != sender.context.context_id:
raise Error(self.context_mismatch_msg)
LOG.debug('Serving %r', path)
fp = open(path, 'rb', mitogen.core.CHUNK_SIZE)
self._queue.put((sender, fp))
return self._metadata_by_path[path]
# Response must arrive first so requestee can begin receive loop,
# otherwise first ack won't arrive until all pending chunks were
# delivered. In that case max BDP would always be 128KiB, aka. max
# ~10Mbit/sec over a 100ms link.
msg.reply(self._metadata_by_path[path])
stream = self.router.stream_by_id(sender.context.context_id)
state = self._state_by_stream.setdefault(stream, StreamState())
state.lock.acquire()
try:
state.jobs.append((sender, fp))
self._schedule_pending_unlocked(state)
finally:
state.lock.release()
@mitogen.service.expose(policy=mitogen.service.AllowAny())
@mitogen.service.no_reply()
@mitogen.service.arg_spec({
    'size': int,
})
def acknowledge(self, size, msg):
    """
    Acknowledge bytes received by a transfer target, scheduling new chunks
    to keep the window full. This should be called for every chunk received
    by the target.

    :param int size:
        Number of bytes the target is acknowledging.
    :param msg:
        The request message; its ``src_id`` selects which stream's state is
        updated.
    :raises KeyError:
        No transfer state exists for the stream the message arrived from.
    """
    stream = self.router.stream_by_id(msg.src_id)
    state = self._state_by_stream[stream]
    state.lock.acquire()
    try:
        if state.unacked < size:
            LOG.error('%r.acknowledge(src_id %d): unacked=%d < size %d',
                      self, msg.src_id, state.unacked, size)
        # Clamp so an over-sized acknowledgement can never drive the
        # in-flight count negative.
        state.unacked -= min(state.unacked, size)
        self._schedule_pending_unlocked(state)
    finally:
        state.lock.release()

@ -98,6 +98,14 @@ def _get_file(context, path, out_fp):
for chunk in recv:
s = chunk.unpickle()
LOG.debug('_get_file(%r): received %d bytes', path, len(s))
mitogen.service.call_async(
context=context,
handle=ansible_mitogen.services.FileService.handle,
method='acknowledge',
kwargs={
'size': len(s),
}
).close()
out_fp.write(s)
ok = out_fp.tell() == metadata['size']

@ -205,18 +205,10 @@ container.
* Delegated connection setup is single-threaded; only one connection can be
constructed at a time per intermediary.
* Unbounded queue RAM growth may occur in an intermediary during large file
transfers if the link between any two hops is slower than the link
between the controller and the first hop.
* Inferring the configuration of intermediaries may be buggy, manifesting
as duplicate connections between hops, due to not perfectly replicating
the configuration Ansible would normally use for the intermediary.
* The extension does not understand the difference between a delegated
connection and a ``become_user``. If interpreter recycling kicks in, a
delegated connection could be prematurely recycled.
To enable connection delegation, set ``mitogen_via=<inventory name>`` on the
command line, or as host and group variables.

@ -470,6 +470,13 @@ Router Class
:return:
`handle`, or if `handle` was ``None``, the newly allocated handle.
.. method:: del_handler (handle)
Remove the handler registered for `handle`.
:raises KeyError:
The handle wasn't registered.
.. method:: _async_route(msg, stream=None)
Arrange for `msg` to be forwarded towards its destination. If its
@ -917,6 +924,11 @@ Context Class
:param bool wait:
If :py:data:`True`, block the calling thread until the context has
completely terminated.
:returns:
If `wait` is :data:`False`, returns a :class:`mitogen.core.Latch`
whose :meth:`get() <mitogen.core.Latch.get>` method returns
:data:`None` when shutdown completes. The `timeout` parameter of
:meth:`get() <mitogen.core.Latch.get>` may be used to implement graceful
timeouts.
.. method:: call_async (fn, \*args, \*\*kwargs)

@ -436,6 +436,9 @@ class Receiver(object):
self.notify(self)
def close(self):
    """
    If a handle is still set, remove it from the router and clear it, then
    put :meth:`Message.dead` onto the receiver's latch.
    """
    handle = self.handle
    if handle:
        self.router.del_handler(handle)
        self.handle = None
    self._latch.put(Message.dead())
def empty(self):
@ -1273,6 +1276,9 @@ class Router(object):
self.broker.start_receive(stream)
listen(stream, 'disconnect', lambda: self.on_stream_disconnect(stream))
def del_handler(self, handle):
    """
    Remove the entry stored under `handle` in the handle map.

    :raises KeyError:
        The handle wasn't registered.
    """
    self._handle_map.pop(handle)
def add_handler(self, fn, handle=None, persist=True,
policy=None, respondent=None):
handle = handle or self._last_handle.next()

@ -806,7 +806,8 @@ class Context(mitogen.core.Context):
if wait:
latch.get()
return latch
else:
return latch
class RouteMonitor(object):

@ -120,6 +120,18 @@ def expose(policy):
return wrapper
def no_reply():
    """
    Decorator factory marking a service method as producing no response.
    Any messages the method sends are sent explicitly by the method itself,
    which suits fire-and-forget endpoints where the requestee never receives
    a reply.
    """
    def mark(fn):
        # Tag the function itself and hand it back unchanged.
        fn.mitogen_service__no_reply = True
        return fn

    return mark
class Service(object):
#: Sentinel object to suppress reply generation, since returning ``None``
#: will trigger a response message containing the pickled ``None``.
@ -138,10 +150,7 @@ class Service(object):
self.running = True
def __repr__(self):
return '%s.%s()' % (
self.__class__.__module__,
self.__class__.__name__,
)
return '%s()' % (self.__class__.__name__,)
def on_shutdown(self):
"""
@ -182,7 +191,20 @@ class Service(object):
method = getattr(self, method_name)
if 'msg' in method.func_code.co_varnames:
kwargs['msg'] = msg # TODO: hack
return method(**kwargs)
no_reply = getattr(method, 'mitogen_service__no_reply', False)
ret = None
try:
ret = method(**kwargs)
if no_reply:
return self.NO_REPLY
return ret
except Exception:
if no_reply:
LOG.exception('While calling no-reply method %s.%s',
type(self).__name__, method.func_name)
else:
raise
def on_receive_message(self, msg):
try:
@ -347,10 +369,10 @@ class Pool(object):
)
def call_async(context, handle, method, kwargs):
def call_async(context, handle, method, kwargs=None):
LOG.debug('service.call_async(%r, %r, %r, %r)',
context, handle, method, kwargs)
pair = (method, kwargs)
pair = (method, kwargs or {})
msg = mitogen.core.Message.pickled(pair, handle=handle)
return context.send_async(msg)

Loading…
Cancel
Save