diff --git a/.ci/ci_lib.py b/.ci/ci_lib.py index d4f32f55..e1cb84d5 100644 --- a/.ci/ci_lib.py +++ b/.ci/ci_lib.py @@ -146,6 +146,17 @@ TARGET_COUNT = int(os.environ.get('TARGET_COUNT', '2')) BASE_PORT = 2200 TMP = TempDir().path + +# We copy this out of the way to avoid random stuff modifying perms in the Git +# tree (like git pull). +src_key_file = os.path.join(GIT_ROOT, + 'tests/data/docker/mitogen__has_sudo_pubkey.key') +key_file = os.path.join(TMP, + 'mitogen__has_sudo_pubkey.key') +shutil.copyfile(src_key_file, key_file) +os.chmod(key_file, int('0600', 8)) + + os.environ['PYTHONDONTWRITEBYTECODE'] = 'x' os.environ['PYTHONPATH'] = '%s:%s' % ( os.environ.get('PYTHONPATH', ''), @@ -165,7 +176,7 @@ def image_for_distro(distro): return 'mitogen/%s-test' % (distro.partition('-')[0],) -def make_containers(): +def make_containers(name_prefix='', port_offset=0): docker_hostname = get_docker_hostname() firstbit = lambda s: (s+'-').split('-')[0] secondbit = lambda s: (s+'-').split('-')[1] @@ -183,9 +194,9 @@ def make_containers(): for x in range(count): lst.append({ "distro": firstbit(distro), - "name": "target-%s-%s" % (distro, i), + "name": name_prefix + ("target-%s-%s" % (distro, i)), "hostname": docker_hostname, - "port": BASE_PORT + i, + "port": BASE_PORT + i + port_offset, "python_path": ( '/usr/bin/python3' if secondbit(distro) == 'py3' @@ -207,6 +218,8 @@ def start_containers(containers): "docker run " "--rm " "--detach " + "--privileged " + "--cap-add=SYS_PTRACE " "--publish 0.0.0.0:%(port)s:22/tcp " "--hostname=%(name)s " "--name=%(name)s " diff --git a/.ci/debops_common_tests.py b/.ci/debops_common_tests.py index 8e9f2953..b0e2e4e8 100755 --- a/.ci/debops_common_tests.py +++ b/.ci/debops_common_tests.py @@ -2,6 +2,7 @@ from __future__ import print_function import os +import shutil import ci_lib @@ -10,17 +11,13 @@ import ci_lib ci_lib.DISTROS = ['debian'] * ci_lib.TARGET_COUNT project_dir = os.path.join(ci_lib.TMP, 'project') -key_file = os.path.join( - 
ci_lib.GIT_ROOT, - 'tests/data/docker/mitogen__has_sudo_pubkey.key', -) vars_path = 'ansible/inventory/group_vars/debops_all_hosts.yml' inventory_path = 'ansible/inventory/hosts' docker_hostname = ci_lib.get_docker_hostname() with ci_lib.Fold('docker_setup'): - containers = ci_lib.make_containers() + containers = ci_lib.make_containers(port_offset=500, name_prefix='debops-') ci_lib.start_containers(containers) @@ -36,7 +33,6 @@ with ci_lib.Fold('job_setup'): % (ci_lib.GIT_ROOT,) ) - ci_lib.run('chmod go= %s', key_file) with open(vars_path, 'w') as fp: fp.write( "ansible_python_interpreter: /usr/bin/python2.7\n" @@ -47,7 +43,7 @@ with ci_lib.Fold('job_setup'): "\n" # Speed up slow DH generation. "dhparam__bits: ['128', '64']\n" - % (key_file,) + % (ci_lib.key_file,) ) with open(inventory_path, 'a') as fp: diff --git a/.ci/soak/debops_common.sh b/.ci/soak/debops_common.sh new file mode 100755 index 00000000..eefb4917 --- /dev/null +++ b/.ci/soak/debops_common.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +export NOCOVERAGE=1 + +# Make Docker containers once. +/usr/bin/time -v ./.ci/debops_common_tests.py "$@" || break +export KEEP=1 + +i=0 +while : +do + i=$((i + 1)) + /usr/bin/time -v ./.ci/debops_common_tests.py "$@" || break +done + +echo $i diff --git a/.ci/soak/mitogen.sh b/.ci/soak/mitogen.sh new file mode 100755 index 00000000..15d62529 --- /dev/null +++ b/.ci/soak/mitogen.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +export NOCOVERAGE=1 +export DISTROS="debian*4" + +# Make Docker containers once. 
+/usr/bin/time -v ./.ci/ansible_tests.py "$@" +export KEEP=1 + +i=0 +while : +do + i=$((i + 1)) + /usr/bin/time -v ./.ci/ansible_tests.py "$@" || break +done + +echo $i diff --git a/.ci/soak/mitogen_py24.sh b/.ci/soak/mitogen_py24.sh new file mode 100755 index 00000000..475e0875 --- /dev/null +++ b/.ci/soak/mitogen_py24.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +export NOCOVERAGE=1 + +i=0 +while : +do + i=$((i + 1)) + /usr/bin/time -v ./.ci/mitogen_py24_tests.py "$@" || break +done + +echo $i diff --git a/docs/ansible.rst b/docs/ansible.rst index f7788011..1c376a24 100644 --- a/docs/ansible.rst +++ b/docs/ansible.rst @@ -5,13 +5,14 @@ Mitogen for Ansible .. image:: images/ansible/ansible_mitogen.svg :class: mitogen-right-180 mitogen-logo-wrap -An extension to `Ansible`_ is included that implements connections over -Mitogen, replacing embedded shell invocations with pure-Python equivalents -invoked via highly efficient remote procedure calls to persistent interpreters -tunnelled over SSH. No changes are required to target hosts. - -The extension is stable and real-world use is encouraged. `Bug reports`_ are -welcome: Ansible is huge, and only wide testing will ensure soundness. +**Mitogen for Ansible** is a completely redesigned UNIX connection layer and +module runtime for `Ansible`_. Requiring minimal configuration changes, it +updates Ansible's slow and wasteful shell-centric implementation with +pure-Python equivalents, invoked via highly efficient remote procedure calls to +persistent interpreters tunnelled over SSH. No changes are required to target +hosts. + +The extension is considered stable and real-world use is encouraged. .. _Ansible: https://www.ansible.com/ diff --git a/docs/api.rst b/docs/api.rst index 6add0568..db39ad99 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -609,8 +609,8 @@ Broker Class :members: -Corker Class -============ +Fork Safety +=========== .. currentmodule:: mitogen.os_fork .. 
autoclass:: Corker diff --git a/docs/changelog.rst b/docs/changelog.rst index 02d2495c..abddb894 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -158,10 +158,10 @@ Core Library * `#535 `_: a new :mod:`mitogen.os_fork` module provides a :func:`os.fork` wrapper that pauses - all thread activity during a fork. :class:`mitogen.core.Broker` and - :class:`mitogen.service.Pool` automatically record their existence so that an - :func:`os.fork` monkey-patch activated for Python 2.4 and 2.5 can - automatically pause them for any attempt to start a subprocess. + thread activity during fork. On Python<2.6, :class:`mitogen.core.Broker` and + :class:`mitogen.service.Pool` automatically record their existence so that a + :func:`os.fork` monkey-patch can automatically pause them for any attempt to + start a subprocess. * `ca63c26e `_: :meth:`mitogen.core.Latch.put`'s `obj` argument was made optional. diff --git a/mitogen/os_fork.py b/mitogen/os_fork.py index e6a3aafc..b27cfd5c 100644 --- a/mitogen/os_fork.py +++ b/mitogen/os_fork.py @@ -29,13 +29,7 @@ # !mitogen: minify_safe """ -When operating in a mixed threading/forking environment, it is critical no -threads are active at the moment of fork, as they could be within critical -sections whose mutexes are snapshotted in the locked state in the fork child. - -To permit unbridled Mitogen use in a forking program, a mechanism must exist to -temporarily halt any threads in operation -- namely the broker and any pool -threads. +Support for operating in a mixed threading/forking environment. """ import os @@ -53,6 +47,10 @@ _pools = weakref.WeakKeyDictionary() def _notice_broker_or_pool(obj): + """ + Used by :mod:`mitogen.core` and :mod:`mitogen.service` to automatically + register every broker and pool on Python 2.4/2.5. + """ if isinstance(obj, mitogen.core.Broker): _brokers[obj] = True else: @@ -85,8 +83,31 @@ class Corker(object): :class:`mitogen.service.Pool` to be temporarily "corked" while fork operations may occur. 
+ In a mixed threading/forking environment, it is critical no threads are + active at the moment of fork, as they could hold mutexes whose state is + unrecoverably snapshotted in the locked state in the fork child, causing + deadlocks at random future moments. + + To ensure a target thread has all locks dropped, it is made to write a + large string to a socket with a small buffer that has :data:`os.O_NONBLOCK` + disabled. CPython will drop the GIL and enter the ``write()`` system call, + where it will block until the socket buffer is drained, or the write side + is closed. + + :class:`mitogen.core.Poller` is used to ensure the thread really has + blocked outside any Python locks, by checking if the socket buffer has + started to fill. + Since this necessarily involves posting a message to every existent thread and verifying acknowledgement, it will never be a fast operation. + + This does not yet handle the case of corking being initiated from within a + thread that is also a cork target. + + :param brokers: + Sequence of :class:`mitogen.core.Broker` instances to cork. + :param pools: + Sequence of :class:`mitogen.service.Pool` instances to cork. """ def __init__(self, brokers=(), pools=()): self.brokers = brokers @@ -106,13 +127,8 @@ class Corker(object): def _cork_one(self, s, obj): """ - To ensure the target thread has all locks dropped, we ask it to write a - large string to a socket with a small buffer that has O_NONBLOCK - disabled. CPython will drop the GIL and enter the write() system call, - where it will block until the socket buffer is drained, or the write - side is closed. We can detect the thread has blocked outside of Python - code by checking if the socket buffer has started to fill using a - poller. + Construct a socketpair, saving one side of it, and passing the other to + `obj` to be written to by one of its threads. 
""" rsock, wsock = mitogen.parent.create_socketpair(size=4096) mitogen.core.set_cloexec(rsock.fileno()) @@ -120,6 +136,12 @@ class Corker(object): mitogen.core.set_block(wsock) # gevent self._rsocks.append(rsock) obj.defer(self._do_cork, s, wsock) + + def _verify_one(self, rsock): + """ + Pause until the socket `rsock` indicates readability, due to + :meth:`_do_cork` triggering a blocking write on another thread. + """ poller = mitogen.core.Poller() poller.start_receive(rsock.fileno()) try: @@ -131,20 +153,28 @@ class Corker(object): def cork(self): """ - Arrange for the broker and optional pool to be paused with no locks + Arrange for any associated brokers and pools to be paused with no locks held. This will not return until each thread acknowledges it has ceased execution. """ - s = 'CORK' * ((128 / 4) * 1024) + s = mitogen.core.b('CORK') * ((128 // 4) * 1024) self._rsocks = [] + + # Pools must be paused first, as existing work may require the + # participation of a broker in order to complete. for pool in self.pools: if not pool.closed: for x in range(pool.size): self._cork_one(s, pool) + for broker in self.brokers: if broker._alive: self._cork_one(s, broker) + # Pause until we can detect every thread has entered write(). + for rsock in self._rsocks: + self._verify_one(rsock) + def uncork(self): """ Arrange for paused threads to resume operation. 
diff --git a/tests/os_fork_test.py b/tests/os_fork_test.py new file mode 100644 index 00000000..14ea8465 --- /dev/null +++ b/tests/os_fork_test.py @@ -0,0 +1,52 @@ + +import testlib +import unittest2 + +import mitogen.os_fork +import mitogen.service + + +class CorkTest(testlib.RouterMixin, testlib.TestCase): + klass = mitogen.os_fork.Corker + + def ping(self, latch): + latch.put('pong') + + def test_cork_broker(self): + latch = mitogen.core.Latch() + self.broker.defer(self.ping, latch) + self.assertEquals('pong', latch.get()) + + corker = self.klass(brokers=(self.broker,)) + corker.cork() + + latch = mitogen.core.Latch() + self.broker.defer(self.ping, latch) + self.assertRaises(mitogen.core.TimeoutError, + lambda: latch.get(timeout=0.5)) + corker.uncork() + self.assertEquals('pong', latch.get()) + + def test_cork_pool(self): + pool = mitogen.service.Pool(self.router, services=(), size=4) + try: + latch = mitogen.core.Latch() + pool.defer(self.ping, latch) + self.assertEquals('pong', latch.get()) + + corker = self.klass(pools=(pool,)) + corker.cork() + + latch = mitogen.core.Latch() + pool.defer(self.ping, latch) + self.assertRaises(mitogen.core.TimeoutError, + lambda: latch.get(timeout=0.5)) + corker.uncork() + self.assertEquals('pong', latch.get()) + finally: + pool.stop(join=True) + + + +if __name__ == '__main__': + unittest2.main()