|
|
@ -367,77 +367,66 @@ class StreamState(object):
|
|
|
|
|
|
|
|
|
|
|
|
class FileService(mitogen.service.Service):
|
|
|
|
class FileService(mitogen.service.Service):
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
Streaming file server, used to serve both small files like Ansible module
|
|
|
|
Streaming file server, used to serve small files like Ansible modules and
|
|
|
|
sources, and huge files like ISO images. Paths must be explicitly added to
|
|
|
|
huge files like ISO images. Paths must be registered by a trusted context
|
|
|
|
the service by a trusted context before they will be served to an untrusted
|
|
|
|
before they will be served to a child.
|
|
|
|
context.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
The file service nominally lives on the mitogen.service.Pool() threads
|
|
|
|
Transfers are divided among the physical streams that connect external
|
|
|
|
shared with ContextService above, however for simplicity it also maintains
|
|
|
|
contexts, ensuring each stream never has excessive data buffered in RAM,
|
|
|
|
a dedicated thread from where file chunks are scheduled.
|
|
|
|
while still maintaining enough to fully utilize available bandwidth. This
|
|
|
|
|
|
|
|
is achieved by making an initial bandwidth assumption, enqueueing enough
|
|
|
|
The scheduler thread is responsible for dividing transfer requests up among
|
|
|
|
chunks to fill that assumed pipe, then responding to delivery
|
|
|
|
the physical streams that connect to those contexts, and ensure each stream
|
|
|
|
acknowledgements from the receiver by scheduling new chunks.
|
|
|
|
never has an excessive amount of data buffered in RAM at any time.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Transfers proceeed one-at-a-time per stream. When multiple contexts exist
|
|
|
|
Transfers proceeed one-at-a-time per stream. When multiple contexts exist
|
|
|
|
reachable over the same stream (e.g. one is the SSH account, another is a
|
|
|
|
on a stream (e.g. one is the SSH account, another is a sudo account, and a
|
|
|
|
sudo account, and a third is a proxied SSH connection), each request is
|
|
|
|
third is a proxied SSH connection), each request is satisfied in turn
|
|
|
|
satisfied in turn before chunks for subsequent requests start flowing. This
|
|
|
|
before subsequent requests start flowing. This ensures when a stream is
|
|
|
|
ensures when a connection is contended, that preference is given to
|
|
|
|
contended, priority is given to completing individual transfers rather than
|
|
|
|
completing individual transfers, rather than potentially aborting many
|
|
|
|
potentially aborting many partial transfers, causing the bandwidth to be
|
|
|
|
partially complete transfers, causing all the bandwidth used to be wasted.
|
|
|
|
wasted.
|
|
|
|
|
|
|
|
|
|
|
|
Theory of operation:
|
|
|
|
Theory of operation:
|
|
|
|
1. Trusted context (i.e. a WorkerProcess) calls register(), making a
|
|
|
|
1. Trusted context (i.e. WorkerProcess) calls register(), making a
|
|
|
|
file available to any untrusted context.
|
|
|
|
file available to any untrusted context.
|
|
|
|
2. Untrusted context creates a mitogen.core.Receiver() to receive
|
|
|
|
2. Requestee context creates a mitogen.core.Receiver() to receive
|
|
|
|
file chunks. It then calls fetch(path, recv.to_sender()), which sets
|
|
|
|
chunks, then calls fetch(path, recv.to_sender()), to set up the
|
|
|
|
up the transfer. The fetch() method returns the final file size and
|
|
|
|
transfer.
|
|
|
|
notifies the dedicated thread of the transfer request.
|
|
|
|
3. fetch() replies to the call with the file's metadata, then
|
|
|
|
3. The dedicated thread wakes from perpetual sleep, looks up the stream
|
|
|
|
schedules an initial burst up to the window size limit (1MiB).
|
|
|
|
used to communicate with the untrusted context, and begins pumping
|
|
|
|
4. Chunks begin to arrive in the requestee, which calls acknowledge()
|
|
|
|
128KiB-sized chunks until that stream's output queue reaches a
|
|
|
|
for each 128KiB received.
|
|
|
|
limit (1MiB).
|
|
|
|
5. The acknowledge() call arrives at FileService, which scheduled a new
|
|
|
|
4. The thread sleeps for 10ms, wakes, and pumps new chunks as necessary
|
|
|
|
chunk to refill the drained window back to the size limit.
|
|
|
|
to refill any drained output queue, which are being asynchronously
|
|
|
|
6. When the last chunk has been pumped for a single transfer,
|
|
|
|
drained by the Stream implementation running on the Broker thread.
|
|
|
|
|
|
|
|
5. Once the last chunk has been pumped for a single transfer,
|
|
|
|
|
|
|
|
Sender.close() is called causing the receive loop in
|
|
|
|
Sender.close() is called causing the receive loop in
|
|
|
|
target.py::_get_file() to exit, and allows that code to compare the
|
|
|
|
target.py::_get_file() to exit, allowing that code to compare the
|
|
|
|
transferred size with the total file size indicated by the return
|
|
|
|
transferred size with the total file size from the metadata.
|
|
|
|
value of the fetch() method.
|
|
|
|
7. If the sizes mismatch, _get_file()'s caller is informed which will
|
|
|
|
6. If the sizes mismatch, the caller is informed, which will discard
|
|
|
|
discard the result and log/raise an error.
|
|
|
|
the result and log an error.
|
|
|
|
|
|
|
|
7. Once all chunks have been pumped for all transfers, the dedicated
|
|
|
|
|
|
|
|
thread stops waking at 10ms intervals and resumes perpetual sleep.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Shutdown:
|
|
|
|
Shutdown:
|
|
|
|
1. process.py calls service.Pool.shutdown(), which arranges for all the
|
|
|
|
1. process.py calls service.Pool.shutdown(), which arranges for the
|
|
|
|
service pool threads to exit and be joined, guranteeing no new
|
|
|
|
service pool threads to exit and be joined, guranteeing no new
|
|
|
|
requests can arrive, before calling Service.on_shutdown() for each
|
|
|
|
requests can arrive, before calling Service.on_shutdown() for each
|
|
|
|
registered service.
|
|
|
|
registered service.
|
|
|
|
2. FileService.on_shutdown() marks the dedicated thread's queue as
|
|
|
|
2. FileService.on_shutdown() walks every in-progress transfer and calls
|
|
|
|
closed, causing the dedicated thread to wake immediately. It will
|
|
|
|
Sender.close(), causing Receiver loops in the requestees to exit
|
|
|
|
throw an exception that begins shutdown of the main loop.
|
|
|
|
early. The size check fails and any partially downloaded file is
|
|
|
|
3. The main loop calls Sender.close() prematurely for every pending
|
|
|
|
discarded.
|
|
|
|
transfer, causing any Receiver loops in the target contexts to exit
|
|
|
|
3. Control exits _get_file() in every target, and graceful shutdown can
|
|
|
|
early. The file size check fails, and the partially downloaded file
|
|
|
|
proceed normally, without the associated thread needing to be
|
|
|
|
is discarded, and an error is logged.
|
|
|
|
forcefully killed.
|
|
|
|
4. Control exits the file transfer function in every target, and
|
|
|
|
|
|
|
|
graceful target shutdown can proceed normally, without the
|
|
|
|
|
|
|
|
associated thread needing to be forcefully killed.
|
|
|
|
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
handle = 501
|
|
|
|
handle = 501
|
|
|
|
max_message_size = 1000
|
|
|
|
max_message_size = 1000
|
|
|
|
unregistered_msg = 'Path is not registered with FileService.'
|
|
|
|
unregistered_msg = 'Path is not registered with FileService.'
|
|
|
|
context_mismatch_msg = 'sender= kwarg context must match requestee context'
|
|
|
|
context_mismatch_msg = 'sender= kwarg context must match requestee context'
|
|
|
|
|
|
|
|
|
|
|
|
#: Maximum size of any stream's output queue before we stop pumping more
|
|
|
|
#: Initial burst size. With 1MiB and a RTT of 10ms, maximum throughput is
|
|
|
|
#: file chunks. The queue may overspill by up to mitogen.core.CHUNK_SIZE-1
|
|
|
|
#: 112MiB/sec, which is 5x what SSH can handle on a 2011 era 2.4Ghz Core
|
|
|
|
#: bytes (128KiB-1). With max_queue_size=1MiB and a RTT of 10ms, maximum
|
|
|
|
#: i5.
|
|
|
|
#: throughput is 112MiB/sec, which is >5x what SSH can handle on my laptop.
|
|
|
|
window_size_bytes = 1048576
|
|
|
|
max_queue_size = 1048576
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, router):
|
|
|
|
def __init__(self, router):
|
|
|
|
super(FileService, self).__init__(router)
|
|
|
|
super(FileService, self).__init__(router)
|
|
|
@ -500,13 +489,13 @@ class FileService(mitogen.service.Service):
|
|
|
|
def _schedule_pending_unlocked(self, state):
|
|
|
|
def _schedule_pending_unlocked(self, state):
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
Consider the pending transfers for a stream, pumping new chunks while
|
|
|
|
Consider the pending transfers for a stream, pumping new chunks while
|
|
|
|
the unacknowledged byte count is below :attr:`max_queue_size`. Must be
|
|
|
|
the unacknowledged byte count is below :attr:`window_size_bytes`. Must
|
|
|
|
called with the StreamState lock held.
|
|
|
|
be called with the StreamState lock held.
|
|
|
|
|
|
|
|
|
|
|
|
:param StreamState state:
|
|
|
|
:param StreamState state:
|
|
|
|
Stream to schedule chunks for.
|
|
|
|
Stream to schedule chunks for.
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
while state.jobs and state.unacked < self.max_queue_size:
|
|
|
|
while state.jobs and state.unacked < self.window_size_bytes:
|
|
|
|
sender, fp = state.jobs[0]
|
|
|
|
sender, fp = state.jobs[0]
|
|
|
|
s = fp.read(mitogen.core.CHUNK_SIZE)
|
|
|
|
s = fp.read(mitogen.core.CHUNK_SIZE)
|
|
|
|
state.unacked += len(s)
|
|
|
|
state.unacked += len(s)
|
|
|
@ -576,9 +565,9 @@ class FileService(mitogen.service.Service):
|
|
|
|
@mitogen.service.no_reply()
|
|
|
|
@mitogen.service.no_reply()
|
|
|
|
def acknowledge(self, size, msg):
|
|
|
|
def acknowledge(self, size, msg):
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
Acknowledgement bytes received by a transfer target, scheduling new
|
|
|
|
Acknowledge bytes received by a transfer target, scheduling new chunks
|
|
|
|
chunks to keep the window full. This should be called for every chunk
|
|
|
|
to keep the window full. This should be called for every chunk received
|
|
|
|
received by the target.
|
|
|
|
by the target.
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
stream = self.router.stream_by_id(msg.src_id)
|
|
|
|
stream = self.router.stream_by_id(msg.src_id)
|
|
|
|
state = self._state_by_stream[stream]
|
|
|
|
state = self._state_by_stream[stream]
|
|
|
|