Merge remote-tracking branch 'origin/dmw'

* origin/dmw:
  issue #537: disable just the trivial LinuxPolicyTest on Travis.
  docs: update Changelog; closes #537.
  ansible: refactor affinity class and add abstract tests.
  Bump version for release.
pull/564/head
David Wilson 6 years ago
commit 2654ab470d

@ -156,11 +156,11 @@ class Policy(object):
Assign the helper subprocess policy to this process. Assign the helper subprocess policy to this process.
""" """
class FixedPolicy(Policy):
class LinuxPolicy(Policy):
""" """
:class:`Policy` for Linux machines. The scheme here was tested on an :class:`Policy` for machines where the only control method available is
otherwise idle 16 thread machine. fixed CPU placement. The scheme here was tested on an otherwise idle 16
thread machine.
- The connection multiplexer is pinned to CPU 0. - The connection multiplexer is pinned to CPU 0.
- The Ansible top-level (strategy) is pinned to CPU 1. - The Ansible top-level (strategy) is pinned to CPU 1.
@ -180,26 +180,35 @@ class LinuxPolicy(Policy):
CPU-intensive children like SSH are not forced to share the same core as CPU-intensive children like SSH are not forced to share the same core as
the (otherwise potentially very busy) parent. the (otherwise potentially very busy) parent.
""" """
def __init__(self): def __init__(self, cpu_count=None):
#: For tests.
self.cpu_count = cpu_count or multiprocessing.cpu_count()
self.mem = mmap.mmap(-1, 4096) self.mem = mmap.mmap(-1, 4096)
self.state = State.from_buffer(self.mem) self.state = State.from_buffer(self.mem)
self.state.lock.init() self.state.lock.init()
if self._cpu_count() < 4:
self._reserve_mask = 3 if self.cpu_count < 2:
self._reserve_shift = 2 # uniprocessor
self._reserve_controller = True self._reserve_mux = False
else: self._reserve_controller = False
self._reserve_mask = 0
self._reserve_shift = 0
elif self.cpu_count < 4:
# small SMP
self._reserve_mux = True
self._reserve_controller = False
self._reserve_mask = 1 self._reserve_mask = 1
self._reserve_shift = 1 self._reserve_shift = 1
self._reserve_controller = False else:
# big SMP
self._reserve_mux = True
self._reserve_controller = True
self._reserve_mask = 3
self._reserve_shift = 2
def _set_affinity(self, mask): def _set_affinity(self, mask):
mitogen.parent._preexec_hook = self._clear mitogen.parent._preexec_hook = self._clear
s = struct.pack('L', mask) self._set_cpu_mask(mask)
_sched_setaffinity(os.getpid(), len(s), s)
def _cpu_count(self):
return multiprocessing.cpu_count()
def _balance(self): def _balance(self):
self.state.lock.acquire() self.state.lock.acquire()
@ -210,14 +219,15 @@ class LinuxPolicy(Policy):
self.state.lock.release() self.state.lock.release()
self._set_cpu(self._reserve_shift + ( self._set_cpu(self._reserve_shift + (
(n % max(1, (self._cpu_count() - self._reserve_shift))) (n % (self.cpu_count - self._reserve_shift))
)) ))
def _set_cpu(self, cpu): def _set_cpu(self, cpu):
self._set_affinity(1 << cpu) self._set_affinity(1 << cpu)
def _clear(self): def _clear(self):
self._set_affinity(0xffffffff & ~self._reserve_mask) all_cpus = (1 << self.cpu_count) - 1
self._set_affinity(all_cpus & ~self._reserve_mask)
def assign_controller(self): def assign_controller(self):
if self._reserve_controller: if self._reserve_controller:
@ -235,6 +245,12 @@ class LinuxPolicy(Policy):
self._clear() self._clear()
class LinuxPolicy(FixedPolicy):
def _set_cpu_mask(self, mask):
s = struct.pack('L', mask)
_sched_setaffinity(os.getpid(), len(s), s)
if _sched_setaffinity is not None: if _sched_setaffinity is not None:
policy = LinuxPolicy() policy = LinuxPolicy()
else: else:

@ -125,7 +125,7 @@ Core Library
series. series.
v0.2.5 (2019-02-1?) v0.2.5 (2019-02-14)
------------------- -------------------
Fixes Fixes
@ -145,6 +145,10 @@ Fixes
``simplejson`` from a controller that also loaded an incompatible newer ``simplejson`` from a controller that also loaded an incompatible newer
version of ``simplejson``. version of ``simplejson``.
* `#537 <https://github.com/dw/mitogen/issues/537>`_: a swapped operator in the
CPU affinity logic meant 2 cores were reserved on 1<n<4 core machines, rather
than 1 core as desired. Test coverage was added.
* `#538 <https://github.com/dw/mitogen/issues/538>`_: the source distribution * `#538 <https://github.com/dw/mitogen/issues/538>`_: the source distribution
includes a ``LICENSE`` file. includes a ``LICENSE`` file.
@ -179,7 +183,7 @@ Mitogen would not be possible without the support of users. A huge thanks for
bug reports, testing, features and fixes in this release contributed by bug reports, testing, features and fixes in this release contributed by
`Carl George <https://github.com/carlwgeorge>`_, `Carl George <https://github.com/carlwgeorge>`_,
`Guy Knights <https://github.com/knightsg>`_, and `Guy Knights <https://github.com/knightsg>`_, and
`Josh Smift <https://github.com/jbscare>`_, `Josh Smift <https://github.com/jbscare>`_.
v0.2.4 (2019-02-10) v0.2.4 (2019-02-10)

@ -35,7 +35,7 @@ be expected. On the slave, it is built dynamically during startup.
#: Library version as a tuple. #: Library version as a tuple.
__version__ = (0, 2, 4) __version__ = (0, 2, 5)
#: This is :data:`False` in slave contexts. Previously it was used to prevent #: This is :data:`False` in slave contexts. Previously it was used to prevent

@ -11,11 +11,156 @@ import mitogen.parent
import ansible_mitogen.affinity import ansible_mitogen.affinity
class NullFixedPolicy(ansible_mitogen.affinity.FixedPolicy):
def _set_cpu_mask(self, mask):
self.mask = mask
class FixedPolicyTest(testlib.TestCase):
klass = NullFixedPolicy
def test_assign_controller_1core(self):
# Uniprocessor .
policy = self.klass(cpu_count=1)
policy.assign_controller()
self.assertEquals(0x1, policy.mask)
def test_assign_controller_2core(self):
# Small SMP gets 1.. % cpu_count
policy = self.klass(cpu_count=2)
policy.assign_controller()
self.assertEquals(0x2, policy.mask)
policy.assign_controller()
self.assertEquals(0x2, policy.mask)
policy.assign_controller()
def test_assign_controller_3core(self):
# Small SMP gets 1.. % cpu_count
policy = self.klass(cpu_count=3)
policy.assign_controller()
self.assertEquals(0x2, policy.mask)
policy.assign_controller()
self.assertEquals(0x4, policy.mask)
policy.assign_controller()
self.assertEquals(0x2, policy.mask)
policy.assign_controller()
self.assertEquals(0x4, policy.mask)
policy.assign_controller()
def test_assign_controller_4core(self):
# Big SMP gets a dedicated core.
policy = self.klass(cpu_count=4)
policy.assign_controller()
self.assertEquals(0x2, policy.mask)
policy.assign_controller()
self.assertEquals(0x2, policy.mask)
def test_assign_muxprocess_1core(self):
# Uniprocessor .
policy = self.klass(cpu_count=1)
policy.assign_muxprocess()
self.assertEquals(0x1, policy.mask)
def test_assign_muxprocess_2core(self):
# Small SMP gets dedicated core.
policy = self.klass(cpu_count=2)
policy.assign_muxprocess()
self.assertEquals(0x1, policy.mask)
policy.assign_muxprocess()
self.assertEquals(0x1, policy.mask)
policy.assign_muxprocess()
def test_assign_muxprocess_3core(self):
# Small SMP gets a dedicated core.
policy = self.klass(cpu_count=3)
policy.assign_muxprocess()
self.assertEquals(0x1, policy.mask)
policy.assign_muxprocess()
self.assertEquals(0x1, policy.mask)
def test_assign_muxprocess_4core(self):
# Big SMP gets a dedicated core.
policy = self.klass(cpu_count=4)
policy.assign_muxprocess()
self.assertEquals(0x1, policy.mask)
policy.assign_muxprocess()
self.assertEquals(0x1, policy.mask)
def test_assign_worker_1core(self):
# Balance n % 1
policy = self.klass(cpu_count=1)
policy.assign_worker()
self.assertEquals(0x1, policy.mask)
policy.assign_worker()
self.assertEquals(0x1, policy.mask)
def test_assign_worker_2core(self):
# Balance n % 1
policy = self.klass(cpu_count=2)
policy.assign_worker()
self.assertEquals(0x2, policy.mask)
policy.assign_worker()
self.assertEquals(0x2, policy.mask)
def test_assign_worker_3core(self):
# Balance n % 1
policy = self.klass(cpu_count=3)
policy.assign_worker()
self.assertEquals(0x2, policy.mask)
policy.assign_worker()
self.assertEquals(0x4, policy.mask)
policy.assign_worker()
self.assertEquals(0x2, policy.mask)
def test_assign_worker_4core(self):
# Balance n % 1
policy = self.klass(cpu_count=4)
policy.assign_worker()
self.assertEquals(4, policy.mask)
policy.assign_worker()
self.assertEquals(8, policy.mask)
policy.assign_worker()
self.assertEquals(4, policy.mask)
def test_assign_subprocess_1core(self):
# allow all except reserved.
policy = self.klass(cpu_count=1)
policy.assign_subprocess()
self.assertEquals(0x1, policy.mask)
policy.assign_subprocess()
self.assertEquals(0x1, policy.mask)
def test_assign_subprocess_2core(self):
# allow all except reserved.
policy = self.klass(cpu_count=2)
policy.assign_subprocess()
self.assertEquals(0x2, policy.mask)
policy.assign_subprocess()
self.assertEquals(0x2, policy.mask)
def test_assign_subprocess_3core(self):
# allow all except reserved.
policy = self.klass(cpu_count=3)
policy.assign_subprocess()
self.assertEquals(0x2 + 0x4, policy.mask)
policy.assign_subprocess()
self.assertEquals(0x2 + 0x4, policy.mask)
def test_assign_subprocess_4core(self):
# allow all except reserved.
policy = self.klass(cpu_count=4)
policy.assign_subprocess()
self.assertEquals(0x4 + 0x8, policy.mask)
policy.assign_subprocess()
self.assertEquals(0x4 + 0x8, policy.mask)
@unittest2.skipIf( @unittest2.skipIf(
reason='Linux/SMP only', reason='Linux/SMP only',
condition=(not ( condition=(not (
os.uname()[0] == 'Linux' and os.uname()[0] == 'Linux' and
multiprocessing.cpu_count() >= 4 multiprocessing.cpu_count() > 2
)) ))
) )
class LinuxPolicyTest(testlib.TestCase): class LinuxPolicyTest(testlib.TestCase):
@ -33,12 +178,15 @@ class LinuxPolicyTest(testlib.TestCase):
finally: finally:
fp.close() fp.close()
def test_set_clear(self): def test_set_cpu_mask(self):
before = self._get_cpus() self.policy._set_cpu_mask(0x1)
self.policy._set_cpu(3) self.assertEquals(0x1, self._get_cpus())
self.assertEquals(self._get_cpus(), 1 << 3)
self.policy._clear() self.policy._set_cpu_mask(0x2)
self.assertEquals(self._get_cpus(), before) self.assertEquals(0x2, self._get_cpus())
self.policy._set_cpu_mask(0x3)
self.assertEquals(0x3, self._get_cpus())
def test_clear_on_popen(self): def test_clear_on_popen(self):
tf = tempfile.NamedTemporaryFile() tf = tempfile.NamedTemporaryFile()

Loading…
Cancel
Save