From 207f57537aff88d58187b754c169871ecef02d2f Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 17 Aug 2019 03:32:46 +0100 Subject: [PATCH 1/9] issue #615: update Changelog. --- docs/changelog.rst | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index 7f0ff234..5e527ea2 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -32,9 +32,6 @@ Enhancements `_ are not yet handled. -* `Operon `_ no longer requires a custom - installation, both Operon and Ansible are supported by a unified release. - * `#419 `_, `#470 `_, file descriptor usage during large runs is halved, as it is no longer necessary to manage read and @@ -54,6 +51,17 @@ Enhancements available to manipulate `Buildah `_ containers, and is exposed to Ansible as the ``buildah`` transport. +* `#615 `_: the ``mitogen_fetch`` + action is included, and the standard Ansible ``fetch`` action is redirected + to it. This implements streaming file transfer in every case, including when + ``become`` is active, preventing excessive CPU usage and memory spikes, and + significantly improving throughput. A copy of 2 files of 512 MiB each drops + from 47 seconds to just under 7 seconds, with peak memory usage dropping from + 10.7 GiB to 64.8 MiB. + +* `Operon `_ no longer requires a custom + installation, both Operon and Ansible are supported by a unified release. + * The ``MITOGEN_CPU_COUNT`` environment variable shards the connection multiplexer into per-CPU workers. 
This may improve throughput for runs involving large file transfers, and is required for future in-process SSH From 0e489625ed6eb22359248bf5cfd2b93c1b91015b Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 17 Aug 2019 05:25:31 +0100 Subject: [PATCH 2/9] issue #615: regression test --- tests/ansible/regression/all.yml | 1 + .../issue_615__streaming_transfer.yml | 21 +++++++++++++++++++ 2 files changed, 22 insertions(+) create mode 100644 tests/ansible/regression/issue_615__streaming_transfer.yml diff --git a/tests/ansible/regression/all.yml b/tests/ansible/regression/all.yml index f75a050c..81780bb3 100644 --- a/tests/ansible/regression/all.yml +++ b/tests/ansible/regression/all.yml @@ -11,3 +11,4 @@ - include: issue_558_unarchive_failed.yml - include: issue_590__sys_modules_crap.yml - include: issue_591__setuptools_cwd_crash.yml +- include: issue_615__streaming_transfer.yml diff --git a/tests/ansible/regression/issue_615__streaming_transfer.yml b/tests/ansible/regression/issue_615__streaming_transfer.yml new file mode 100644 index 00000000..aa7c62c4 --- /dev/null +++ b/tests/ansible/regression/issue_615__streaming_transfer.yml @@ -0,0 +1,21 @@ +# issue #615: 'fetch' with become: was internally using slurp. + +- hosts: target + any_errors_fatal: True + gather_facts: no + become: true + vars: + mitogen_ssh_compression: false + tasks: + - shell: | + dd if=/dev/zero of=/tmp/512mb.zero bs=1048576 count=512; + chmod go= /tmp/512mb.zero + + - fetch: + src: /tmp/512mb.zero + dest: /tmp/fetch-out + + - file: + path: /tmp/fetch-out + state: absent + delegate_to: localhost From 8a870f140255c67fdc42dad5b6a70ea1cc45ece0 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 17 Aug 2019 11:51:25 +0100 Subject: [PATCH 3/9] issue #625: use exec() instead of subprocess in mitogen_ansible_playbook This is just to make CTRL+C handling less confusing. Alternate would be ignoring SIGINT, but this is simpler. 
--- tests/ansible/mitogen_ansible_playbook.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/ansible/mitogen_ansible_playbook.py b/tests/ansible/mitogen_ansible_playbook.py index 3af1791c..54fd4283 100755 --- a/tests/ansible/mitogen_ansible_playbook.py +++ b/tests/ansible/mitogen_ansible_playbook.py @@ -3,4 +3,8 @@ import os import subprocess import sys os.environ['ANSIBLE_STRATEGY'] = 'mitogen_linear' -subprocess.check_call(['./run_ansible_playbook.py'] + sys.argv[1:]) +os.execlp( + './run_ansible_playbook.py', + './run_ansible_playbook.py', + *sys.argv[1:] +) From e02be898799754d6ac4b9ca85db8e7f043d235f9 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 17 Aug 2019 11:56:31 +0100 Subject: [PATCH 4/9] issue #625: ignore SIGINT within MuxProcess Without this, MuxProcess will start dying too early, before Ansible / TaskQueueManager.cleanup() has a chance to wait on worker processes. That would allow WorkerProcess to see ECONNREFUSED from the MuxProcess socket much more easily. --- ansible_mitogen/process.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/ansible_mitogen/process.py b/ansible_mitogen/process.py index 10d55fdf..1fc7bf80 100644 --- a/ansible_mitogen/process.py +++ b/ansible_mitogen/process.py @@ -33,6 +33,7 @@ import multiprocessing import os import resource import socket +import signal import sys try: @@ -659,6 +660,12 @@ class MuxProcess(object): connected to the parent to be closed (indicating the parent has died). """ save_pid('mux') + + # #623: MuxProcess ignores SIGINT because it wants to live until every + # Ansible worker process has been cleaned up by + # TaskQueueManager.cleanup(), otherwise harmless yet scary warnings + # about being unable to connect to MuxProcess could be printed. 
+ signal.signal(signal.SIGINT, signal.SIG_IGN) ansible_mitogen.logging.set_process_name('mux') ansible_mitogen.affinity.policy.assign_muxprocess(self.index) From f4cf67f0bdc9bfe2324f76696678965bf0ea7d75 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 17 Aug 2019 12:01:30 +0100 Subject: [PATCH 5/9] issue #615: remove meaningless test It has been dead code since at least 2015 --- ansible_mitogen/plugins/action/mitogen_fetch.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/ansible_mitogen/plugins/action/mitogen_fetch.py b/ansible_mitogen/plugins/action/mitogen_fetch.py index ffa737e5..1844efd8 100644 --- a/ansible_mitogen/plugins/action/mitogen_fetch.py +++ b/ansible_mitogen/plugins/action/mitogen_fetch.py @@ -45,30 +45,25 @@ class ActionModule(ActionBase): task_vars = dict() result = super(ActionModule, self).run(tmp, task_vars) - del tmp # tmp no longer has any effect - try: if self._play_context.check_mode: result['skipped'] = True result['msg'] = 'check mode not (yet) supported for this module' return result - source = self._task.args.get('src', None) - dest = self._task.args.get('dest', None) flat = boolean(self._task.args.get('flat'), strict=False) fail_on_missing = boolean(self._task.args.get('fail_on_missing', True), strict=False) validate_checksum = boolean(self._task.args.get('validate_checksum', True), strict=False) # validate source and dest are strings FIXME: use basic.py and module specs + source = self._task.args.get('src') if not isinstance(source, string_types): result['msg'] = "Invalid type supplied for source option, it must be a string" + dest = self._task.args.get('dest') if not isinstance(dest, string_types): result['msg'] = "Invalid type supplied for dest option, it must be a string" - if source is None or dest is None: - result['msg'] = "src and dest are required" - if result.get('msg'): result['failed'] = True return result From 8f99ebdf6ff3f776298d1de8c532d2b249ab654e Mon Sep 17 00:00:00 2001 From: David 
Wilson Date: Sat, 17 Aug 2019 14:34:25 +0100 Subject: [PATCH 6/9] tests: hide memory-mapped files from lsof output Seems to be no saner way to do this. --- tests/testlib.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/testlib.py b/tests/testlib.py index 73d3438d..b702fa05 100644 --- a/tests/testlib.py +++ b/tests/testlib.py @@ -338,7 +338,7 @@ class TestCase(unittest2.TestCase): def _teardown_check_fds(self): mitogen.core.Latch._on_fork() if get_fd_count() != self._fd_count_before: - import os; os.system('lsof +E -w -p %s' % (os.getpid(),)) + import os; os.system('lsof +E -w -p %s | grep -vw mem' % (os.getpid(),)) assert 0, "%s leaked FDs. Count before: %s, after: %s" % ( self, self._fd_count_before, get_fd_count(), ) From 11923431a6e39bbb44b971970a33ce3288217cb6 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 17 Aug 2019 14:35:03 +0100 Subject: [PATCH 7/9] tests: one case from doas_test was invoking su --- tests/doas_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/doas_test.py b/tests/doas_test.py index 560ada99..73758476 100644 --- a/tests/doas_test.py +++ b/tests/doas_test.py @@ -57,7 +57,7 @@ class DoasTest(testlib.DockerMixin, testlib.TestCase): username='mitogen__has_sudo', password='has_sudo_password', ) - context = self.router.su(via=ssh, password='rootpassword') + context = self.router.doas(via=ssh, password='has_sudo_password') self.assertEquals(0, context.call(os.getuid)) From 3d72cf82e3141da781600984d7f1cc6ac2263af9 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 17 Aug 2019 14:35:31 +0100 Subject: [PATCH 8/9] tests: use defer_sync() Rather than defer() + ancient sync_with_broker() --- tests/router_test.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/router_test.py b/tests/router_test.py index ef3fc4d5..fb4da501 100644 --- a/tests/router_test.py +++ b/tests/router_test.py @@ -62,12 +62,12 @@ class SourceVerifyTest(testlib.RouterMixin, 
testlib.TestCase): recv = mitogen.core.Receiver(self.router) self.child2_msg.handle = recv.handle - self.broker.defer(self.router._async_route, - self.child2_msg, - in_stream=self.child1_stream) - - # Wait for IO loop to finish everything above. - self.sync_with_broker() + self.broker.defer_sync( + lambda: self.router._async_route( + self.child2_msg, + in_stream=self.child1_stream + ) + ) # Ensure message wasn't forwarded. self.assertTrue(recv.empty()) From bcca47df3c8e0fda7734dfbc7f6909d3eb049b74 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 17 Aug 2019 14:48:08 +0100 Subject: [PATCH 9/9] issue #533: update routing to account for DEL_ROUTE propagation race --- docs/changelog.rst | 12 +++++++++++ mitogen/core.py | 48 ++++++++++++++++++++++++++++---------------- tests/router_test.py | 28 ++++++++++++++++++++++++++ 3 files changed, 71 insertions(+), 17 deletions(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index 5e527ea2..998821e2 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -198,6 +198,18 @@ Core Library `closed` flag, preventing historical bugs where a double close could destroy descriptors belonging to unrelated streams. +* `#533 `_: routing accounts for + a race between a parent sending a message to a child via an intermediary, + where the child had recently disconnected, and ``DEL_ROUTE`` propagating from + the intermediary to the parent, informing it that the child no longer exists. + This condition is detected at the intermediary and a dead message is returned + to the parent. + + Previously since the intermediary had already removed its route for the + child, the *route messages upwards* rule would be triggered, causing the + message (with a privileged ``src_id``/``auth_id``) to be sent upstream, + resulting in a ``bad auth_id`` log message and a hang. + * `#586 `_: fix import of :mod:`__main__` on later versions of Python 3 when running from the interactive console. 
diff --git a/mitogen/core.py b/mitogen/core.py index f9099e9a..30c0e948 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -3272,34 +3272,48 @@ class Router(object): )) return - # Perform source verification. + parent_stream = self._stream_by_id.get(mitogen.parent_id) + src_stream = self._stream_by_id.get(msg.src_id, parent_stream) + + # When the ingress stream is known, verify the message was received on + # the same stream on which we would expect to receive messages from + # the src_id and auth_id. This is like Reverse Path Filtering in IP, and + # ensures messages from a privileged context cannot be spoofed by a + # child. if in_stream: - parent = self._stream_by_id.get(mitogen.parent_id) - expect = self._stream_by_id.get(msg.auth_id, parent) - if in_stream != expect: + auth_stream = self._stream_by_id.get(msg.auth_id, parent_stream) + if in_stream != auth_stream: LOG.error('%r: bad auth_id: got %r via %r, not %r: %r', - self, msg.auth_id, in_stream, expect, msg) + self, msg.auth_id, in_stream, auth_stream, msg) return - if msg.src_id != msg.auth_id: - expect = self._stream_by_id.get(msg.src_id, parent) - if in_stream != expect: - LOG.error('%r: bad src_id: got %r via %r, not %r: %r', - self, msg.src_id, in_stream, expect, msg) - return + if msg.src_id != msg.auth_id and in_stream != src_stream: + LOG.error('%r: bad src_id: got %r via %r, not %r: %r', + self, msg.src_id, in_stream, src_stream, msg) + return + # If the stream's MitogenProtocol has auth_id set, copy it to the + # message. This allows subtrees to become privileged by stamping a + # parent's context ID. It is used by mitogen.unix to mark client + # streams (like Ansible WorkerProcess) as having the same rights as + # the parent. if in_stream.protocol.auth_id is not None: msg.auth_id = in_stream.protocol.auth_id - # Maintain a set of IDs the source ever communicated with. + # Record the IDs the source ever communicated with. 
in_stream.protocol.egress_ids.add(msg.dst_id) if msg.dst_id == mitogen.context_id: return self._invoke(msg, in_stream) out_stream = self._stream_by_id.get(msg.dst_id) - if out_stream is None: - out_stream = self._stream_by_id.get(mitogen.parent_id) + if (not out_stream) and (parent_stream != src_stream or not in_stream): + # No downstream route exists. The message could be from a child or + # ourselves for a parent, in which case we must forward it + # upstream, or it could be from a parent for a dead child, in which + # case its src_id/auth_id would fail verification if returned to + # the parent, so in that case reply with a dead message instead. + out_stream = parent_stream if out_stream is None: self._maybe_send_dead(True, msg, self.no_route_msg, @@ -3310,9 +3324,9 @@ class Router(object): (in_stream.protocol.is_privileged or out_stream.protocol.is_privileged): self._maybe_send_dead(True, msg, self.unidirectional_msg, - in_stream.protocol.remote_id, - out_stream.protocol.remote_id, - mitogen.context_id) + in_stream.protocol.remote_id, + out_stream.protocol.remote_id, + mitogen.context_id) return out_stream.protocol._send(msg) diff --git a/tests/router_test.py b/tests/router_test.py index fb4da501..58ab637a 100644 --- a/tests/router_test.py +++ b/tests/router_test.py @@ -76,6 +76,34 @@ class SourceVerifyTest(testlib.RouterMixin, testlib.TestCase): expect = 'bad auth_id: got %r via' % (self.child2_msg.auth_id,) self.assertTrue(expect in log.stop()) + def test_parent_unaware_of_disconnect(self): + # Parent -> Child A -> Child B. B disconnects concurrent to Parent + # sending message. Parent does not yet know B has disconnected, A + # receives message from Parent with Parent's auth_id, for a stream that + # no longer exists. 
+ c1 = self.router.local() + strm = self.router.stream_by_id(c1.context_id) + recv = mitogen.core.Receiver(self.router) + + self.broker.defer(lambda: + strm.protocol._send( + mitogen.core.Message( + dst_id=1234, # nonexistent child + handle=1234, + src_id=mitogen.context_id, + reply_to=recv.handle, + ) + ) + ) + + e = self.assertRaises(mitogen.core.ChannelError, + lambda: recv.get().unpickle() + ) + self.assertEquals(e.args[0], self.router.no_route_msg % ( + 1234, + c1.context_id, + )) + def test_bad_src_id(self): # Deliver a message locally from child2 with the correct auth_id, but # the wrong src_id.