diff --git a/.travis.yml b/.travis.yml index e25cf813..47c64a35 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,11 +12,11 @@ python: env: - MODE=mitogen MITOGEN_TEST_DISTRO=debian - MODE=mitogen MITOGEN_TEST_DISTRO=centos -- MODE=debops_common +- MODE=debops_common ANSIBLE_VERSION=2.4.3.0 +- MODE=debops_common ANSIBLE_VERSION=2.5.1 - MODE=ansible ANSIBLE_VERSION=2.4.3.0 MITOGEN_TEST_DISTRO=debian -- MODE=ansible ANSIBLE_VERSION=2.4.3.0 MITOGEN_TEST_DISTRO=centos -- MODE=ansible ANSIBLE_VERSION=2.5.0 MITOGEN_TEST_DISTRO=centos -- MODE=ansible ANSIBLE_VERSION=2.5.0 MITOGEN_TEST_DISTRO=debian +- MODE=ansible ANSIBLE_VERSION=2.5.1 MITOGEN_TEST_DISTRO=centos +- MODE=ansible ANSIBLE_VERSION=2.5.1 MITOGEN_TEST_DISTRO=debian install: - pip install -r dev_requirements.txt diff --git a/.travis/debops_common_tests.sh b/.travis/debops_common_tests.sh index b75484c2..f0537a85 100755 --- a/.travis/debops_common_tests.sh +++ b/.travis/debops_common_tests.sh @@ -4,6 +4,7 @@ TMPDIR="/tmp/debops-$$" TRAVIS_BUILD_DIR="${TRAVIS_BUILD_DIR:-`pwd`}" TARGET_COUNT="${TARGET_COUNT:-2}" +ANSIBLE_VERSION="${ANSIBLE_VERSION:-2.4.3.0}" MITOGEN_TEST_DISTRO=debian # Naturally DebOps only supports Debian. 
export PYTHONPATH="${PYTHONPATH}:${TRAVIS_BUILD_DIR}" @@ -26,7 +27,7 @@ mkdir "$TMPDIR" echo travis_fold:start:job_setup -pip install -qqqU debops==0.7.2 ansible==2.4.3.0 +pip install -qqqU debops==0.7.2 ansible==${ANSIBLE_VERSION} debops-init "$TMPDIR/project" cd "$TMPDIR/project" diff --git a/ansible_mitogen/runner.py b/ansible_mitogen/runner.py index af3e2bf7..5b58116b 100644 --- a/ansible_mitogen/runner.py +++ b/ansible_mitogen/runner.py @@ -386,6 +386,8 @@ class NewStyleRunner(ScriptRunner): def _run(self): code = self._get_code() mod = types.ModuleType('__main__') + mod.__file__ = self.program_fp.name + mod.__package__ = None d = vars(mod) e = None diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index de7f8135..fee50934 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -347,27 +347,81 @@ class ContextService(mitogen.service.Service): class FileService(mitogen.service.Service): """ - Primitive latency-inducing file server for old-style incantations of the - module runner. This is to be replaced later with a scheme that forwards - files known to be missing without the target having to ask for them, - avoiding a corresponding roundtrip per file. - - Paths must be explicitly added to the service by a trusted context before - they will be served to an untrusted context. + Streaming file server, used to serve both small files like Ansible module + sources, and huge files like ISO images. Paths must be explicitly added to + the service by a trusted context before they will be served to an untrusted + context. + + The file service nominally lives on the mitogen.service.Pool() threads + shared with ContextService above, however for simplicity it also maintains + a dedicated thread from where file chunks are scheduled. 
+ + The scheduler thread is responsible for dividing transfer requests up among + the physical streams that connect to those contexts, and ensure each stream + never has an excessive amount of data buffered in RAM at any time. + + Transfers proceed one-at-a-time per stream. When multiple contexts exist + reachable over the same stream (e.g. one is the SSH account, another is a + sudo account, and a third is a proxied SSH connection), each request is + satisfied in turn before chunks for subsequent requests start flowing. This + ensures when a connection is contended, that preference is given to + completing individual transfers, rather than potentially aborting many + partially complete transfers, causing all the bandwidth used to be wasted. + + Theory of operation: + 1. Trusted context (i.e. a WorkerProcess) calls register(), making a + file available to any untrusted context. + 2. Untrusted context creates a mitogen.core.Receiver() to receive + file chunks. It then calls fetch(path, recv.to_sender()), which sets + up the transfer. The fetch() method returns the final file size and + notifies the dedicated thread of the transfer request. + 3. The dedicated thread wakes from perpetual sleep, looks up the stream + used to communicate with the untrusted context, and begins pumping + 128KiB-sized chunks until that stream's output queue reaches a + limit (1MiB). + 4. The thread sleeps for 10ms, wakes, and pumps new chunks as necessary + to refill any drained output queue, which are being asynchronously + drained by the Stream implementation running on the Broker thread. + 5. Once the last chunk has been pumped for a single transfer, + Sender.close() is called causing the receive loop in + target.py::_get_file() to exit, and allows that code to compare the + transferred size with the total file size indicated by the return + value of the fetch() method. + 6. If the sizes mismatch, the caller is informed, which will discard + the result and log an error. + 7. 
Once all chunks have been pumped for all transfers, the dedicated + thread stops waking at 10ms intervals and resumes perpetual sleep. + + Shutdown: + 1. process.py calls service.Pool.shutdown(), which arranges for all the + service pool threads to exit and be joined, guaranteeing no new + requests can arrive, before calling Service.on_shutdown() for each + registered service. + 2. FileService.on_shutdown() marks the dedicated thread's queue as + closed, causing the dedicated thread to wake immediately. It will + throw an exception that begins shutdown of the main loop. + 3. The main loop calls Sender.close() prematurely for every pending + transfer, causing any Receiver loops in the target contexts to exit + early. The file size check fails, and the partially downloaded file + is discarded, and an error is logged. + 4. Control exits the file transfer function in every target, and + graceful target shutdown can proceed normally, without the + associated thread needing to be forcefully killed. """ handle = 501 max_message_size = 1000 unregistered_msg = 'Path is not registered with FileService.' #: Maximum size of any stream's output queue before we temporarily stop - #: pumping more file chunks. The queue may overspill by up to - #: mitogen.core.CHUNK_SIZE-1 bytes (128KiB-1). + #: pumping more file chunks on that stream. The queue may overspill by up + #: to mitogen.core.CHUNK_SIZE-1 bytes (128KiB-1). max_queue_size = 1048576 - #: Time spent by the scheduler thread asleep when it has no more queues to - #: pump. With max_queue_size=1MiB and a sleep of 10ms, maximum throughput - #: on any single stream is 100MiB/sec, which is 5x what SSH can handle on - #: my laptop. + #: Time spent by the scheduler thread asleep when it has no more data to + #: pump, but while at least one transfer remains active. With + #: max_queue_size=1MiB and a sleep of 10ms, maximum throughput on any + #: single stream is 100MiB/sec, which is 5x what SSH can handle on my + #: laptop. 
sleep_delay_ms = 0.01 def __init__(self, router): @@ -381,12 +435,20 @@ class FileService(mitogen.service.Service): self._thread = threading.Thread(target=self._scheduler_main) self._thread.start() + def on_shutdown(self): + """ + Respond to shutdown of the service pool by marking our queue closed. + This causes :meth:`_sleep_on_queue` to wake immediately and return + :data:`False`, causing the scheduler main thread to exit. + """ + self._queue.close() + def _pending_bytes(self, stream): """ Defer a function call to the Broker thread in order to accurately measure the bytes pending in `stream`'s queue. - This must be done synchronized with the Broker, as scheduler + This must be done synchronized with the Broker, as OS scheduler uncertainty could cause Sender.send()'s deferred enqueues to be processed very late, making the output queue look much emptier than it really is (or is about to become). @@ -438,11 +500,12 @@ class FileService(mitogen.service.Service): :meth:`on_shutdown` hasn't been called yet, otherwise :data:`False`. 
""" + if self._schedule_pending: + timeout = self.sleep_delay_ms + else: + timeout = None + try: - if self._schedule_pending: - timeout = self.sleep_delay_ms - else: - timeout = None sender, fp = self._queue.get(timeout=timeout) except mitogen.core.LatchError: return False @@ -513,8 +576,6 @@ class FileService(mitogen.service.Service): raise mitogen.core.CallError(self.unregistered_msg) LOG.debug('Serving %r', path) - self._queue.put(( - sender, - open(path, 'rb', mitogen.core.CHUNK_SIZE), - )) + fp = open(path, 'rb', mitogen.core.CHUNK_SIZE) + self._queue.put((sender, fp)) return self._size_by_path[path] diff --git a/mitogen/core.py b/mitogen/core.py index c57f84cb..4a48dac0 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -441,10 +441,10 @@ class Receiver(object): def empty(self): return self._latch.empty() - def get(self, timeout=None, block=True): + def get(self, timeout=None, block=True, throw_dead=True): _vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block) msg = self._latch.get(timeout=timeout, block=block) - if msg.is_dead: + if msg.is_dead and throw_dead: if msg.src_id == mitogen.context_id: raise ChannelError(ChannelError.local_msg) else: diff --git a/tests/ansible/bench/all.yml b/tests/ansible/bench/all.yml new file mode 100644 index 00000000..453d0b42 --- /dev/null +++ b/tests/ansible/bench/all.yml @@ -0,0 +1 @@ +- import_playbook: file_transfer.yml diff --git a/tests/ansible/bench/file_transfer.yml b/tests/ansible/bench/file_transfer.yml new file mode 100644 index 00000000..99666b84 --- /dev/null +++ b/tests/ansible/bench/file_transfer.yml @@ -0,0 +1,68 @@ + +- name: bench/file_transfer.yml + hosts: all + any_errors_fatal: true + tasks: + + - name: Make 32MiB file + connection: local + shell: openssl rand 33554432 > /tmp/bigfile.in + + - name: Make 320MiB file + connection: local + shell: > + cat + /tmp/bigfile.in + /tmp/bigfile.in + /tmp/bigfile.in + /tmp/bigfile.in + /tmp/bigfile.in + /tmp/bigfile.in + /tmp/bigfile.in + 
/tmp/bigfile.in + /tmp/bigfile.in + /tmp/bigfile.in + > /tmp/bigbigfile.in + + - name: Delete SSH file if present. + file: + path: "{{item}}" + state: absent + become: true + with_items: + - /tmp/bigfile.out + - /tmp/bigbigfile.out + + - name: Copy 32MiB file via SSH + copy: + src: /tmp/bigfile.in + dest: /tmp/bigfile.out + + - name: Copy 320MiB file via SSH + copy: + src: /tmp/bigbigfile.in + dest: /tmp/bigbigfile.out + + - name: Delete localhost sudo file if present. + file: + path: "{{item}}" + state: absent + connection: local + become: true + with_items: + - /tmp/bigfile.out + - /tmp/bigbigfile.out + + - name: Copy 32MiB file via localhost sudo + connection: local + become: true + copy: + src: /tmp/bigfile.in + dest: /tmp/bigfile.out + + - name: Copy 320MiB file via localhost sudo + connection: local + become: true + copy: + src: /tmp/bigbigfile.in + dest: /tmp/bigbigfile.out diff --git a/tests/ansible/lib/modules/custom_python_new_style_module.py b/tests/ansible/lib/modules/custom_python_new_style_module.py index 1ae50d50..4bc9794d 100755 --- a/tests/ansible/lib/modules/custom_python_new_style_module.py +++ b/tests/ansible/lib/modules/custom_python_new_style_module.py @@ -21,6 +21,11 @@ input_json = sys.stdin.read() print "{" print " \"changed\": false," +# v2.5.1. apt.py started depending on this. +# https://github.com/dw/mitogen/issues/210 +print " \"__file__\": \"%s\"," % (__file__,) +# Python sets this during a regular import. +print " \"__package__\": \"%s\"," % (__package__,) print " \"msg\": \"Here is my input\"," print " \"source\": [%s]," % (json.dumps(me),) print " \"input\": [%s]" % (input_json,) diff --git a/tests/router_test.py b/tests/router_test.py index 09c54245..b32c5e4b 100644 --- a/tests/router_test.py +++ b/tests/router_test.py @@ -223,40 +223,50 @@ class NoRouteTest(testlib.RouterMixin, testlib.TestCase): # Verify sending a message to an invalid handle yields a dead message # from the target context. 
l1 = self.router.fork() + recv = l1.send_async(mitogen.core.Message(handle=999)) + msg = recv.get(throw_dead=False) + self.assertEquals(msg.is_dead, True) + self.assertEquals(msg.src_id, l1.context_id) + recv = l1.send_async(mitogen.core.Message(handle=999)) e = self.assertRaises(mitogen.core.ChannelError, - lambda: recv.get() - ) + lambda: recv.get()) self.assertEquals(e.args[0], mitogen.core.ChannelError.remote_msg) def test_totally_invalid_context_returns_dead(self): recv = mitogen.core.Receiver(self.router) - self.router.route( - mitogen.core.Message( - dst_id=1234, - handle=1234, - reply_to=recv.handle, - ) + msg = mitogen.core.Message( + dst_id=1234, + handle=1234, + reply_to=recv.handle, ) + self.router.route(msg) + rmsg = recv.get(throw_dead=False) + self.assertEquals(rmsg.is_dead, True) + self.assertEquals(rmsg.src_id, mitogen.context_id) + + self.router.route(msg) e = self.assertRaises(mitogen.core.ChannelError, - lambda: recv.get() - ) + lambda: recv.get()) self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg) def test_previously_alive_context_returns_dead(self): l1 = self.router.fork() l1.shutdown(wait=True) recv = mitogen.core.Receiver(self.router) - self.router.route( - mitogen.core.Message( - dst_id=l1.context_id, - handle=mitogen.core.CALL_FUNCTION, - reply_to=recv.handle, - ) + msg = mitogen.core.Message( + dst_id=l1.context_id, + handle=mitogen.core.CALL_FUNCTION, + reply_to=recv.handle, ) + self.router.route(msg) + rmsg = recv.get(throw_dead=False) + self.assertEquals(rmsg.is_dead, True) + self.assertEquals(rmsg.src_id, mitogen.context_id) + + self.router.route(msg) e = self.assertRaises(mitogen.core.ChannelError, - lambda: recv.get() - ) + lambda: recv.get()) self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg)