From 6541779dd626eebe0b848ac396dcc3a5a3d32e5e Mon Sep 17 00:00:00 2001
From: David Wilson
Date: Sun, 22 Apr 2018 03:21:59 +0100
Subject: [PATCH 1/8] tests: import Ansible file transfer benchmark

---
 tests/ansible/bench/all.yml           |  1 +
 tests/ansible/bench/file_transfer.yml | 68 +++++++++++++++++++++++++++
 2 files changed, 69 insertions(+)
 create mode 100644 tests/ansible/bench/all.yml
 create mode 100644 tests/ansible/bench/file_transfer.yml

diff --git a/tests/ansible/bench/all.yml b/tests/ansible/bench/all.yml
new file mode 100644
index 00000000..453d0b42
--- /dev/null
+++ b/tests/ansible/bench/all.yml
@@ -0,0 +1 @@
+- import_playbook: file_transfer.yml
diff --git a/tests/ansible/bench/file_transfer.yml b/tests/ansible/bench/file_transfer.yml
new file mode 100644
index 00000000..99666b84
--- /dev/null
+++ b/tests/ansible/bench/file_transfer.yml
@@ -0,0 +1,68 @@
+
+- name: bench/file_transfer.yml
+  hosts: all
+  any_errors_fatal: true
+  tasks:
+
+    - name: Make 32MiB file
+      connection: local
+      shell: openssl rand 33554432 > /tmp/bigfile.in
+
+    - name: Make 320MiB file
+      connection: local
+      shell: >
+        cat
+        /tmp/bigfile.in
+        /tmp/bigfile.in
+        /tmp/bigfile.in
+        /tmp/bigfile.in
+        /tmp/bigfile.in
+        /tmp/bigfile.in
+        /tmp/bigfile.in
+        /tmp/bigfile.in
+        /tmp/bigfile.in
+        /tmp/bigfile.in
+        > /tmp/bigbigfile.in
+
+    - name: Delete SSH file if present.
+      file:
+        path: "{{item}}"
+        state: absent
+      become: true
+      with_items:
+      - /tmp/bigfile.out
+      - /tmp/bigbigfile.out
+
+    - name: Copy 32MiB file via SSH
+      copy:
+        src: /tmp/bigfile.in
+        dest: /tmp/bigfile.out
+
+    - name: Copy 320MiB file via SSH
+      copy:
+        src: /tmp/bigbigfile.in
+        dest: /tmp/bigbigfile.out
+
+    - name: Delete localhost sudo file if present.
+      file:
+        path: "{{item}}"
+        state: absent
+      connection: local
+      become: true
+      with_items:
+      - /tmp/bigfile.out
+      - /tmp/bigbigfile.out
+
+    - name: Copy 32MiB file via localhost sudo
+      connection: local
+      become: true
+      copy:
+        src: /tmp/bigfile.in
+        dest: /tmp/bigfile.out
+
+    - name: Copy 320MiB file via localhost sudo
+      connection: local
+      become: true
+      copy:
+        src: /tmp/bigbigfile.in
+        dest: /tmp/bigbigfile.out

From cf30e88a3e6157a5170a5d9306addcc5f91bc491 Mon Sep 17 00:00:00 2001
From: David Wilson
Date: Sun, 22 Apr 2018 03:35:35 +0100
Subject: [PATCH 2/8] ansible: implement missing FileService.on_shutdown()

---
 ansible_mitogen/services.py | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py
index de7f8135..e9f40130 100644
--- a/ansible_mitogen/services.py
+++ b/ansible_mitogen/services.py
@@ -381,6 +381,14 @@ class FileService(mitogen.service.Service):
         self._thread = threading.Thread(target=self._scheduler_main)
         self._thread.start()
 
+    def on_shutdown(self):
+        """
+        Respond to shutdown of the service pool by marking our queue closed.
+        This causes :meth:`_sleep_on_queue` to wake immediately and return
+        :data:`False`, causing the scheduler thread main function to exit.
+        """
+        self._queue.close()
+
     def _pending_bytes(self, stream):
         """
        Defer a function call to the Broker thread in order to accurately
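
The shutdown idiom patch 2 adopts, waking a dedicated worker by closing the
latch it sleeps on rather than pushing a sentinel value, can be shown in
isolation. A minimal sketch, assuming only the mitogen.core.Latch
put/get/close calls and the LatchError exception that FileService's queue
already relies on; the Scheduler class itself is illustrative and not part of
the series:

    import threading
    import mitogen.core

    class Scheduler(object):
        """Run queued jobs on a private thread until the latch is closed."""
        def __init__(self):
            self._queue = mitogen.core.Latch()
            self._thread = threading.Thread(target=self._main)
            self._thread.start()

        def _main(self):
            while True:
                try:
                    job = self._queue.get()   # sleeps until put() or close()
                except mitogen.core.LatchError:
                    return                    # latch closed: exit cleanly
                job()

        def submit(self, job):
            self._queue.put(job)

        def on_shutdown(self):
            # Closing the latch wakes _main() immediately via LatchError,
            # mirroring FileService.on_shutdown() above.
            self._queue.close()
            self._thread.join()
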
From 376fc850000f314705a05d14ee5ea84f01326988 Mon Sep 17 00:00:00 2001
From: David Wilson
Date: Sun, 22 Apr 2018 03:58:29 +0100
Subject: [PATCH 3/8] ansible: FileService docstrings.

---
 ansible_mitogen/services.py | 82 ++++++++++++++++++++++++++++++-------
 1 file changed, 68 insertions(+), 14 deletions(-)

diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py
index e9f40130..9d94ed18 100644
--- a/ansible_mitogen/services.py
+++ b/ansible_mitogen/services.py
@@ -347,27 +347,81 @@ class ContextService(mitogen.service.Service):
 
 class FileService(mitogen.service.Service):
     """
-    Primitive latency-inducing file server for old-style incantations of the
-    module runner. This is to be replaced later with a scheme that forwards
-    files known to be missing without the target having to ask for them,
-    avoiding a corresponding roundtrip per file.
-
-    Paths must be explicitly added to the service by a trusted context before
-    they will be served to an untrusted context.
+    Streaming file server, used to serve both small files like Ansible module
+    sources, and huge files like ISO images. Paths must be explicitly added to
+    the service by a trusted context before they will be served to an untrusted
+    context.
+
+    The file service nominally lives on the mitogen.service.Pool() threads
+    shared with ContextService above, however for simplicity it also maintains
+    a dedicated thread from where file chunks are scheduled.
+
+    The scheduler thread is responsible for dividing transfer requests up among
+    the physical streams that connect to those contexts, and ensure each stream
+    never has an excessive amount of data buffered in RAM at any time.
+
+    Transfers proceed one-at-a-time per stream. When multiple contexts exist
+    reachable over the same stream (e.g. one is the SSH account, another is a
+    sudo account, and a third is a proxied SSH connection), each request is
+    satisfied in turn before chunks for subsuquent requests start flowing. This
+    ensures when a connection is contended, that preference is given to
+    completing individual transfers, rather than potentially aborting many
+    partially complete transfers, causing all the bandwidth used to be wasted.
+
+    Theory of operation:
+        1. Trusted context (i.e. a WorkerProcess) calls register(), making a
+           file available to any untrusted context.
+        2. Untrusted context creates a mitogen.core.Receiver() to receive
+           file chunks. It then calls fetch(path, recv.to_sender()), which sets
+           up the transfer. The fetch() method returns the final file size and
+           notifies the dedicated thread of the transfer request.
+        3. The dedicated thread wakes from perpetual sleep, looks up the stream
+           used to communicate with the untrusted context, and begins pumping
+           128KiB-sized chunks until that stream's output queue reaches a
+           limit (1MiB).
+        4. The thread sleeps for 10ms, wakes, and pumps new chunks as necesarry
+           to refill any drained output queue, which are being asynchronously
+           drained by the Stream implementation running on the Broker thread.
+        5. Once the last chunk has been pumped for a single transfer,
+           Sender.close() is called causing the receive loop in
+           target.py::_get_file() to exit, and allows that code to compare the
+           transferred size with the total file size indicated by the return
+           value of the fetch() method.
+        6. If the sizes mismatch, the caller is informed, which will discard
+           the result and log an error.
+        7. Once all chunks have been pumped for all transfers, the dedicated
+           thread stops waking at 10ms intervals and resumes perpetual sleep.
+
+    Shutdown:
+        1. process.py calls service.Pool.shutdown(), which arranges for all the
+           service pool threads to exit and be joined, guaranteeing no new
+           requests can arrive, before calling Service.on_shutdown() for each
+           registered service.
+        2. FileService.on_shutdown() marks the dedicated thread's queue as
+           closed, causing the dedicated thread to wake immediately. It will
+           throw an exception that begins shutdown of the main loop.
+        3. The main loop calls Sender.close() prematurely for every pending
+           transfer, causing any Receiver loops in the target contexts to exit
+           early. The file size check fails, and the partially downloaded file
+           is discarded, and an error is logged.
+        4. Control exits the file transfer function in every target, and
+           graceful target shutdown can proceed normally, without the
+           associated thread needing to be forcefully killed.
     """
     handle = 501
     max_message_size = 1000
     unregistered_msg = 'Path is not registered with FileService.'
 
     #: Maximum size of any stream's output queue before we temporarily stop
-    #: pumping more file chunks. The queue may overspill by up to
-    #: mitogen.core.CHUNK_SIZE-1 bytes (128KiB-1).
+    #: pumping more file chunks on that stream. The queue may overspill by up
+    #: to mitogen.core.CHUNK_SIZE-1 bytes (128KiB-1).
     max_queue_size = 1048576
 
-    #: Time spent by the scheduler thread asleep when it has no more queues to
-    #: pump. With max_queue_size=1MiB and a sleep of 10ms, maximum throughput
-    #: on any single stream is 100MiB/sec, which is 5x what SSH can handle on
-    #: my laptop.
+    #: Time spent by the scheduler thread asleep when it has no more data to
+    #: pump, but while at least one transfer remains active. With
+    #: max_queue_size=1MiB and a sleep of 10ms, maximum throughput on any
+    #: single stream is 100MiB/sec, which is 5x what SSH can handle on my
+    #: laptop.
     sleep_delay_ms = 0.01
 
     def __init__(self, router):
@@ -385,7 +439,7 @@ class FileService(mitogen.service.Service):
         """
         Respond to shutdown of the service pool by marking our queue closed.
         This causes :meth:`_sleep_on_queue` to wake immediately and return
-        :data:`False`, causing the scheduler thread main function to exit.
+        :data:`False`, causing the scheduler main thread to exit.
         """
         self._queue.close()
 
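
Steps 2 to 6 of the theory of operation above describe the target-side
receive loop. A minimal sketch of that loop, using only the Receiver and
Sender primitives named in the docstring; call_fetch stands in for whatever
service-call plumbing invokes FileService.fetch() in the trusted context, and
is a placeholder rather than a real mitogen API:

    import mitogen.core

    def get_file(router, call_fetch, path, out_fp):
        recv = mitogen.core.Receiver(router)
        # fetch() registers the transfer and returns the total file size.
        size = call_fetch(path=path, sender=recv.to_sender())
        received = 0
        while True:
            try:
                msg = recv.get()
            except mitogen.core.ChannelError:
                break                  # Sender.close() ends the stream
            chunk = msg.unpickle()
            out_fp.write(chunk)
            received += len(chunk)
        # Step 6: a short byte count means the transfer was aborted.
        return received == size
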
From 21082cec40f344a62e9fcd75b1e7f5c156d7b71b Mon Sep 17 00:00:00 2001
From: David Wilson
Date: Sun, 22 Apr 2018 04:04:58 +0100
Subject: [PATCH 4/8] ansible: fix ugly formatting.

---
 ansible_mitogen/services.py | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py
index 9d94ed18..9ed95b4d 100644
--- a/ansible_mitogen/services.py
+++ b/ansible_mitogen/services.py
@@ -575,8 +575,6 @@ class FileService(mitogen.service.Service):
             raise mitogen.core.CallError(self.unregistered_msg)
 
         LOG.debug('Serving %r', path)
-        self._queue.put((
-            sender,
-            open(path, 'rb', mitogen.core.CHUNK_SIZE),
-        ))
+        fp = open(path, 'rb', mitogen.core.CHUNK_SIZE)
+        self._queue.put((sender, fp))
         return self._size_by_path[path]

From 89fc842ca8d7284682948ed36cf349bea532cb3f Mon Sep 17 00:00:00 2001
From: David Wilson
Date: Sun, 22 Apr 2018 04:12:02 +0100
Subject: [PATCH 5/8] ansible: typo.

---
 ansible_mitogen/services.py | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py
index 9ed95b4d..843f3013 100644
--- a/ansible_mitogen/services.py
+++ b/ansible_mitogen/services.py
@@ -363,7 +363,7 @@ class FileService(mitogen.service.Service):
     Transfers proceed one-at-a-time per stream. When multiple contexts exist
     reachable over the same stream (e.g. one is the SSH account, another is a
     sudo account, and a third is a proxied SSH connection), each request is
-    satisfied in turn before chunks for subsuquent requests start flowing. This
+    satisfied in turn before chunks for subsequent requests start flowing. This
     ensures when a connection is contended, that preference is given to
     completing individual transfers, rather than potentially aborting many
     partially complete transfers, causing all the bandwidth used to be wasted.
@@ -379,7 +379,7 @@ class FileService(mitogen.service.Service):
            used to communicate with the untrusted context, and begins pumping
            128KiB-sized chunks until that stream's output queue reaches a
            limit (1MiB).
-        4. The thread sleeps for 10ms, wakes, and pumps new chunks as necesarry
+        4. The thread sleeps for 10ms, wakes, and pumps new chunks as necessary
            to refill any drained output queue, which are being asynchronously
            drained by the Stream implementation running on the Broker thread.
         5. Once the last chunk has been pumped for a single transfer,
@@ -500,11 +500,12 @@ class FileService(mitogen.service.Service):
             :meth:`on_shutdown` hasn't been called yet, otherwise
             :data:`False`.
         """
+        if self._schedule_pending:
+            timeout = self.sleep_delay_ms
+        else:
+            timeout = None
+
         try:
-            if self._schedule_pending:
-                timeout = self.sleep_delay_ms
-            else:
-                timeout = None
             sender, fp = self._queue.get(timeout=timeout)
         except mitogen.core.LatchError:
             return False

From e63ae4768e3cf03c8ef0dfe46f117d8346c380e1 Mon Sep 17 00:00:00 2001
From: David Wilson
Date: Sun, 22 Apr 2018 05:08:37 +0100
Subject: [PATCH 6/8] core: support Receiver.get(throw_dead=False)

For tests.
---
 mitogen/core.py      |  4 ++--
 tests/router_test.py | 46 +++++++++++++++++++++++++++-----------------
 2 files changed, 30 insertions(+), 20 deletions(-)

diff --git a/mitogen/core.py b/mitogen/core.py
index c57f84cb..4a48dac0 100644
--- a/mitogen/core.py
+++ b/mitogen/core.py
@@ -441,10 +441,10 @@ class Receiver(object):
     def empty(self):
         return self._latch.empty()
 
-    def get(self, timeout=None, block=True):
+    def get(self, timeout=None, block=True, throw_dead=True):
         _vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block)
         msg = self._latch.get(timeout=timeout, block=block)
-        if msg.is_dead:
+        if msg.is_dead and throw_dead:
             if msg.src_id == mitogen.context_id:
                 raise ChannelError(ChannelError.local_msg)
             else:
diff --git a/tests/router_test.py b/tests/router_test.py
index 09c54245..b32c5e4b 100644
--- a/tests/router_test.py
+++ b/tests/router_test.py
@@ -223,40 +223,50 @@ class NoRouteTest(testlib.RouterMixin, testlib.TestCase):
         # Verify sending a message to an invalid handle yields a dead message
         # from the target context.
         l1 = self.router.fork()
+        recv = l1.send_async(mitogen.core.Message(handle=999))
+        msg = recv.get(throw_dead=False)
+        self.assertEquals(msg.is_dead, True)
+        self.assertEquals(msg.src_id, l1.context_id)
+
         recv = l1.send_async(mitogen.core.Message(handle=999))
         e = self.assertRaises(mitogen.core.ChannelError,
-            lambda: recv.get()
-        )
+            lambda: recv.get())
         self.assertEquals(e.args[0], mitogen.core.ChannelError.remote_msg)
 
     def test_totally_invalid_context_returns_dead(self):
         recv = mitogen.core.Receiver(self.router)
-        self.router.route(
-            mitogen.core.Message(
-                dst_id=1234,
-                handle=1234,
-                reply_to=recv.handle,
-            )
+        msg = mitogen.core.Message(
+            dst_id=1234,
+            handle=1234,
+            reply_to=recv.handle,
         )
+        self.router.route(msg)
+        rmsg = recv.get(throw_dead=False)
+        self.assertEquals(rmsg.is_dead, True)
+        self.assertEquals(rmsg.src_id, mitogen.context_id)
+
+        self.router.route(msg)
         e = self.assertRaises(mitogen.core.ChannelError,
-            lambda: recv.get()
-        )
+            lambda: recv.get())
         self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg)
 
     def test_previously_alive_context_returns_dead(self):
         l1 = self.router.fork()
         l1.shutdown(wait=True)
         recv = mitogen.core.Receiver(self.router)
-        self.router.route(
-            mitogen.core.Message(
-                dst_id=l1.context_id,
-                handle=mitogen.core.CALL_FUNCTION,
-                reply_to=recv.handle,
-            )
+        msg = mitogen.core.Message(
+            dst_id=l1.context_id,
+            handle=mitogen.core.CALL_FUNCTION,
+            reply_to=recv.handle,
         )
+        self.router.route(msg)
+        rmsg = recv.get(throw_dead=False)
+        self.assertEquals(rmsg.is_dead, True)
+        self.assertEquals(rmsg.src_id, mitogen.context_id)
+
+        self.router.route(msg)
         e = self.assertRaises(mitogen.core.ChannelError,
-            lambda: recv.get()
-        )
+            lambda: recv.get())
         self.assertEquals(e.args[0], mitogen.core.ChannelError.local_msg)
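
The new keyword lets callers inspect a dead message instead of catching
ChannelError. A usage sketch mirroring the updated tests, with the router and
context setup omitted; probe_invalid_handle is illustrative only:

    import mitogen.core

    def probe_invalid_handle(context):
        # Sending to a handle nobody registered yields a dead reply.
        recv = context.send_async(mitogen.core.Message(handle=999))
        msg = recv.get(throw_dead=False)    # new: returns the dead message
        assert msg.is_dead
        assert msg.src_id == context.context_id

        recv = context.send_async(mitogen.core.Message(handle=999))
        try:
            recv.get()                      # default: still raises
        except mitogen.core.ChannelError as e:
            return e.args[0]                # ChannelError.remote_msg
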
From b2abe74ab61bf1140d8902b9e819634079e73069 Mon Sep 17 00:00:00 2001
From: David Wilson
Date: Sun, 22 Apr 2018 13:21:58 +0100
Subject: [PATCH 7/8] issue #210: run DebOps under v2.5.1 too.

---
 .travis.yml                    | 8 ++++----
 .travis/debops_common_tests.sh | 3 ++-
 ansible_mitogen/services.py    | 2 +-
 3 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index e25cf813..47c64a35 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -12,11 +12,11 @@ python:
 env:
 - MODE=mitogen MITOGEN_TEST_DISTRO=debian
 - MODE=mitogen MITOGEN_TEST_DISTRO=centos
-- MODE=debops_common
+- MODE=debops_common ANSIBLE_VERSION=2.4.3.0
+- MODE=debops_common ANSIBLE_VERSION=2.5.1
 - MODE=ansible ANSIBLE_VERSION=2.4.3.0 MITOGEN_TEST_DISTRO=debian
-- MODE=ansible ANSIBLE_VERSION=2.4.3.0 MITOGEN_TEST_DISTRO=centos
-- MODE=ansible ANSIBLE_VERSION=2.5.0 MITOGEN_TEST_DISTRO=centos
-- MODE=ansible ANSIBLE_VERSION=2.5.0 MITOGEN_TEST_DISTRO=debian
+- MODE=ansible ANSIBLE_VERSION=2.5.1 MITOGEN_TEST_DISTRO=centos
+- MODE=ansible ANSIBLE_VERSION=2.5.1 MITOGEN_TEST_DISTRO=debian
 
 install:
 - pip install -r dev_requirements.txt
diff --git a/.travis/debops_common_tests.sh b/.travis/debops_common_tests.sh
index b75484c2..f0537a85 100755
--- a/.travis/debops_common_tests.sh
+++ b/.travis/debops_common_tests.sh
@@ -4,6 +4,7 @@
 TMPDIR="/tmp/debops-$$"
 TRAVIS_BUILD_DIR="${TRAVIS_BUILD_DIR:-`pwd`}"
 TARGET_COUNT="${TARGET_COUNT:-2}"
+ANSIBLE_VERSION="${ANSIBLE_VERSION:-2.4.3.0}"
 MITOGEN_TEST_DISTRO=debian # Naturally DebOps only supports Debian.
 
 export PYTHONPATH="${PYTHONPATH}:${TRAVIS_BUILD_DIR}"
@@ -26,7 +27,7 @@ mkdir "$TMPDIR"
 
 echo travis_fold:start:job_setup
-pip install -qqqU debops==0.7.2 ansible==2.4.3.0
+pip install -qqqU debops==0.7.2 ansible==${ANSIBLE_VERSION}
 debops-init "$TMPDIR/project"
 cd "$TMPDIR/project"
 
diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py
index 843f3013..fee50934 100644
--- a/ansible_mitogen/services.py
+++ b/ansible_mitogen/services.py
@@ -448,7 +448,7 @@ class FileService(mitogen.service.Service):
         Defer a function call to the Broker thread in order to accurately
         measure the bytes pending in `stream`'s queue.
 
-        This must be done synchronized with the Broker, as scheduler
+        This must be done synchronized with the Broker, as OS scheduler
         uncertainty could cause Sender.send()'s deferred enqueues to be
         processed very late, making the output queue look much emptier than it
         really is (or is about to become).

From 86c9978e09650982b04189320a622420880f51ce Mon Sep 17 00:00:00 2001
From: David Wilson
Date: Sun, 22 Apr 2018 13:27:41 +0100
Subject: [PATCH 8/8] Ensure mod.__file__ is set; closes #210.

---
 ansible_mitogen/runner.py                                    | 2 ++
 tests/ansible/lib/modules/custom_python_new_style_module.py | 5 +++++
 2 files changed, 7 insertions(+)

diff --git a/ansible_mitogen/runner.py b/ansible_mitogen/runner.py
index af3e2bf7..5b58116b 100644
--- a/ansible_mitogen/runner.py
+++ b/ansible_mitogen/runner.py
@@ -386,6 +386,8 @@ class NewStyleRunner(ScriptRunner):
     def _run(self):
         code = self._get_code()
 
         mod = types.ModuleType('__main__')
+        mod.__file__ = self.program_fp.name
+        mod.__package__ = None
         d = vars(mod)
         e = None
diff --git a/tests/ansible/lib/modules/custom_python_new_style_module.py b/tests/ansible/lib/modules/custom_python_new_style_module.py
index 1ae50d50..4bc9794d 100755
--- a/tests/ansible/lib/modules/custom_python_new_style_module.py
+++ b/tests/ansible/lib/modules/custom_python_new_style_module.py
@@ -21,6 +21,11 @@ input_json = sys.stdin.read()
 
 print "{"
 print " \"changed\": false,"
+# v2.5.1. apt.py started depending on this.
+# https://github.com/dw/mitogen/issues/210
+print " \"__file__\": \"%s\"," % (__file__,)
+# Python sets this during a regular import.
+print " \"__package__\": \"%s\"," % (__package__,)
 print " \"msg\": \"Here is my input\","
 print " \"source\": [%s]," % (json.dumps(me),)
 print " \"input\": [%s]" % (input_json,)
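
Patch 8 works because NewStyleRunner executes module source inside a
synthetic __main__ module, and __file__ and __package__ now mirror an
ordinary script run, which is what apt.py began relying on in Ansible 2.5.1
(issue #210). A minimal sketch of that pattern; the helper name and example
source are illustrative only:

    import types

    def exec_as_main(source, filename):
        mod = types.ModuleType('__main__')
        mod.__file__ = filename     # what the module sees as its own path
        mod.__package__ = None      # mirrors a plain script run
        code = compile(source, filename, 'exec')
        exec(code, vars(mod))
        return mod

    exec_as_main("print(__file__, __package__)", "/tmp/fake_module.py")
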