|
|
|
|
@ -347,27 +347,81 @@ class ContextService(mitogen.service.Service):
|
|
|
|
|
|
|
|
|
|
class FileService(mitogen.service.Service):
|
|
|
|
|
"""
|
|
|
|
|
Primitive latency-inducing file server for old-style incantations of the
|
|
|
|
|
module runner. This is to be replaced later with a scheme that forwards
|
|
|
|
|
files known to be missing without the target having to ask for them,
|
|
|
|
|
avoiding a corresponding roundtrip per file.
|
|
|
|
|
|
|
|
|
|
Paths must be explicitly added to the service by a trusted context before
|
|
|
|
|
they will be served to an untrusted context.
|
|
|
|
|
Streaming file server, used to serve both small files like Ansible module
|
|
|
|
|
sources, and huge files like ISO images. Paths must be explicitly added to
|
|
|
|
|
the service by a trusted context before they will be served to an untrusted
|
|
|
|
|
context.
|
|
|
|
|
|
|
|
|
|
The file service nominally lives on the mitogen.service.Pool() threads
|
|
|
|
|
shared with ContextService above, however for simplicity it also maintains
|
|
|
|
|
a dedicated thread from where file chunks are scheduled.
|
|
|
|
|
|
|
|
|
|
The scheduler thread is responsible for dividing transfer requests up among
|
|
|
|
|
the physical streams that connect to those contexts, and ensure each stream
|
|
|
|
|
never has an excessive amount of data buffered in RAM at any time.
|
|
|
|
|
|
|
|
|
|
Transfers proceeed one-at-a-time per stream. When multiple contexts exist
|
|
|
|
|
reachable over the same stream (e.g. one is the SSH account, another is a
|
|
|
|
|
sudo account, and a third is a proxied SSH connection), each request is
|
|
|
|
|
satisfied in turn before chunks for subsuquent requests start flowing. This
|
|
|
|
|
ensures when a connection is contended, that preference is given to
|
|
|
|
|
completing individual transfers, rather than potentially aborting many
|
|
|
|
|
partially complete transfers, causing all the bandwidth used to be wasted.
|
|
|
|
|
|
|
|
|
|
Theory of operation:
|
|
|
|
|
1. Trusted context (i.e. a WorkerProcess) calls register(), making a
|
|
|
|
|
file available to any untrusted context.
|
|
|
|
|
2. Untrusted context creates a mitogen.core.Receiver() to receive
|
|
|
|
|
file chunks. It then calls fetch(path, recv.to_sender()), which sets
|
|
|
|
|
up the transfer. The fetch() method returns the final file size and
|
|
|
|
|
notifies the dedicated thread of the transfer request.
|
|
|
|
|
3. The dedicated thread wakes from perpetual sleep, looks up the stream
|
|
|
|
|
used to communicate with the untrusted context, and begins pumping
|
|
|
|
|
128KiB-sized chunks until that stream's output queue reaches a
|
|
|
|
|
limit (1MiB).
|
|
|
|
|
4. The thread sleeps for 10ms, wakes, and pumps new chunks as necesarry
|
|
|
|
|
to refill any drained output queue, which are being asynchronously
|
|
|
|
|
drained by the Stream implementation running on the Broker thread.
|
|
|
|
|
5. Once the last chunk has been pumped for a single transfer,
|
|
|
|
|
Sender.close() is called causing the receive loop in
|
|
|
|
|
target.py::_get_file() to exit, and allows that code to compare the
|
|
|
|
|
transferred size with the total file size indicated by the return
|
|
|
|
|
value of the fetch() method.
|
|
|
|
|
6. If the sizes mismatch, the caller is informed, which will discard
|
|
|
|
|
the result and log an error.
|
|
|
|
|
7. Once all chunks have been pumped for all transfers, the dedicated
|
|
|
|
|
thread stops waking at 10ms intervals and resumes perpetual sleep.
|
|
|
|
|
|
|
|
|
|
Shutdown:
|
|
|
|
|
1. process.py calls service.Pool.shutdown(), which arranges for all the
|
|
|
|
|
service pool threads to exit and be joined, guranteeing no new
|
|
|
|
|
requests can arrive, before calling Service.on_shutdown() for each
|
|
|
|
|
registered service.
|
|
|
|
|
2. FileService.on_shutdown() marks the dedicated thread's queue as
|
|
|
|
|
closed, causing the dedicated thread to wake immediately. It will
|
|
|
|
|
throw an exception that begins shutdown of the main loop.
|
|
|
|
|
3. The main loop calls Sender.close() prematurely for every pending
|
|
|
|
|
transfer, causing any Receiver loops in the target contexts to exit
|
|
|
|
|
early. The file size check fails, and the partially downloaded file
|
|
|
|
|
is discarded, and an error is logged.
|
|
|
|
|
4. Control exits the file transfer function in every target, and
|
|
|
|
|
graceful target shutdown can proceed normally, without the
|
|
|
|
|
associated thread needing to be forcefully killed.
|
|
|
|
|
"""
|
|
|
|
|
handle = 501
|
|
|
|
|
max_message_size = 1000
|
|
|
|
|
unregistered_msg = 'Path is not registered with FileService.'
|
|
|
|
|
|
|
|
|
|
#: Maximum size of any stream's output queue before we temporarily stop
|
|
|
|
|
#: pumping more file chunks. The queue may overspill by up to
|
|
|
|
|
#: mitogen.core.CHUNK_SIZE-1 bytes (128KiB-1).
|
|
|
|
|
#: pumping more file chunks on that stream. The queue may overspill by up
|
|
|
|
|
#: to mitogen.core.CHUNK_SIZE-1 bytes (128KiB-1).
|
|
|
|
|
max_queue_size = 1048576
|
|
|
|
|
|
|
|
|
|
#: Time spent by the scheduler thread asleep when it has no more queues to
|
|
|
|
|
#: pump. With max_queue_size=1MiB and a sleep of 10ms, maximum throughput
|
|
|
|
|
#: on any single stream is 100MiB/sec, which is 5x what SSH can handle on
|
|
|
|
|
#: my laptop.
|
|
|
|
|
#: Time spent by the scheduler thread asleep when it has no more data to
|
|
|
|
|
#: pump, but while at least one transfer remains active. With
|
|
|
|
|
#: max_queue_size=1MiB and a sleep of 10ms, maximum throughput on any
|
|
|
|
|
#: single stream is 100MiB/sec, which is 5x what SSH can handle on my
|
|
|
|
|
#: laptop.
|
|
|
|
|
sleep_delay_ms = 0.01
|
|
|
|
|
|
|
|
|
|
def __init__(self, router):
|
|
|
|
|
@ -385,7 +439,7 @@ class FileService(mitogen.service.Service):
|
|
|
|
|
"""
|
|
|
|
|
Respond to shutdown of the service pool by marking our queue closed.
|
|
|
|
|
This causes :meth:`_sleep_on_queue` to wake immediately and return
|
|
|
|
|
:data:`False`, causing the scheduler thread main function to exit.
|
|
|
|
|
:data:`False`, causing the scheduler main thread to exit.
|
|
|
|
|
"""
|
|
|
|
|
self._queue.close()
|
|
|
|
|
|
|
|
|
|
|