diff --git a/ansible_mitogen/affinity.py b/ansible_mitogen/affinity.py index b6b10e02..b97aa38a 100644 --- a/ansible_mitogen/affinity.py +++ b/ansible_mitogen/affinity.py @@ -26,6 +26,53 @@ # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. +""" +As Mitogen separates asynchronous IO out to a broker thread, communication +necessarily involves context switching and waking that thread. When application +threads and the broker share a CPU, this can be almost invisibly fast - around +25 microseconds for a full A->B->A round-trip. + +However when threads are scheduled on different CPUs, round-trip delays +regularly vary wildly, and easily into milliseconds. Many contributing factors +exist, not least scenarios like: + +1. A is preempted immediately after waking B, but before releasing the GIL. +2. B wakes from IO wait only to immediately enter futex wait. +3. A may wait 10ms or more for another timeslice, as the scheduler on its CPU + runs threads unrelated to its transaction (i.e. not B), wake only to release + its GIL, before entering IO sleep waiting for a reply from B, which cannot + exist yet. +4. B wakes, acquires GIL, performs work, and sends reply to A, causing it to + wake. B is preempted before releasing GIL. +5. A wakes from IO wait only to immediately enter futex wait. +6. B may wait 10ms or more for another timeslice, wake only to release its GIL, + before sleeping again. +7. A wakes, acquires GIL, finally receives reply. + +Per above if we are unlucky, on an even moderately busy machine it is possible +to lose milliseconds just in scheduling delay, and the effect is compounded +when pairs of threads in process A are communicating with pairs of threads in +process B using the same scheme, such as when Ansible WorkerProcess is +communicating with ContextService in the connection multiplexer. In the worst +case it could involve 4 threads working in lockstep spread across 4 busy CPUs. + +Since multithreading in Python is essentially useless except for waiting on IO +due to the presence of the GIL, at least in Ansible there is no good reason for +threads in the same process to run on distinct CPUs - they always operate in +lockstep due to the GIL, and are thus vulnerable to issues like above. + +Linux lacks any natural API to describe what we want, it only permits +individual threads to be constrained to run on specific CPUs, and for that +constraint to be inherited by new threads and forks of the constrained thread. + +This module therefore implements a CPU pinning policy for Ansible processes, +providing methods that should be called early in any new process, either to +rebalance which CPU it is pinned to, or in the case of subprocesses, to remove +the pinning entirely. It is likely to require ongoing tweaking, since pinning +necessarily involves preventing the scheduler from making load balancing +decisions. +""" + import ctypes import mmap import multiprocessing @@ -45,9 +92,17 @@ try: _sched_setaffinity = _libc.sched_setaffinity except (OSError, AttributeError): _libc = None + _strerror = None + _pthread_mutex_init = None + _pthread_mutex_lock = None + _pthread_mutex_unlock = None + _sched_setaffinity = None class pthread_mutex_t(ctypes.Structure): + """ + Wrap pthread_mutex_t to allow storing a lock in shared memory. + """ _fields_ = [ ('data', ctypes.c_uint8 * 512), ] @@ -66,32 +121,60 @@ class pthread_mutex_t(ctypes.Structure): class State(ctypes.Structure): + """ + Contents of shared memory segment. This allows :meth:`Manager.assign` to be + called from any child, since affinity assignment must happen from within + the context of the new child process. + """ _fields_ = [ ('lock', pthread_mutex_t), ('counter', ctypes.c_uint8), ] -class Manager(object): +class Policy(object): + """ + Process affinity policy. + """ + def assign_controller(self): + """ + Assign the Ansible top-level policy to this process. + """ + + def assign_muxprocess(self): + """ + Assign the MuxProcess policy to this process. + """ + + def assign_worker(self): + """ + Assign the WorkerProcess policy to this process. + """ + + def assign_subprocess(self): + """ + Assign the helper subprocess policy to this process. + """ + + +class LinuxPolicy(Policy): """ - Bind this process to a randomly selected CPU. If done prior to starting - threads, all threads will be bound to the same CPU. This call is a no-op on - systems other than Linux. - - A hook is installed that causes `reset_affinity(clear=True)` to run in the - child of any process created with :func:`mitogen.parent.detach_popen`, - ensuring CPU-intensive children like SSH are not forced to share the same - core as the (otherwise potentially very busy) parent. - - Threads bound to the same CPU share cache and experience the lowest - possible inter-thread roundtrip latency, for example ensuring the minimum - possible time required for :class:`mitogen.service.Pool` to interact with - :class:`mitogen.core.Broker`, as required for every message transmitted or - received. - - Binding threads of a Python process to one CPU makes sense, as they are - otherwise unable to operate in parallel, and all must acquire the same lock - prior to executing. + :class:`Policy` for Linux machines. The scheme here was tested on an + otherwise idle 16 thread machine. + + - The connection multiplexer is pinned to CPU 0. + - The Ansible top-level (strategy) is pinned to CPU 1. + - WorkerProcesses are pinned sequentually to 2..N, wrapping around when no + more CPUs exist. + - Children such as SSH may be scheduled on any CPU except 0/1. + + This could at least be improved by having workers pinned to independent + cores, before reusing the second hyperthread of an existing core. + + A hook is installed that causes :meth:`reset` to run in the child of any + process created with :func:`mitogen.parent.detach_popen`, ensuring + CPU-intensive children like SSH are not forced to share the same core as + the (otherwise potentially very busy) parent. """ def __init__(self): self.mem = mmap.mmap(-1, 4096) @@ -99,26 +182,14 @@ class Manager(object): self.state.lock.init() def _set_affinity(self, mask): - mitogen.parent._preexec_hook = self.clear + mitogen.parent._preexec_hook = self._clear s = struct.pack('L', mask) _sched_setaffinity(os.getpid(), len(s), s) - def cpu_count(self): + def _cpu_count(self): return multiprocessing.cpu_count() - def clear(self): - """ - Clear any prior binding, except for reserved CPUs. - """ - self._set_affinity(0xffffffff & ~3) - - def set_cpu(self, cpu): - """ - Bind to 0-based `cpu`. - """ - self._set_affinity(1 << cpu) - - def assign(self): + def _balance(self): self.state.lock.acquire() try: n = self.state.counter @@ -126,7 +197,28 @@ class Manager(object): finally: self.state.lock.release() - self.set_cpu(2 + (n % max(1, (self.cpu_count() - 2)))) + self._set_cpu(2 + (n % max(1, (self._cpu_count() - 2)))) + + def _set_cpu(self, cpu): + self._set_affinity(1 << cpu) + + def _clear(self): + self._set_affinity(0xffffffff & ~3) + + def assign_controller(self): + self._set_cpu(1) + + def assign_muxprocess(self): + self._set_cpu(0) + + def assign_worker(self): + self._balance() + + def assign_subprocess(self): + self._clear() -manager = Manager() +if _sched_setaffinity is not None: + policy = LinuxPolicy() +else: + policy = Policy() diff --git a/ansible_mitogen/process.py b/ansible_mitogen/process.py index 2049d9b6..902829f7 100644 --- a/ansible_mitogen/process.py +++ b/ansible_mitogen/process.py @@ -173,12 +173,12 @@ class MuxProcess(object): if _init_logging: ansible_mitogen.logging.setup() if cls.child_pid: - ansible_mitogen.affinity.manager.set_cpu(1) + ansible_mitogen.affinity.policy.assign_controller() cls.child_sock.close() cls.child_sock = None mitogen.core.io_op(cls.worker_sock.recv, 1) else: - ansible_mitogen.affinity.manager.set_cpu(0) + ansible_mitogen.affinity.policy.assign_muxprocess() cls.worker_sock.close() cls.worker_sock = None self = cls() diff --git a/ansible_mitogen/strategy.py b/ansible_mitogen/strategy.py index 821d63d2..4d1636e2 100644 --- a/ansible_mitogen/strategy.py +++ b/ansible_mitogen/strategy.py @@ -28,6 +28,7 @@ from __future__ import absolute_import import os +import signal import threading import mitogen.core @@ -102,11 +103,12 @@ def wrap_worker__run(*args, **kwargs): While the strategy is active, rewrite connection_loader.get() calls for some transports into requests for a compatible Mitogen transport. """ + # Ignore parent's attempts to murder us when we still need to write + # profiling output. if mitogen.core._profile_hook.__name__ != '_profile_hook': - import signal signal.signal(signal.SIGTERM, signal.SIG_IGN) - ansible_mitogen.affinity.manager.assign() + ansible_mitogen.affinity.policy.assign_worker() return mitogen.core._profile_hook('WorkerProcess', lambda: worker__run(*args, **kwargs) ) diff --git a/docs/changelog.rst b/docs/changelog.rst index 424585cf..890477e0 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -132,7 +132,8 @@ Mitogen for Ansible ~~~~~~~~~~~~~~~~~~~ This release includes a huge variety of important fixes and new optimizations. -On a synthetic run with 64 hosts it is over 35% faster than v0.2.3. +It is 35% faster than 0.2.3 on a synthetic 64 target run that places heavy load +on the connection multiplexer. Enhancements ^^^^^^^^^^^^ @@ -163,10 +164,10 @@ Enhancements plug-in path. See :ref:`mitogen-get-stack` for more information. * `152effc2 `_, - `bd4b04ae `_: multiplexer - threads are pinned to one CPU, reducing latency and SMP overhead on a hot - path exercised for every task. This yielded a 19% speedup in a 64-target job - composed of many short tasks, and should easily be visible as a runtime + `bd4b04ae `_: a CPU affinity + policy was added for Linux controllers, reducing latency and SMP overhead on + hot paths exercised for every task. This yielded a 19% speedup in a 64-target + job composed of many short tasks, and should easily be visible as a runtime improvement in many-host runs. * `0979422a `_: an expensive