diff --git a/mitogen/parent.py b/mitogen/parent.py index fc5433ed..3f508f21 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -115,6 +115,11 @@ def _ioctl_cast(n): return n +# If not :data:`None`, called prior to exec() of any new child process. Used by +# :func:`mitogen.utils.reset_affinity` to allow the child to be freely +# scheduled. +_preexec_hook = None + # Get PTY number; asm-generic/ioctls.h LINUX_TIOCGPTN = _ioctl_cast(2147767344) @@ -262,7 +267,13 @@ def detach_popen(**kwargs): # handling, without tying the surrounding code into managing a Popen # object, which isn't possible for at least :mod:`mitogen.fork`. This # should be replaced by a swappable helper class in a future version. - proc = subprocess.Popen(**kwargs) + real_preexec_fn = kwargs.pop('preexec_fn', None) + def preexec_fn(): + if _preexec_hook: + _preexec_hook() + if real_preexec_fn: + real_preexec_fn() + proc = subprocess.Popen(preexec_fn=preexec_fn, **kwargs) proc._child_created = False return proc.pid diff --git a/mitogen/utils.py b/mitogen/utils.py index b5482afa..e24134e1 100644 --- a/mitogen/utils.py +++ b/mitogen/utils.py @@ -42,6 +42,7 @@ except ImportError: import mitogen import mitogen.core import mitogen.master +import mitogen.parent LOG = logging.getLogger('mitogen') @@ -60,16 +61,42 @@ if ctypes: _sched_setaffinity = None -def reset_affinity(): +def reset_affinity(clear=False): """ + Bind this process to a randomly selected CPU. If done prior to starting + threads, all threads will be bound to the same CPU. This call is a no-op on + systems other than Linux. + + :param bool clear: + If :data:`True`, clear any prior binding. + + A hook is installed that causes `reset_affinity(clear=True)` to run in the + child of any process created with :func:`mitogen.parent.detach_popen`, + ensuring CPU-intensive children like SSH are not forced to share the same + core as the (otherwise potentially very busy) parent. + + Threads bound to the same CPU share cache and experience the lowest + possible inter-thread roundtrip latency, for example ensuring the minimum + possible time required for :class:`mitogen.service.Pool` to interact with + :class:`mitogen.core.Broker`, as required for every message transmitted or + received. + + Binding threads of a Python process to one CPU makes sense, as they are + otherwise unable to operate in parallel, and all must acquire the same lock + prior to executing. """ if _sched_setaffinity is None: return - cpus = multiprocessing.cpu_count() - cpu = random.randint(0, cpus - 1) - bits = struct.pack('L', 1 << cpu) - _sched_setaffinity(os.getpid(), len(bits), bits) + if clear: + mask = 0xffffffff + else: + mask = 1 << random.randint(0, multiprocessing.cpu_count() - 1) + + s = struct.pack('L', mask) + _sched_setaffinity(os.getpid(), len(s), s) + mitogen.parent._preexec_hook = lambda: reset_affinity(clear=True) + def setup_gil(): """ diff --git a/tests/bench/latch_roundtrip.py b/tests/bench/latch_roundtrip.py index e248c803..51492afc 100644 --- a/tests/bench/latch_roundtrip.py +++ b/tests/bench/latch_roundtrip.py @@ -9,6 +9,7 @@ import mitogen import mitogen.utils mitogen.utils.setup_gil() +mitogen.utils.reset_affinity() X = 20000 diff --git a/tests/utils_test.py b/tests/utils_test.py index 17b260db..8a270a5f 100644 --- a/tests/utils_test.py +++ b/tests/utils_test.py @@ -1,8 +1,13 @@ #!/usr/bin/env python +import os +import tempfile + import unittest2 +import mock import mitogen.core +import mitogen.parent import mitogen.master import mitogen.utils from mitogen.core import b @@ -19,6 +24,53 @@ def func(router): return router +class ResetAffinityTest(testlib.TestCase): + func = staticmethod(mitogen.utils.reset_affinity) + + def _get_cpus(self, path='/proc/self/status'): + fp = open(path) + try: + for line in fp: + if line.startswith('Cpus_allowed'): + return int(line.split()[1], 16) + finally: + fp.close() + + @mock.patch('random.randint') + def test_set_reset(self, randint): + randint.return_value = 3 + before = self._get_cpus() + self.func() + self.assertEquals(self._get_cpus(), 1 << 3) + self.func(clear=True) + self.assertEquals(self._get_cpus(), before) + + @mock.patch('random.randint') + def test_clear_on_popen(self, randint): + randint.return_value = 3 + tf = tempfile.NamedTemporaryFile() + try: + before = self._get_cpus() + self.func() + my_cpu = self._get_cpus() + + pid = mitogen.parent.detach_popen( + args=['cp', '/proc/self/status', tf.name] + ) + os.waitpid(pid, 0) + + his_cpu = self._get_cpus(tf.name) + self.assertNotEquals(my_cpu, his_cpu) + self.func(clear=True) + finally: + tf.close() + +ResetAffinityTest = unittest2.skipIf( + reason='Linux only', + condition=os.uname()[0] != 'Linux' +)(ResetAffinityTest) + + class RunWithRouterTest(testlib.TestCase): # test_shutdown_on_exception # test_shutdown_on_success