ansible: refactor affinity class and add abstract tests.

pull/862/head
David Wilson 6 years ago
parent e010667230
commit 8f9c67daf1

@ -156,11 +156,11 @@ class Policy(object):
Assign the helper subprocess policy to this process. Assign the helper subprocess policy to this process.
""" """
class FixedPolicy(Policy):
class LinuxPolicy(Policy):
""" """
:class:`Policy` for Linux machines. The scheme here was tested on an :class:`Policy` for machines where the only control method available is
otherwise idle 16 thread machine. fixed CPU placement. The scheme here was tested on an otherwise idle 16
thread machine.
- The connection multiplexer is pinned to CPU 0. - The connection multiplexer is pinned to CPU 0.
- The Ansible top-level (strategy) is pinned to CPU 1. - The Ansible top-level (strategy) is pinned to CPU 1.
@ -180,26 +180,35 @@ class LinuxPolicy(Policy):
CPU-intensive children like SSH are not forced to share the same core as CPU-intensive children like SSH are not forced to share the same core as
the (otherwise potentially very busy) parent. the (otherwise potentially very busy) parent.
""" """
def __init__(self): def __init__(self, cpu_count=None):
#: For tests.
self.cpu_count = cpu_count or multiprocessing.cpu_count()
self.mem = mmap.mmap(-1, 4096) self.mem = mmap.mmap(-1, 4096)
self.state = State.from_buffer(self.mem) self.state = State.from_buffer(self.mem)
self.state.lock.init() self.state.lock.init()
if self._cpu_count() < 4:
self._reserve_mask = 3 if self.cpu_count < 2:
self._reserve_shift = 2 # uniprocessor
self._reserve_controller = True self._reserve_mux = False
else: self._reserve_controller = False
self._reserve_mask = 0
self._reserve_shift = 0
elif self.cpu_count < 4:
# small SMP
self._reserve_mux = True
self._reserve_controller = False
self._reserve_mask = 1 self._reserve_mask = 1
self._reserve_shift = 1 self._reserve_shift = 1
self._reserve_controller = False else:
# big SMP
self._reserve_mux = True
self._reserve_controller = True
self._reserve_mask = 3
self._reserve_shift = 2
def _set_affinity(self, mask): def _set_affinity(self, mask):
mitogen.parent._preexec_hook = self._clear mitogen.parent._preexec_hook = self._clear
s = struct.pack('L', mask) self._set_cpu_mask(mask)
_sched_setaffinity(os.getpid(), len(s), s)
def _cpu_count(self):
return multiprocessing.cpu_count()
def _balance(self): def _balance(self):
self.state.lock.acquire() self.state.lock.acquire()
@ -210,14 +219,15 @@ class LinuxPolicy(Policy):
self.state.lock.release() self.state.lock.release()
self._set_cpu(self._reserve_shift + ( self._set_cpu(self._reserve_shift + (
(n % max(1, (self._cpu_count() - self._reserve_shift))) (n % (self.cpu_count - self._reserve_shift))
)) ))
def _set_cpu(self, cpu): def _set_cpu(self, cpu):
self._set_affinity(1 << cpu) self._set_affinity(1 << cpu)
def _clear(self): def _clear(self):
self._set_affinity(0xffffffff & ~self._reserve_mask) all_cpus = (1 << self.cpu_count) - 1
self._set_affinity(all_cpus & ~self._reserve_mask)
def assign_controller(self): def assign_controller(self):
if self._reserve_controller: if self._reserve_controller:
@ -235,6 +245,12 @@ class LinuxPolicy(Policy):
self._clear() self._clear()
class LinuxPolicy(FixedPolicy):
def _set_cpu_mask(self, mask):
s = struct.pack('L', mask)
_sched_setaffinity(os.getpid(), len(s), s)
if _sched_setaffinity is not None: if _sched_setaffinity is not None:
policy = LinuxPolicy() policy = LinuxPolicy()
else: else:

@ -11,11 +11,156 @@ import mitogen.parent
import ansible_mitogen.affinity import ansible_mitogen.affinity
class NullFixedPolicy(ansible_mitogen.affinity.FixedPolicy):
def _set_cpu_mask(self, mask):
self.mask = mask
class FixedPolicyTest(testlib.TestCase):
klass = NullFixedPolicy
def test_assign_controller_1core(self):
# Uniprocessor .
policy = self.klass(cpu_count=1)
policy.assign_controller()
self.assertEquals(0x1, policy.mask)
def test_assign_controller_2core(self):
# Small SMP gets 1.. % cpu_count
policy = self.klass(cpu_count=2)
policy.assign_controller()
self.assertEquals(0x2, policy.mask)
policy.assign_controller()
self.assertEquals(0x2, policy.mask)
policy.assign_controller()
def test_assign_controller_3core(self):
# Small SMP gets 1.. % cpu_count
policy = self.klass(cpu_count=3)
policy.assign_controller()
self.assertEquals(0x2, policy.mask)
policy.assign_controller()
self.assertEquals(0x4, policy.mask)
policy.assign_controller()
self.assertEquals(0x2, policy.mask)
policy.assign_controller()
self.assertEquals(0x4, policy.mask)
policy.assign_controller()
def test_assign_controller_4core(self):
# Big SMP gets a dedicated core.
policy = self.klass(cpu_count=4)
policy.assign_controller()
self.assertEquals(0x2, policy.mask)
policy.assign_controller()
self.assertEquals(0x2, policy.mask)
def test_assign_muxprocess_1core(self):
# Uniprocessor .
policy = self.klass(cpu_count=1)
policy.assign_muxprocess()
self.assertEquals(0x1, policy.mask)
def test_assign_muxprocess_2core(self):
# Small SMP gets dedicated core.
policy = self.klass(cpu_count=2)
policy.assign_muxprocess()
self.assertEquals(0x1, policy.mask)
policy.assign_muxprocess()
self.assertEquals(0x1, policy.mask)
policy.assign_muxprocess()
def test_assign_muxprocess_3core(self):
# Small SMP gets a dedicated core.
policy = self.klass(cpu_count=3)
policy.assign_muxprocess()
self.assertEquals(0x1, policy.mask)
policy.assign_muxprocess()
self.assertEquals(0x1, policy.mask)
def test_assign_muxprocess_4core(self):
# Big SMP gets a dedicated core.
policy = self.klass(cpu_count=4)
policy.assign_muxprocess()
self.assertEquals(0x1, policy.mask)
policy.assign_muxprocess()
self.assertEquals(0x1, policy.mask)
def test_assign_worker_1core(self):
# Balance n % 1
policy = self.klass(cpu_count=1)
policy.assign_worker()
self.assertEquals(0x1, policy.mask)
policy.assign_worker()
self.assertEquals(0x1, policy.mask)
def test_assign_worker_2core(self):
# Balance n % 1
policy = self.klass(cpu_count=2)
policy.assign_worker()
self.assertEquals(0x2, policy.mask)
policy.assign_worker()
self.assertEquals(0x2, policy.mask)
def test_assign_worker_3core(self):
# Balance n % 1
policy = self.klass(cpu_count=3)
policy.assign_worker()
self.assertEquals(0x2, policy.mask)
policy.assign_worker()
self.assertEquals(0x4, policy.mask)
policy.assign_worker()
self.assertEquals(0x2, policy.mask)
def test_assign_worker_4core(self):
# Balance n % 1
policy = self.klass(cpu_count=4)
policy.assign_worker()
self.assertEquals(4, policy.mask)
policy.assign_worker()
self.assertEquals(8, policy.mask)
policy.assign_worker()
self.assertEquals(4, policy.mask)
def test_assign_subprocess_1core(self):
# allow all except reserved.
policy = self.klass(cpu_count=1)
policy.assign_subprocess()
self.assertEquals(0x1, policy.mask)
policy.assign_subprocess()
self.assertEquals(0x1, policy.mask)
def test_assign_subprocess_2core(self):
# allow all except reserved.
policy = self.klass(cpu_count=2)
policy.assign_subprocess()
self.assertEquals(0x2, policy.mask)
policy.assign_subprocess()
self.assertEquals(0x2, policy.mask)
def test_assign_subprocess_3core(self):
# allow all except reserved.
policy = self.klass(cpu_count=3)
policy.assign_subprocess()
self.assertEquals(0x2 + 0x4, policy.mask)
policy.assign_subprocess()
self.assertEquals(0x2 + 0x4, policy.mask)
def test_assign_subprocess_4core(self):
# allow all except reserved.
policy = self.klass(cpu_count=4)
policy.assign_subprocess()
self.assertEquals(0x4 + 0x8, policy.mask)
policy.assign_subprocess()
self.assertEquals(0x4 + 0x8, policy.mask)
@unittest2.skipIf( @unittest2.skipIf(
reason='Linux/SMP only', reason='Linux/SMP only',
condition=(not ( condition=(not (
os.uname()[0] == 'Linux' and os.uname()[0] == 'Linux' and
multiprocessing.cpu_count() >= 4 multiprocessing.cpu_count() > 1
)) ))
) )
class LinuxPolicyTest(testlib.TestCase): class LinuxPolicyTest(testlib.TestCase):
@ -33,6 +178,16 @@ class LinuxPolicyTest(testlib.TestCase):
finally: finally:
fp.close() fp.close()
def test_set_cpu_mask(self):
self.policy._set_cpu_mask(0x1)
self.assertEquals(0x1, self._get_cpus())
self.policy._set_cpu_mask(0x2)
self.assertEquals(0x2, self._get_cpus())
self.policy._set_cpu_mask(0x3)
self.assertEquals(0x3, self._get_cpus())
def test_set_clear(self): def test_set_clear(self):
before = self._get_cpus() before = self._get_cpus()
self.policy._set_cpu(3) self.policy._set_cpu(3)

Loading…
Cancel
Save