utils: pad out reset_affinity() and integrate with detach_popen()

pull/564/head
David Wilson 5 years ago
parent b0e3807b16
commit e77048ec2d

@ -115,6 +115,11 @@ def _ioctl_cast(n):
return n
# If not :data:`None`, called prior to exec() of any new child process. Used by
# :func:`mitogen.utils.reset_affinity` to allow the child to be freely
# scheduled.
_preexec_hook = None
# Get PTY number; asm-generic/ioctls.h
LINUX_TIOCGPTN = _ioctl_cast(2147767344)
@ -262,7 +267,13 @@ def detach_popen(**kwargs):
# handling, without tying the surrounding code into managing a Popen
# object, which isn't possible for at least :mod:`mitogen.fork`. This
# should be replaced by a swappable helper class in a future version.
proc = subprocess.Popen(**kwargs)
real_preexec_fn = kwargs.pop('preexec_fn', None)
def preexec_fn():
if _preexec_hook:
_preexec_hook()
if real_preexec_fn:
real_preexec_fn()
proc = subprocess.Popen(preexec_fn=preexec_fn, **kwargs)
proc._child_created = False
return proc.pid

@ -42,6 +42,7 @@ except ImportError:
import mitogen
import mitogen.core
import mitogen.master
import mitogen.parent
LOG = logging.getLogger('mitogen')
@ -60,16 +61,42 @@ if ctypes:
_sched_setaffinity = None
def reset_affinity():
def reset_affinity(clear=False):
"""
Bind this process to a randomly selected CPU. If done prior to starting
threads, all threads will be bound to the same CPU. This call is a no-op on
systems other than Linux.
:param bool clear:
If :data:`True`, clear any prior binding.
A hook is installed that causes `reset_affinity(clear=True)` to run in the
child of any process created with :func:`mitogen.parent.detach_popen`,
ensuring CPU-intensive children like SSH are not forced to share the same
core as the (otherwise potentially very busy) parent.
Threads bound to the same CPU share cache and experience the lowest
possible inter-thread roundtrip latency, for example ensuring the minimum
possible time required for :class:`mitogen.service.Pool` to interact with
:class:`mitogen.core.Broker`, as required for every message transmitted or
received.
Binding threads of a Python process to one CPU makes sense, as they are
otherwise unable to operate in parallel, and all must acquire the same lock
prior to executing.
"""
if _sched_setaffinity is None:
return
cpus = multiprocessing.cpu_count()
cpu = random.randint(0, cpus - 1)
bits = struct.pack('L', 1 << cpu)
_sched_setaffinity(os.getpid(), len(bits), bits)
if clear:
mask = 0xffffffff
else:
mask = 1 << random.randint(0, multiprocessing.cpu_count() - 1)
s = struct.pack('L', mask)
_sched_setaffinity(os.getpid(), len(s), s)
mitogen.parent._preexec_hook = lambda: reset_affinity(clear=True)
def setup_gil():
"""

@ -9,6 +9,7 @@ import mitogen
import mitogen.utils
mitogen.utils.setup_gil()
mitogen.utils.reset_affinity()
X = 20000

@ -1,8 +1,13 @@
#!/usr/bin/env python
import os
import tempfile
import unittest2
import mock
import mitogen.core
import mitogen.parent
import mitogen.master
import mitogen.utils
from mitogen.core import b
@ -19,6 +24,53 @@ def func(router):
return router
class ResetAffinityTest(testlib.TestCase):
func = staticmethod(mitogen.utils.reset_affinity)
def _get_cpus(self, path='/proc/self/status'):
fp = open(path)
try:
for line in fp:
if line.startswith('Cpus_allowed'):
return int(line.split()[1], 16)
finally:
fp.close()
@mock.patch('random.randint')
def test_set_reset(self, randint):
randint.return_value = 3
before = self._get_cpus()
self.func()
self.assertEquals(self._get_cpus(), 1 << 3)
self.func(clear=True)
self.assertEquals(self._get_cpus(), before)
@mock.patch('random.randint')
def test_clear_on_popen(self, randint):
randint.return_value = 3
tf = tempfile.NamedTemporaryFile()
try:
before = self._get_cpus()
self.func()
my_cpu = self._get_cpus()
pid = mitogen.parent.detach_popen(
args=['cp', '/proc/self/status', tf.name]
)
os.waitpid(pid, 0)
his_cpu = self._get_cpus(tf.name)
self.assertNotEquals(my_cpu, his_cpu)
self.func(clear=True)
finally:
tf.close()
ResetAffinityTest = unittest2.skipIf(
reason='Linux only',
condition=os.uname()[0] != 'Linux'
)(ResetAffinityTest)
class RunWithRouterTest(testlib.TestCase):
# test_shutdown_on_exception
# test_shutdown_on_success

Loading…
Cancel
Save