From c413d5314464339511b41a978dfb32661fdd0a17 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Mon, 18 Feb 2019 17:50:28 +0000 Subject: [PATCH 01/10] os_fork: python 3 fixes and tests. --- mitogen/os_fork.py | 2 +- tests/os_fork_test.py | 52 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 1 deletion(-) create mode 100644 tests/os_fork_test.py diff --git a/mitogen/os_fork.py b/mitogen/os_fork.py index e6a3aafc..386a061f 100644 --- a/mitogen/os_fork.py +++ b/mitogen/os_fork.py @@ -135,7 +135,7 @@ class Corker(object): held. This will not return until each thread acknowledges it has ceased execution. """ - s = 'CORK' * ((128 / 4) * 1024) + s = mitogen.core.b('CORK') * ((128 // 4) * 1024) self._rsocks = [] for pool in self.pools: if not pool.closed: diff --git a/tests/os_fork_test.py b/tests/os_fork_test.py new file mode 100644 index 00000000..14ea8465 --- /dev/null +++ b/tests/os_fork_test.py @@ -0,0 +1,52 @@ + +import testlib +import unittest2 + +import mitogen.os_fork +import mitogen.service + + +class CorkTest(testlib.RouterMixin, testlib.TestCase): + klass = mitogen.os_fork.Corker + + def ping(self, latch): + latch.put('pong') + + def test_cork_broker(self): + latch = mitogen.core.Latch() + self.broker.defer(self.ping, latch) + self.assertEquals('pong', latch.get()) + + corker = self.klass(brokers=(self.broker,)) + corker.cork() + + latch = mitogen.core.Latch() + self.broker.defer(self.ping, latch) + self.assertRaises(mitogen.core.TimeoutError, + lambda: latch.get(timeout=0.5)) + corker.uncork() + self.assertEquals('pong', latch.get()) + + def test_cork_pool(self): + pool = mitogen.service.Pool(self.router, services=(), size=4) + try: + latch = mitogen.core.Latch() + pool.defer(self.ping, latch) + self.assertEquals('pong', latch.get()) + + corker = self.klass(pools=(pool,)) + corker.cork() + + latch = mitogen.core.Latch() + pool.defer(self.ping, latch) + self.assertRaises(mitogen.core.TimeoutError, + lambda: latch.get(timeout=0.5)) + corker.uncork() + self.assertEquals('pong', latch.get()) + finally: + pool.stop(join=True) + + + +if __name__ == '__main__': + unittest2.main() From ad7185444dd7787e766b634ee8186a463bce7b02 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Mon, 18 Feb 2019 18:00:02 +0000 Subject: [PATCH 02/10] .ci: allow containers for different jobs to run simultaneously --- .ci/ci_lib.py | 8 +++++--- .ci/debops_common_tests.py | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/.ci/ci_lib.py b/.ci/ci_lib.py index d4f32f55..bc50d6f8 100644 --- a/.ci/ci_lib.py +++ b/.ci/ci_lib.py @@ -165,7 +165,7 @@ def image_for_distro(distro): return 'mitogen/%s-test' % (distro.partition('-')[0],) -def make_containers(): +def make_containers(name_prefix='', port_offset=0): docker_hostname = get_docker_hostname() firstbit = lambda s: (s+'-').split('-')[0] secondbit = lambda s: (s+'-').split('-')[1] @@ -183,9 +183,9 @@ def make_containers(): for x in range(count): lst.append({ "distro": firstbit(distro), - "name": "target-%s-%s" % (distro, i), + "name": name_prefix + ("target-%s-%s" % (distro, i)), "hostname": docker_hostname, - "port": BASE_PORT + i, + "port": BASE_PORT + i + port_offset, "python_path": ( '/usr/bin/python3' if secondbit(distro) == 'py3' @@ -207,6 +207,8 @@ def start_containers(containers): "docker run " "--rm " "--detach " + "--privileged " + "--cap-add=SYS_PTRACE " "--publish 0.0.0.0:%(port)s:22/tcp " "--hostname=%(name)s " "--name=%(name)s " diff --git a/.ci/debops_common_tests.py b/.ci/debops_common_tests.py index 8e9f2953..6553ccea 100755 --- a/.ci/debops_common_tests.py +++ b/.ci/debops_common_tests.py @@ -20,7 +20,7 @@ docker_hostname = ci_lib.get_docker_hostname() with ci_lib.Fold('docker_setup'): - containers = ci_lib.make_containers() + containers = ci_lib.make_containers(port_offset=500, name_prefix='debops-') ci_lib.start_containers(containers) From 021cfda11246015b2be47e759a6618cad5f18e63 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Mon, 18 Feb 2019 18:00:23 +0000 Subject: [PATCH 03/10] .ci: import soak scripts. --- .ci/soak/debops_common.sh | 16 ++++++++++++++++ .ci/soak/mitogen.sh | 17 +++++++++++++++++ .ci/soak/mitogen_py24.sh | 12 ++++++++++++ 3 files changed, 45 insertions(+) create mode 100755 .ci/soak/debops_common.sh create mode 100755 .ci/soak/mitogen.sh create mode 100755 .ci/soak/mitogen_py24.sh diff --git a/.ci/soak/debops_common.sh b/.ci/soak/debops_common.sh new file mode 100755 index 00000000..eefb4917 --- /dev/null +++ b/.ci/soak/debops_common.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +export NOCOVERAGE=1 + +# Make Docker containers once. +/usr/bin/time -v ./.ci/debops_common_tests.py "$@" || break +export KEEP=1 + +i=0 +while : +do + i=$((i + 1)) + /usr/bin/time -v ./.ci/debops_common_tests.py "$@" || break +done + +echo $i diff --git a/.ci/soak/mitogen.sh b/.ci/soak/mitogen.sh new file mode 100755 index 00000000..15d62529 --- /dev/null +++ b/.ci/soak/mitogen.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +export NOCOVERAGE=1 +export DISTROS="debian*4" + +# Make Docker containers once. +/usr/bin/time -v ./.ci/ansible_tests.py "$@" +export KEEP=1 + +i=0 +while : +do + i=$((i + 1)) + /usr/bin/time -v ./.ci/ansible_tests.py "$@" || break +done + +echo $i diff --git a/.ci/soak/mitogen_py24.sh b/.ci/soak/mitogen_py24.sh new file mode 100755 index 00000000..475e0875 --- /dev/null +++ b/.ci/soak/mitogen_py24.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +export NOCOVERAGE=1 + +i=0 +while : +do + i=$((i + 1)) + /usr/bin/time -v ./.ci/mitogen_py24_tests.py "$@" || break +done + +echo $i From 5dc0bd6f8dfab0cf5d2a390410e8242b94f37238 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Mon, 18 Feb 2019 18:21:25 +0000 Subject: [PATCH 04/10] os_fork: clean up docs --- docs/changelog.rst | 8 +++---- mitogen/os_fork.py | 58 +++++++++++++++++++++++++++++++++++----------- 2 files changed, 48 insertions(+), 18 deletions(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index 02d2495c..abddb894 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -158,10 +158,10 @@ Core Library * `#535 `_: a new :mod:`mitogen.os_fork` module provides a :func:`os.fork` wrapper that pauses - all thread activity during a fork. :class:`mitogen.core.Broker` and - :class:`mitogen.service.Pool` automatically record their existence so that an - :func:`os.fork` monkey-patch activated for Python 2.4 and 2.5 can - automatically pause them for any attempt to start a subprocess. + thread activity during fork. On Python<2.6, :class:`mitogen.core.Broker` and + :class:`mitogen.service.Pool` automatically record their existence so that a + :func:`os.fork` monkey-patch can automatically pause them for any attempt to + start a subprocess. * `ca63c26e `_: :meth:`mitogen.core.Latch.put`'s `obj` argument was made optional. diff --git a/mitogen/os_fork.py b/mitogen/os_fork.py index 386a061f..3dbfbcdf 100644 --- a/mitogen/os_fork.py +++ b/mitogen/os_fork.py @@ -29,13 +29,7 @@ # !mitogen: minify_safe """ -When operating in a mixed threading/forking environment, it is critical no -threads are active at the moment of fork, as they could be within critical -sections whose mutexes are snapshotted in the locked state in the fork child. - -To permit unbridled Mitogen use in a forking program, a mechanism must exist to -temporarily halt any threads in operation -- namely the broker and any pool -threads. +Support for operating in a mixed threading/forking environment. """ import os @@ -53,6 +47,10 @@ _pools = weakref.WeakKeyDictionary() def _notice_broker_or_pool(obj): + """ + Used by :mod:`mitogen.core` and :mod:`mitogen.service` to automatically + register every broker and pool on Python 2.4/2.5. + """ if isinstance(obj, mitogen.core.Broker): _brokers[obj] = True else: @@ -85,8 +83,31 @@ class Corker(object): :class:`mitogen.service.Pool` to be temporarily "corked" while fork operations may occur. + In a mixed threading/forking environment, it is critical no threads are + active at the moment of fork, as they could hold mutexes whose state is + unrecoverably snapshotted in the locked state in the fork child, causing + random deadlocks at random future moments. + + To ensure a target thread has all locks dropped, we ask it to write a large + string to a socket with a small buffer that has :data:`os.O_NONBLOCK` + disabled. CPython will drop the GIL and enter the ``write()`` system call, + where it will block until the socket buffer is drained, or the write side + is closed. + + We can ensure the thread really has blocked outside of any Python locks by + checking if the socket buffer has started to fill using a + :class:`mitogen.core.Poller`. + Since this necessarily involves posting a message to every existent thread and verifying acknowledgement, it will never be a fast operation. + + This does not yet handle the case of corking being initiated from within a + thread that is also a cork target. + + :param brokers: + Sequence of :class:`mitogen.core.Broker` instances to cork. + :param pools: + Sequence of :class:`mitogen.core.Pool` instances to cork. """ def __init__(self, brokers=(), pools=()): self.brokers = brokers @@ -106,13 +127,8 @@ class Corker(object): def _cork_one(self, s, obj): """ - To ensure the target thread has all locks dropped, we ask it to write a - large string to a socket with a small buffer that has O_NONBLOCK - disabled. CPython will drop the GIL and enter the write() system call, - where it will block until the socket buffer is drained, or the write - side is closed. We can detect the thread has blocked outside of Python - code by checking if the socket buffer has started to fill using a - poller. + Construct a socketpair, saving one side of it, and passing the other to + `obj` to be written to by one of its threads. """ rsock, wsock = mitogen.parent.create_socketpair(size=4096) mitogen.core.set_cloexec(rsock.fileno()) @@ -120,6 +136,12 @@ class Corker(object): mitogen.core.set_block(wsock) # gevent self._rsocks.append(rsock) obj.defer(self._do_cork, s, wsock) + + def _verify_one(self, rsock): + """ + Pause until the socket `rsock` indicates readability, due to + :meth:`_do_cork` triggering a blocking write on another thread. + """ poller = mitogen.core.Poller() poller.start_receive(rsock.fileno()) try: @@ -137,14 +159,22 @@ class Corker(object): """ s = mitogen.core.b('CORK') * ((128 // 4) * 1024) self._rsocks = [] + + # Pools must be paused first, as existing work may require the + # participation of a broker in order to complete. for pool in self.pools: if not pool.closed: for x in range(pool.size): self._cork_one(s, pool) + for broker in self.brokers: if broker._alive: self._cork_one(s, broker) + # Pause until we can detect every thread has entered write(). + for rsock in self._rsocks: + self._verify_one(rsock) + def uncork(self): """ Arrange for paused threads to resume operation. From 0a66ca72ef807e7d0107a7536594f7f5fa241503 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Mon, 18 Feb 2019 18:22:32 +0000 Subject: [PATCH 05/10] os_fork: more doc tweaks --- docs/api.rst | 4 ++-- mitogen/os_fork.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/api.rst b/docs/api.rst index 6add0568..db39ad99 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -609,8 +609,8 @@ Broker Class :members: -Corker Class -============ +Fork Safety +=========== .. currentmodule:: mitogen.os_fork .. autoclass:: Corker diff --git a/mitogen/os_fork.py b/mitogen/os_fork.py index 3dbfbcdf..55739877 100644 --- a/mitogen/os_fork.py +++ b/mitogen/os_fork.py @@ -86,7 +86,7 @@ class Corker(object): In a mixed threading/forking environment, it is critical no threads are active at the moment of fork, as they could hold mutexes whose state is unrecoverably snapshotted in the locked state in the fork child, causing - random deadlocks at random future moments. + deadlocks at random future moments. To ensure a target thread has all locks dropped, we ask it to write a large string to a socket with a small buffer that has :data:`os.O_NONBLOCK` From add357a0296903a718a031b313771e7405e4e7bd Mon Sep 17 00:00:00 2001 From: David Wilson Date: Mon, 18 Feb 2019 18:25:27 +0000 Subject: [PATCH 06/10] os_fork: yet more doc tidyup --- mitogen/os_fork.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mitogen/os_fork.py b/mitogen/os_fork.py index 55739877..02162ea8 100644 --- a/mitogen/os_fork.py +++ b/mitogen/os_fork.py @@ -153,7 +153,7 @@ class Corker(object): def cork(self): """ - Arrange for the broker and optional pool to be paused with no locks + Arrange for any associated brokers and pools to be paused with no locks held. This will not return until each thread acknowledges it has ceased execution. """ From 7763549653d556768a8d0b08ef6fb33ae2b6ad1e Mon Sep 17 00:00:00 2001 From: David Wilson Date: Mon, 18 Feb 2019 18:24:03 +0000 Subject: [PATCH 07/10] os_fork: more doc tweaks --- mitogen/os_fork.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mitogen/os_fork.py b/mitogen/os_fork.py index 02162ea8..e318e79d 100644 --- a/mitogen/os_fork.py +++ b/mitogen/os_fork.py @@ -88,8 +88,8 @@ class Corker(object): unrecoverably snapshotted in the locked state in the fork child, causing deadlocks at random future moments. - To ensure a target thread has all locks dropped, we ask it to write a large - string to a socket with a small buffer that has :data:`os.O_NONBLOCK` + To ensure a target thread has all locks dropped, it is made to write a + large string to a socket with a small buffer that has :data:`os.O_NONBLOCK` disabled. CPython will drop the GIL and enter the ``write()`` system call, where it will block until the socket buffer is drained, or the write side is closed. From d51e70636d39f871d91829d10cfc6a7a1478de69 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Mon, 18 Feb 2019 18:33:50 +0000 Subject: [PATCH 08/10] os_fork: more doc tweaks --- mitogen/os_fork.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mitogen/os_fork.py b/mitogen/os_fork.py index e318e79d..b27cfd5c 100644 --- a/mitogen/os_fork.py +++ b/mitogen/os_fork.py @@ -94,9 +94,9 @@ class Corker(object): where it will block until the socket buffer is drained, or the write side is closed. - We can ensure the thread really has blocked outside of any Python locks by - checking if the socket buffer has started to fill using a - :class:`mitogen.core.Poller`. + :class:`mitogen.core.Poller` is used to ensure the thread really has + blocked outside any Python locks, by checking if the socket buffer has + started to fill. Since this necessarily involves posting a message to every existent thread and verifying acknowledgement, it will never be a fast operation. From 36a5968ee2f03cd8bf194fb0430fd46b370b4bd9 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Mon, 18 Feb 2019 19:58:54 +0000 Subject: [PATCH 09/10] .ci: copy private key file to tempdir. --- .ci/ci_lib.py | 11 +++++++++++ .ci/debops_common_tests.py | 8 ++------ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/.ci/ci_lib.py b/.ci/ci_lib.py index bc50d6f8..e1cb84d5 100644 --- a/.ci/ci_lib.py +++ b/.ci/ci_lib.py @@ -146,6 +146,17 @@ TARGET_COUNT = int(os.environ.get('TARGET_COUNT', '2')) BASE_PORT = 2200 TMP = TempDir().path + +# We copy this out of the way to avoid random stuff modifying perms in the Git +# tree (like git pull). +src_key_file = os.path.join(GIT_ROOT, + 'tests/data/docker/mitogen__has_sudo_pubkey.key') +key_file = os.path.join(TMP, + 'mitogen__has_sudo_pubkey.key') +shutil.copyfile(src_key_file, key_file) +os.chmod(key_file, int('0600', 8)) + + os.environ['PYTHONDONTWRITEBYTECODE'] = 'x' os.environ['PYTHONPATH'] = '%s:%s' % ( os.environ.get('PYTHONPATH', ''), diff --git a/.ci/debops_common_tests.py b/.ci/debops_common_tests.py index 6553ccea..b0e2e4e8 100755 --- a/.ci/debops_common_tests.py +++ b/.ci/debops_common_tests.py @@ -2,6 +2,7 @@ from __future__ import print_function import os +import shutil import ci_lib @@ -10,10 +11,6 @@ import ci_lib ci_lib.DISTROS = ['debian'] * ci_lib.TARGET_COUNT project_dir = os.path.join(ci_lib.TMP, 'project') -key_file = os.path.join( - ci_lib.GIT_ROOT, - 'tests/data/docker/mitogen__has_sudo_pubkey.key', -) vars_path = 'ansible/inventory/group_vars/debops_all_hosts.yml' inventory_path = 'ansible/inventory/hosts' docker_hostname = ci_lib.get_docker_hostname() @@ -36,7 +33,6 @@ with ci_lib.Fold('job_setup'): % (ci_lib.GIT_ROOT,) ) - ci_lib.run('chmod go= %s', key_file) with open(vars_path, 'w') as fp: fp.write( "ansible_python_interpreter: /usr/bin/python2.7\n" @@ -47,7 +43,7 @@ with ci_lib.Fold('job_setup'): "\n" # Speed up slow DH generation. "dhparam__bits: ['128', '64']\n" - % (key_file,) + % (ci_lib.key_file,) ) with open(inventory_path, 'a') as fp: From b7742a4b5fe679fa2eab4cdeea737ed9333144e6 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Tue, 19 Feb 2019 04:24:21 +0000 Subject: [PATCH 10/10] docs: better intro paragraph. --- docs/ansible.rst | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/docs/ansible.rst b/docs/ansible.rst index f7788011..1c376a24 100644 --- a/docs/ansible.rst +++ b/docs/ansible.rst @@ -5,13 +5,14 @@ Mitogen for Ansible .. image:: images/ansible/ansible_mitogen.svg :class: mitogen-right-180 mitogen-logo-wrap -An extension to `Ansible`_ is included that implements connections over -Mitogen, replacing embedded shell invocations with pure-Python equivalents -invoked via highly efficient remote procedure calls to persistent interpreters -tunnelled over SSH. No changes are required to target hosts. - -The extension is stable and real-world use is encouraged. `Bug reports`_ are -welcome: Ansible is huge, and only wide testing will ensure soundness. +**Mitogen for Ansible** is a completely redesigned UNIX connection layer and +module runtime for `Ansible`_. Requiring minimal configuration changes, it +updates Ansible's slow and wasteful shell-centic implementation with +pure-Python equivalents, invoked via highly efficient remote procedure calls to +persistent interpreters tunnelled over SSH. No changes are required to target +hosts. + +The extension is considered stable and real-world use is encouraged. .. _Ansible: https://www.ansible.com/