Merge remote-tracking branch 'origin/dmw'

* origin/dmw:
  issue #533: update routing to account for DEL_ROUTE propagation race
  tests: use defer_sync() Rather than defer() + ancient sync_with_broker()
  tests: one case from doas_test was invoking su
  tests: hide memory-mapped files from lsof output
  issue #615: remove meaningless test
  issue #625: ignore SIGINT within MuxProcess
  issue #625: use exec() instead of subprocess in mitogen_ansible_playbook
  issue #615: regression test
  issue #615: update Changelog.
new-serialization
David Wilson 5 years ago
commit 8d16f657ab

@ -45,30 +45,25 @@ class ActionModule(ActionBase):
task_vars = dict()
result = super(ActionModule, self).run(tmp, task_vars)
del tmp # tmp no longer has any effect
try:
if self._play_context.check_mode:
result['skipped'] = True
result['msg'] = 'check mode not (yet) supported for this module'
return result
source = self._task.args.get('src', None)
dest = self._task.args.get('dest', None)
flat = boolean(self._task.args.get('flat'), strict=False)
fail_on_missing = boolean(self._task.args.get('fail_on_missing', True), strict=False)
validate_checksum = boolean(self._task.args.get('validate_checksum', True), strict=False)
# validate source and dest are strings FIXME: use basic.py and module specs
source = self._task.args.get('src')
if not isinstance(source, string_types):
result['msg'] = "Invalid type supplied for source option, it must be a string"
dest = self._task.args.get('dest')
if not isinstance(dest, string_types):
result['msg'] = "Invalid type supplied for dest option, it must be a string"
if source is None or dest is None:
result['msg'] = "src and dest are required"
if result.get('msg'):
result['failed'] = True
return result

@ -33,6 +33,7 @@ import multiprocessing
import os
import resource
import socket
import signal
import sys
try:
@ -659,6 +660,12 @@ class MuxProcess(object):
connected to the parent to be closed (indicating the parent has died).
"""
save_pid('mux')
# #623: MuxProcess ignores SIGINT because it wants to live until every
# Ansible worker process has been cleaned up by
# TaskQueueManager.cleanup(), otherwise harmless yet scary warnings
# about being unable to connect to MuxProcess could be printed.
signal.signal(signal.SIGINT, signal.SIG_IGN)
ansible_mitogen.logging.set_process_name('mux')
ansible_mitogen.affinity.policy.assign_muxprocess(self.index)

@ -32,9 +32,6 @@ Enhancements
<https://docs.ansible.com/ansible/latest/reference_appendices/interpreter_discovery.html>`_
are not yet handled.
* `Operon <https://networkgenomics.com/operon/>`_ no longer requires a custom
installation, both Operon and Ansible are supported by a unified release.
* `#419 <https://github.com/dw/mitogen/issues/419>`_,
`#470 <https://github.com/dw/mitogen/issues/470>`_, file descriptor usage
during large runs is halved, as it is no longer necessary to manage read and
@ -54,6 +51,17 @@ Enhancements
available to manipulate `Buildah <https://buildah.io/>`_ containers, and is
exposed to Ansible as the ``buildah`` transport.
* `#615 <https://github.com/dw/mitogen/issues/615>`_: the ``mitogen_fetch``
action is included, and the standard Ansible ``fetch`` action is redirected
to it. This implements streaming file transfer in every case, including when
``become`` is active, preventing excessive CPU usage and memory spikes, and
significantly improving throughput. A copy of 2 files of 512 MiB each drops
from 47 seconds to just under 7 seconds, with peak memory usage dropping from
10.7 GiB to 64.8 MiB.
* `Operon <https://networkgenomics.com/operon/>`_ no longer requires a custom
installation, both Operon and Ansible are supported by a unified release.
* The ``MITOGEN_CPU_COUNT`` environment variable shards the connection
multiplexer into per-CPU workers. This may improve throughput for runs
involving large file transfers, and is required for future in-process SSH
@ -190,6 +198,18 @@ Core Library
`closed` flag, preventing historical bugs where a double close could destroy
descriptors belonging to unrelated streams.
* `#533 <https://github.com/dw/mitogen/issues/533>`_: routing accounts for
a race between a parent sending a message to a child via an intermediary,
where the child had recently disconnected, and ``DEL_ROUTE`` propagating from
the intermediary to the parent, informing it that the child no longer exists.
This condition is detected at the intermediary and a dead message is returned
to the parent.
Previously, since the intermediary had already removed its route for the
child, the *route messages upwards* rule would be triggered, causing the
message (with a privileged ``src_id``/``auth_id``) to be sent upstream,
resulting in a ``bad auth_id`` log message and a hang.
* `#586 <https://github.com/dw/mitogen/issues/586>`_: fix import of
:mod:`__main__` on later versions of Python 3 when running from the
interactive console.

@ -3272,34 +3272,48 @@ class Router(object):
))
return
# Perform source verification.
parent_stream = self._stream_by_id.get(mitogen.parent_id)
src_stream = self._stream_by_id.get(msg.src_id, parent_stream)
# When the ingress stream is known, verify the message was received on
# the same stream that we would expect to carry messages from the
# src_id and auth_id. This is like Reverse Path Filtering in IP, and
# ensures messages from a privileged context cannot be spoofed by a
# child.
if in_stream:
parent = self._stream_by_id.get(mitogen.parent_id)
expect = self._stream_by_id.get(msg.auth_id, parent)
if in_stream != expect:
auth_stream = self._stream_by_id.get(msg.auth_id, parent_stream)
if in_stream != auth_stream:
LOG.error('%r: bad auth_id: got %r via %r, not %r: %r',
self, msg.auth_id, in_stream, expect, msg)
self, msg.auth_id, in_stream, auth_stream, msg)
return
if msg.src_id != msg.auth_id:
expect = self._stream_by_id.get(msg.src_id, parent)
if in_stream != expect:
if msg.src_id != msg.auth_id and in_stream != src_stream:
LOG.error('%r: bad src_id: got %r via %r, not %r: %r',
self, msg.src_id, in_stream, expect, msg)
self, msg.src_id, in_stream, src_stream, msg)
return
# If the stream's MitogenProtocol has auth_id set, copy it to the
# message. This allows subtrees to become privileged by stamping a
# parent's context ID. It is used by mitogen.unix to mark client
# streams (like Ansible WorkerProcess) as having the same rights as
# the parent.
if in_stream.protocol.auth_id is not None:
msg.auth_id = in_stream.protocol.auth_id
# Maintain a set of IDs the source ever communicated with.
# Record the IDs the source ever communicated with.
in_stream.protocol.egress_ids.add(msg.dst_id)
if msg.dst_id == mitogen.context_id:
return self._invoke(msg, in_stream)
out_stream = self._stream_by_id.get(msg.dst_id)
if out_stream is None:
out_stream = self._stream_by_id.get(mitogen.parent_id)
if (not out_stream) and (parent_stream != src_stream or not in_stream):
# No downstream route exists. The message could be from a child or
# ourselves for a parent, in which case we must forward it
# upstream, or it could be from a parent for a dead child, in which
# case its src_id/auth_id would fail verification if returned to
# the parent, so in that case reply with a dead message instead.
out_stream = parent_stream
if out_stream is None:
self._maybe_send_dead(True, msg, self.no_route_msg,

@ -3,4 +3,8 @@ import os
import subprocess
import sys
os.environ['ANSIBLE_STRATEGY'] = 'mitogen_linear'
subprocess.check_call(['./run_ansible_playbook.py'] + sys.argv[1:])
os.execlp(
'./run_ansible_playbook.py',
'./run_ansible_playbook.py',
*sys.argv[1:]
)

@ -11,3 +11,4 @@
- include: issue_558_unarchive_failed.yml
- include: issue_590__sys_modules_crap.yml
- include: issue_591__setuptools_cwd_crash.yml
- include: issue_615__streaming_transfer.yml

@ -0,0 +1,21 @@
# issue #615: 'fetch' with become: was internally using slurp.
- hosts: target
any_errors_fatal: True
gather_facts: no
become: true
vars:
mitogen_ssh_compression: false
tasks:
- shell: |
dd if=/dev/zero of=/tmp/512mb.zero bs=1048576 count=512;
chmod go= /tmp/512mb.zero
- fetch:
src: /tmp/512mb.zero
dest: /tmp/fetch-out
- file:
path: /tmp/fetch-out
state: absent
delegate_to: localhost

@ -57,7 +57,7 @@ class DoasTest(testlib.DockerMixin, testlib.TestCase):
username='mitogen__has_sudo',
password='has_sudo_password',
)
context = self.router.su(via=ssh, password='rootpassword')
context = self.router.doas(via=ssh, password='has_sudo_password')
self.assertEquals(0, context.call(os.getuid))

@ -62,12 +62,12 @@ class SourceVerifyTest(testlib.RouterMixin, testlib.TestCase):
recv = mitogen.core.Receiver(self.router)
self.child2_msg.handle = recv.handle
self.broker.defer(self.router._async_route,
self.broker.defer_sync(
lambda: self.router._async_route(
self.child2_msg,
in_stream=self.child1_stream)
# Wait for IO loop to finish everything above.
self.sync_with_broker()
in_stream=self.child1_stream
)
)
# Ensure message wasn't forwarded.
self.assertTrue(recv.empty())
@ -76,6 +76,34 @@ class SourceVerifyTest(testlib.RouterMixin, testlib.TestCase):
expect = 'bad auth_id: got %r via' % (self.child2_msg.auth_id,)
self.assertTrue(expect in log.stop())
def test_parent_unaware_of_disconnect(self):
# Parent -> Child A -> Child B. B disconnects concurrently with Parent
# sending a message. Parent does not yet know B has disconnected, so A
# receives a message from Parent with Parent's auth_id, for a stream that
# no longer exists.
c1 = self.router.local()
strm = self.router.stream_by_id(c1.context_id)
recv = mitogen.core.Receiver(self.router)
self.broker.defer(lambda:
strm.protocol._send(
mitogen.core.Message(
dst_id=1234, # nonexistent child
handle=1234,
src_id=mitogen.context_id,
reply_to=recv.handle,
)
)
)
e = self.assertRaises(mitogen.core.ChannelError,
lambda: recv.get().unpickle()
)
self.assertEquals(e.args[0], self.router.no_route_msg % (
1234,
c1.context_id,
))
def test_bad_src_id(self):
# Deliver a message locally from child2 with the correct auth_id, but
# the wrong src_id.

@ -338,7 +338,7 @@ class TestCase(unittest2.TestCase):
def _teardown_check_fds(self):
mitogen.core.Latch._on_fork()
if get_fd_count() != self._fd_count_before:
import os; os.system('lsof +E -w -p %s' % (os.getpid(),))
import os; os.system('lsof +E -w -p %s | grep -vw mem' % (os.getpid(),))
assert 0, "%s leaked FDs. Count before: %s, after: %s" % (
self, self._fd_count_before, get_fd_count(),
)

Loading…
Cancel
Save