ansible: move FileService into mitogen.service.

pull/262/head
David Wilson 6 years ago
parent 9cb3878f3f
commit d9087c510b

@@ -613,7 +613,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
utimes=(st.st_atime, st.st_mtime))
self.parent.call_service(
service_name='ansible_mitogen.services.FileService',
service_name='mitogen.service.FileService',
method_name='register',
path=mitogen.utils.cast(in_path)
)
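
Taken together with the fetch() call in target.py further down in this diff, this is the trusted half of the protocol: a parent registers a path, and only then can a child fetch it. A minimal sketch of both halves under the new service name, assuming an established parent and child context (the names and path are purely illustrative):

import mitogen.core

# Parent side: authorize the file. The AllowParents() policy rejects
# registration attempts originating from untrusted children.
parent.call_service(
    service_name='mitogen.service.FileService',
    method_name='register',
    path='/tmp/example-payload.bin',  # hypothetical path
)

# Child side: create a Receiver for the chunks, then request the transfer.
recv = mitogen.core.Receiver(router=context.router)
metadata = context.call_service(
    service_name='mitogen.service.FileService',
    method_name='fetch',
    path='/tmp/example-payload.bin',
    sender=recv.to_sender(),
)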

@@ -182,7 +182,7 @@ class BinaryPlanner(Planner):
def _grant_file_service_access(self, invocation):
invocation.connection.parent.call_service(
service_name='ansible_mitogen.services.FileService',
service_name='mitogen.service.FileService',
method_name='register',
path=invocation.module_path,
)

@@ -151,7 +151,7 @@ class MuxProcess(object):
Construct a ContextService and a thread to service requests for it
arriving from worker processes.
"""
file_service = ansible_mitogen.services.FileService(router=self.router)
file_service = mitogen.service.FileService(router=self.router)
self.pool = mitogen.service.Pool(
router=self.router,
services=[
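
Outside of Ansible's MuxProcess, the same wiring amounts to a few lines. A minimal sketch assuming a plain master process (Pool may take further arguments not shown here):

import mitogen.master
import mitogen.service

broker = mitogen.master.Broker()
router = mitogen.master.Router(broker)

# Pool threads answer register()/fetch()/acknowledge() calls from children.
pool = mitogen.service.Pool(
    router=router,
    services=[
        mitogen.service.FileService(router=router),
    ],
)

# Per the shutdown notes below: Pool.shutdown() joins the pool threads and
# calls FileService.on_shutdown(), which closes any in-flight transfers.
pool.shutdown()
broker.shutdown()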

@@ -272,7 +272,7 @@ class ProgramRunner(Runner):
:param str path:
Absolute path to the program file on the master, as it can be retrieved
via :class:`ansible_mitogen.services.FileService`.
via :class:`mitogen.service.FileService`.
:param bool emulate_tty:
If :data:`True`, execute the program with `stdout` and `stderr` merged
into a single pipe, emulating Ansible behaviour when an SSH TTY is in
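
The merged-pipe behaviour described here is the same effect a caller sees when SSH allocates a TTY. Roughly, in subprocess terms (an illustration only, not the runner's actual implementation):

import subprocess

# emulate_tty=True corresponds to folding stderr into stdout, so the two
# streams interleave as they would on a real terminal.
proc = subprocess.Popen(
    ['/bin/sh', '-c', 'echo out; echo err >&2'],
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
)
print(proc.communicate()[0])  # b'out\nerr\n'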

@@ -38,15 +38,11 @@ when a child has completed a job.
"""
from __future__ import absolute_import
import grp
import logging
import os
import os.path
import pwd
import stat
import sys
import threading
import zlib
import mitogen
import mitogen.service
@@ -367,252 +363,6 @@ class ContextService(mitogen.service.Service):
return result
class StreamState(object):
def __init__(self):
#: List of [(Sender, file object)]
self.jobs = []
self.completing = {}
#: In-flight byte count.
self.unacked = 0
#: Lock.
self.lock = threading.Lock()
class FileService(mitogen.service.Service):
"""
Streaming file server, used to serve small files like Ansible modules and
huge files like ISO images. Paths must be registered by a trusted context
before they will be served to a child.
Transfers are divided among the physical streams that connect external
contexts, ensuring each stream never has excessive data buffered in RAM,
while still maintaining enough to fully utilize available bandwidth. This
is achieved by making an initial bandwidth assumption, enqueueing enough
chunks to fill that assumed pipe, then responding to delivery
acknowledgements from the receiver by scheduling new chunks.
Transfers proceed one-at-a-time per stream. When multiple contexts exist on
a stream (e.g. one is the SSH account, another is a sudo account, and a
third is a proxied SSH connection), each request is satisfied in turn
before subsequent requests start flowing. This ensures that when a stream is
contended, priority is given to completing individual transfers rather than
aborting many partial transfers and wasting the bandwidth already spent on
them.
Theory of operation:
1. Trusted context (i.e. WorkerProcess) calls register(), making a
file available to any untrusted context.
2. Requestee context creates a mitogen.core.Receiver() to receive
chunks, then calls fetch(path, recv.to_sender()), to set up the
transfer.
3. fetch() replies to the call with the file's metadata, then
schedules an initial burst up to the window size limit (1MiB).
4. Chunks begin to arrive in the requestee, which calls acknowledge()
for each 128KiB received.
5. The acknowledge() call arrives at FileService, which schedules a new
chunk to refill the drained window back to the size limit.
6. When the last chunk has been pumped for a single transfer,
Sender.close() is called, causing the receive loop in
target.py::_get_file() to exit, allowing that code to compare the
transferred size with the total file size from the metadata.
7. If the sizes do not match, _get_file()'s caller is informed, and it
discards the result and logs or raises an error.
Shutdown:
1. process.py calls service.Pool.shutdown(), which arranges for the
service pool threads to exit and be joined, guaranteeing no new
requests can arrive, before calling Service.on_shutdown() for each
registered service.
2. FileService.on_shutdown() walks every in-progress transfer and calls
Sender.close(), causing Receiver loops in the requestees to exit
early. The size check fails and any partially downloaded file is
discarded.
3. Control exits _get_file() in every target, and graceful shutdown can
proceed normally, without the associated thread needing to be
forcefully killed.
"""
unregistered_msg = 'Path is not registered with FileService.'
context_mismatch_msg = 'sender= kwarg context must match requestee context'
#: Burst size. With 1MiB and 10ms RTT max throughput is 100MiB/sec, which
#: is 5x what SSH can handle on a 2011-era 2.4GHz Core i5.
window_size_bytes = 1048576
def __init__(self, router):
super(FileService, self).__init__(router)
#: Mapping of registered path -> file size.
self._metadata_by_path = {}
#: Mapping of Stream->StreamState.
self._state_by_stream = {}
def _name_or_none(self, func, n, attr):
try:
return getattr(func(n), attr)
except KeyError:
return None
@mitogen.service.expose(policy=mitogen.service.AllowParents())
@mitogen.service.arg_spec({
'paths': list
})
def register_many(self, paths):
"""
Batch version of register().
"""
for path in paths:
self.register(path)
@mitogen.service.expose(policy=mitogen.service.AllowParents())
@mitogen.service.arg_spec({
'path': basestring
})
def register(self, path):
"""
Authorize a path for access by children. Repeated calls with the same
path are harmless.
:param str path:
File path.
"""
if path in self._metadata_by_path:
return
st = os.stat(path)
if not stat.S_ISREG(st.st_mode):
raise IOError('%r is not a regular file.' % (path,))
LOG.debug('%r: registering %r', self, path)
self._metadata_by_path[path] = {
'size': st.st_size,
'mode': st.st_mode,
'owner': self._name_or_none(pwd.getpwuid, st.st_uid, 'pw_name'),
'group': self._name_or_none(grp.getgrgid, st.st_gid, 'gr_name'),
'mtime': st.st_mtime,
'atime': st.st_atime,
}
def on_shutdown(self):
"""
Respond to shutdown by sending close() to every target, allowing their
receive loop to exit and clean up gracefully.
"""
LOG.debug('%r.on_shutdown()', self)
for stream, state in self._state_by_stream.items():
state.lock.acquire()
try:
for sender, fp in reversed(state.jobs):
sender.close()
fp.close()
state.jobs.pop()
finally:
state.lock.release()
# The IO loop pumps 128KiB chunks. An ideal message is a multiple of this;
# odd-sized messages waste one tiny write() per message on the trailer.
# Therefore subtract 10 bytes pickle overhead + 24 bytes header.
IO_SIZE = mitogen.core.CHUNK_SIZE - (mitogen.core.Stream.HEADER_LEN + (
len(
mitogen.core.Message.pickled(
mitogen.core.Blob(' ' * mitogen.core.CHUNK_SIZE)
).data
) - mitogen.core.CHUNK_SIZE
))
def _schedule_pending_unlocked(self, state):
"""
Consider the pending transfers for a stream, pumping new chunks while
the unacknowledged byte count is below :attr:`window_size_bytes`. Must
be called with the StreamState lock held.
:param StreamState state:
Stream to schedule chunks for.
"""
while state.jobs and state.unacked < self.window_size_bytes:
sender, fp = state.jobs[0]
s = fp.read(self.IO_SIZE)
if s:
state.unacked += len(s)
sender.send(mitogen.core.Blob(s))
else:
# File is done. Cause the target's receive loop to exit by
# closing the sender, close the file, and remove the job entry.
sender.close()
fp.close()
state.jobs.pop(0)
@mitogen.service.expose(policy=mitogen.service.AllowAny())
@mitogen.service.no_reply()
@mitogen.service.arg_spec({
'path': basestring,
'sender': mitogen.core.Sender,
})
def fetch(self, path, sender, msg):
"""
Start a transfer for a registered path.
:param str path:
File path.
:param mitogen.core.Sender sender:
Sender to receive file data.
:returns:
Dict containing the file metadata:
* ``size``: File size in bytes.
* ``mode``: Integer file mode.
* ``owner``: Owner account name on host machine.
* ``group``: Owner group name on host machine.
* ``mtime``: Floating point modification time.
* ``atime``: Floating point access time.
:raises Error:
Unregistered path, or Sender did not match requestee context.
"""
if path not in self._metadata_by_path:
raise Error(self.unregistered_msg)
if msg.src_id != sender.context.context_id:
raise Error(self.context_mismatch_msg)
LOG.debug('Serving %r', path)
fp = open(path, 'rb', self.IO_SIZE)
# Response must arrive first so requestee can begin receive loop,
# otherwise the first ack won't arrive until all pending chunks have been
# delivered. In that case the effective window would always be 128KiB,
# i.e. at most ~10Mbit/sec over a 100ms link.
msg.reply(self._metadata_by_path[path])
stream = self.router.stream_by_id(sender.context.context_id)
state = self._state_by_stream.setdefault(stream, StreamState())
state.lock.acquire()
try:
state.jobs.append((sender, fp))
self._schedule_pending_unlocked(state)
finally:
state.lock.release()
@mitogen.service.expose(policy=mitogen.service.AllowAny())
@mitogen.service.no_reply()
@mitogen.service.arg_spec({
'size': int,
})
def acknowledge(self, size, msg):
"""
Acknowledge bytes received by a transfer target, scheduling new chunks
to keep the window full. This should be called for every chunk received
by the target.
"""
stream = self.router.stream_by_id(msg.src_id)
state = self._state_by_stream[stream]
state.lock.acquire()
try:
if state.unacked < size:
LOG.error('%r.acknowledge(src_id %d): unacked=%d < size %d',
self, msg.src_id, state.unacked, size)
state.unacked -= min(state.unacked, size)
self._schedule_pending_unlocked(state)
finally:
state.lock.release()
class ModuleDepService(mitogen.service.Service):
"""
Scan a new-style module and produce a cached mapping of module_utils names

@@ -89,7 +89,7 @@ def _get_file(context, path, out_fp):
t0 = time.time()
recv = mitogen.core.Receiver(router=context.router)
metadata = context.call_service(
service_name='ansible_mitogen.services.FileService',
service_name='mitogen.service.FileService',
method_name='fetch',
path=path,
sender=recv.to_sender(),
@@ -99,7 +99,7 @@ def _get_file(context, path, out_fp):
s = chunk.unpickle()
LOG.debug('_get_file(%r): received %d bytes', path, len(s))
context.call_service_async(
service_name='ansible_mitogen.services.FileService',
service_name='mitogen.service.FileService',
method_name='acknowledge',
size=len(s),
).close()
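
Stitched together, the loop these hunks modify implements steps 2 through 5 of FileService's theory of operation. A condensed sketch of target.py's _get_file() after the rename, with error handling and timing omitted:

import mitogen.core

def _get_file_sketch(context, path, out_fp):
    # Ask FileService to begin streaming; the reply carries the metadata.
    recv = mitogen.core.Receiver(router=context.router)
    metadata = context.call_service(
        service_name='mitogen.service.FileService',
        method_name='fetch',
        path=path,
        sender=recv.to_sender(),
    )
    for chunk in recv:  # ends when FileService closes the Sender
        s = chunk.unpickle()
        out_fp.write(s)
        # Acknowledge every chunk so the service refills the 1MiB window.
        context.call_service_async(
            service_name='mitogen.service.FileService',
            method_name='acknowledge',
            size=len(s),
        ).close()
    # Step 7: a size mismatch tells the caller to discard the partial file.
    return out_fp.tell() == metadata['size']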

@@ -26,7 +26,12 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import grp
import os
import os.path
import pprint
import pwd
import stat
import sys
import threading
@@ -489,3 +494,238 @@ class Pool(object):
len(self._threads),
th.name,
)
class FileStreamState(object):
def __init__(self):
#: List of [(Sender, file object)]
self.jobs = []
self.completing = {}
#: In-flight byte count.
self.unacked = 0
#: Lock.
self.lock = threading.Lock()
class FileService(Service):
"""
Streaming file server, used to serve small files and huge files alike.
Paths must be registered by a trusted context before they will be served to
a child.
Transfers are divided among the physical streams that connect external
contexts, ensuring each stream never has excessive data buffered in RAM,
while still maintaining enough to fully utilize available bandwidth. This
is achieved by making an initial bandwidth assumption, enqueueing enough
chunks to fill that assumed pipe, then responding to delivery
acknowledgements from the receiver by scheduling new chunks.
Transfers proceed one-at-a-time per stream. When multiple contexts exist on
a stream (e.g. one is the SSH account, another is a sudo account, and a
third is a proxied SSH connection), each request is satisfied in turn
before subsequent requests start flowing. This ensures that when a stream is
contended, priority is given to completing individual transfers rather than
aborting many partial transfers and wasting the bandwidth already spent on
them.
Theory of operation:
1. Trusted context (i.e. WorkerProcess) calls register(), making a
file available to any untrusted context.
2. Requestee context creates a mitogen.core.Receiver() to receive
chunks, then calls fetch(path, recv.to_sender()), to set up the
transfer.
3. fetch() replies to the call with the file's metadata, then
schedules an initial burst up to the window size limit (1MiB).
4. Chunks begin to arrive in the requestee, which calls acknowledge()
for each 128KiB received.
5. The acknowledge() call arrives at FileService, which schedules a new
chunk to refill the drained window back to the size limit.
6. When the last chunk has been pumped for a single transfer,
Sender.close() is called, causing the receive loop in
target.py::_get_file() to exit, allowing that code to compare the
transferred size with the total file size from the metadata.
7. If the sizes do not match, _get_file()'s caller is informed, and it
discards the result and logs or raises an error.
Shutdown:
1. process.py calls service.Pool.shutdown(), which arranges for the
service pool threads to exit and be joined, guaranteeing no new
requests can arrive, before calling Service.on_shutdown() for each
registered service.
2. FileService.on_shutdown() walks every in-progress transfer and calls
Sender.close(), causing Receiver loops in the requestees to exit
early. The size check fails and any partially downloaded file is
discarded.
3. Control exits _get_file() in every target, and graceful shutdown can
proceed normally, without the associated thread needing to be
forcefully killed.
"""
unregistered_msg = 'Path is not registered with FileService.'
context_mismatch_msg = 'sender= kwarg context must match requestee context'
#: Burst size. With 1MiB and 10ms RTT max throughput is 100MiB/sec, which
#: is 5x what SSH can handle on a 2011-era 2.4GHz Core i5.
window_size_bytes = 1048576
def __init__(self, router):
super(FileService, self).__init__(router)
#: Mapping of registered path -> file size.
self._metadata_by_path = {}
#: Mapping of Stream->FileStreamState.
self._state_by_stream = {}
def _name_or_none(self, func, n, attr):
try:
return getattr(func(n), attr)
except KeyError:
return None
@expose(policy=AllowParents())
@arg_spec({
'path': basestring,
})
def register(self, path):
"""
Authorize a path for access by children. Repeated calls with the same
path are harmless.
:param str path:
File path.
"""
if path in self._metadata_by_path:
return
st = os.stat(path)
if not stat.S_ISREG(st.st_mode):
raise IOError('%r is not a regular file.' % (path,))
LOG.debug('%r: registering %r', self, path)
self._metadata_by_path[path] = {
'size': st.st_size,
'mode': st.st_mode,
'owner': self._name_or_none(pwd.getpwuid, st.st_uid, 'pw_name'),
'group': self._name_or_none(grp.getgrgid, st.st_gid, 'gr_name'),
'mtime': st.st_mtime,
'atime': st.st_atime,
}
def on_shutdown(self):
"""
Respond to shutdown by sending close() to every target, allowing their
receive loop to exit and clean up gracefully.
"""
LOG.debug('%r.on_shutdown()', self)
for stream, state in self._state_by_stream.items():
state.lock.acquire()
try:
for sender, fp in reversed(state.jobs):
sender.close()
fp.close()
state.jobs.pop()
finally:
state.lock.release()
# The IO loop pumps 128KiB chunks. An ideal message is a multiple of this;
# odd-sized messages waste one tiny write() per message on the trailer.
# Therefore subtract 10 bytes pickle overhead + 24 bytes header.
IO_SIZE = mitogen.core.CHUNK_SIZE - (mitogen.core.Stream.HEADER_LEN + (
len(
mitogen.core.Message.pickled(
mitogen.core.Blob(' ' * mitogen.core.CHUNK_SIZE)
).data
) - mitogen.core.CHUNK_SIZE
))
def _schedule_pending_unlocked(self, state):
"""
Consider the pending transfers for a stream, pumping new chunks while
the unacknowledged byte count is below :attr:`window_size_bytes`. Must
be called with the FileStreamState lock held.
:param FileStreamState state:
Stream to schedule chunks for.
"""
while state.jobs and state.unacked < self.window_size_bytes:
sender, fp = state.jobs[0]
s = fp.read(self.IO_SIZE)
if s:
state.unacked += len(s)
sender.send(mitogen.core.Blob(s))
else:
# File is done. Cause the target's receive loop to exit by
# closing the sender, close the file, and remove the job entry.
sender.close()
fp.close()
state.jobs.pop(0)
@expose(policy=AllowAny())
@no_reply()
@arg_spec({
'path': basestring,
'sender': mitogen.core.Sender,
})
def fetch(self, path, sender, msg):
"""
Start a transfer for a registered path.
:param str path:
File path.
:param mitogen.core.Sender sender:
Sender to receive file data.
:returns:
Dict containing the file metadata:
* ``size``: File size in bytes.
* ``mode``: Integer file mode.
* ``owner``: Owner account name on host machine.
* ``group``: Owner group name on host machine.
* ``mtime``: Floating point modification time.
* ``atime``: Floating point access time.
:raises Error:
Unregistered path, or Sender did not match requestee context.
"""
if path not in self._metadata_by_path:
raise Error(self.unregistered_msg)
if msg.src_id != sender.context.context_id:
raise Error(self.context_mismatch_msg)
LOG.debug('Serving %r', path)
fp = open(path, 'rb', self.IO_SIZE)
# Response must arrive first so requestee can begin receive loop,
# otherwise the first ack won't arrive until all pending chunks have been
# delivered. In that case the effective window would always be 128KiB,
# i.e. at most ~10Mbit/sec over a 100ms link.
msg.reply(self._metadata_by_path[path])
stream = self.router.stream_by_id(sender.context.context_id)
state = self._state_by_stream.setdefault(stream, FileStreamState())
state.lock.acquire()
try:
state.jobs.append((sender, fp))
self._schedule_pending_unlocked(state)
finally:
state.lock.release()
@expose(policy=AllowAny())
@no_reply()
@arg_spec({
'size': int,
})
def acknowledge(self, size, msg):
"""
Acknowledge bytes received by a transfer target, scheduling new chunks
to keep the window full. This should be called for every chunk received
by the target.
"""
stream = self.router.stream_by_id(msg.src_id)
state = self._state_by_stream[stream]
state.lock.acquire()
try:
if state.unacked < size:
LOG.error('%r.acknowledge(src_id %d): unacked=%d < size %d',
self, msg.src_id, state.unacked, size)
state.unacked -= min(state.unacked, size)
self._schedule_pending_unlocked(state)
finally:
state.lock.release()
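
The constants above can be sanity-checked numerically. A quick sketch of the arithmetic the comments describe, with the 128KiB CHUNK_SIZE, 24-byte header, and 10-byte pickle overhead taken from the comments rather than imported:

CHUNK_SIZE = 131072          # 128KiB pumped per IO loop iteration
HEADER_LEN = 24              # wire header size, per the IO_SIZE comment
PICKLE_OVERHEAD = 10         # pickle framing, per the IO_SIZE comment
window_size_bytes = 1048576  # 1MiB burst

# Each fp.read() is sized so one read pickles into exactly one wire message.
print(CHUNK_SIZE - HEADER_LEN - PICKLE_OVERHEAD)   # 131038, the IO_SIZE value

# Up to 8 chunks ride unacknowledged per stream.
print(window_size_bytes // CHUNK_SIZE)             # 8

# Docstring claim: a 1MiB window over a 10ms RTT caps at ~100MiB/sec.
rtt = 0.010
print(window_size_bytes / rtt / 2 ** 20)           # 100.0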
