|
|
|
@ -26,6 +26,53 @@
|
|
|
|
|
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
|
|
|
# POSSIBILITY OF SUCH DAMAGE.
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
As Mitogen separates asynchronous IO out to a broker thread, communication
|
|
|
|
|
necessarily involves context switching and waking that thread. When application
|
|
|
|
|
threads and the broker share a CPU, this can be almost invisibly fast - around
|
|
|
|
|
25 microseconds for a full A->B->A round-trip.
|
|
|
|
|
|
|
|
|
|
However when threads are scheduled on different CPUs, round-trip delays
|
|
|
|
|
regularly vary wildly, and easily into milliseconds. Many contributing factors
|
|
|
|
|
exist, not least scenarios like:
|
|
|
|
|
|
|
|
|
|
1. A is preempted immediately after waking B, but before releasing the GIL.
|
|
|
|
|
2. B wakes from IO wait only to immediately enter futex wait.
|
|
|
|
|
3. A may wait 10ms or more for another timeslice, as the scheduler on its CPU
|
|
|
|
|
runs threads unrelated to its transaction (i.e. not B), wake only to release
|
|
|
|
|
its GIL, before entering IO sleep waiting for a reply from B, which cannot
|
|
|
|
|
exist yet.
|
|
|
|
|
4. B wakes, acquires GIL, performs work, and sends reply to A, causing it to
|
|
|
|
|
wake. B is preempted before releasing GIL.
|
|
|
|
|
5. A wakes from IO wait only to immediately enter futex wait.
|
|
|
|
|
6. B may wait 10ms or more for another timeslice, wake only to release its GIL,
|
|
|
|
|
before sleeping again.
|
|
|
|
|
7. A wakes, acquires GIL, finally receives reply.
|
|
|
|
|
|
|
|
|
|
Per the above, if we are unlucky, on even a moderately busy machine it is possible
|
|
|
|
|
to lose milliseconds just in scheduling delay, and the effect is compounded
|
|
|
|
|
when pairs of threads in process A are communicating with pairs of threads in
|
|
|
|
|
process B using the same scheme, such as when Ansible WorkerProcess is
|
|
|
|
|
communicating with ContextService in the connection multiplexer. In the worst
|
|
|
|
|
case it could involve 4 threads working in lockstep spread across 4 busy CPUs.
|
|
|
|
|
|
|
|
|
|
Since multithreading in Python is essentially useless except for waiting on IO
|
|
|
|
|
due to the presence of the GIL, at least in Ansible there is no good reason for
|
|
|
|
|
threads in the same process to run on distinct CPUs - they always operate in
|
|
|
|
|
lockstep due to the GIL, and are thus vulnerable to issues like above.
|
|
|
|
|
|
|
|
|
|
Linux lacks any natural API to describe what we want; it only permits
|
|
|
|
|
individual threads to be constrained to run on specific CPUs, and for that
|
|
|
|
|
constraint to be inherited by new threads and forks of the constrained thread.
|
|
|
|
|
|
|
|
|
|
This module therefore implements a CPU pinning policy for Ansible processes,
|
|
|
|
|
providing methods that should be called early in any new process, either to
|
|
|
|
|
rebalance which CPU it is pinned to, or in the case of subprocesses, to remove
|
|
|
|
|
the pinning entirely. It is likely to require ongoing tweaking, since pinning
|
|
|
|
|
necessarily involves preventing the scheduler from making load balancing
|
|
|
|
|
decisions.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
import ctypes
|
|
|
|
|
import mmap
|
|
|
|
|
import multiprocessing
|
|
|
|
@ -45,9 +92,17 @@ try:
|
|
|
|
|
_sched_setaffinity = _libc.sched_setaffinity
|
|
|
|
|
except (OSError, AttributeError):
|
|
|
|
|
_libc = None
|
|
|
|
|
_strerror = None
|
|
|
|
|
_pthread_mutex_init = None
|
|
|
|
|
_pthread_mutex_lock = None
|
|
|
|
|
_pthread_mutex_unlock = None
|
|
|
|
|
_sched_setaffinity = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class pthread_mutex_t(ctypes.Structure):
|
|
|
|
|
"""
|
|
|
|
|
Wrap pthread_mutex_t to allow storing a lock in shared memory.
|
|
|
|
|
"""
|
|
|
|
|
_fields_ = [
|
|
|
|
|
('data', ctypes.c_uint8 * 512),
|
|
|
|
|
]
|
|
|
|
@ -66,32 +121,60 @@ class pthread_mutex_t(ctypes.Structure):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class State(ctypes.Structure):
    """
    Layout of the shared memory segment used to coordinate CPU assignment.
    Keeping this state in shared memory lets each child process select its
    own CPU, which matters because an affinity assignment has to be made
    from inside the process it applies to.
    """
    _fields_ = [
        # Mutex stored in the segment; it is held while `counter` is read.
        ('lock', pthread_mutex_t),
        # 8-bit value read under the lock to choose a worker's CPU.
        ('counter', ctypes.c_uint8),
    ]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Manager(object):
|
|
|
|
|
class Policy(object):
    """
    Base process affinity policy. Every hook here is a no-op; subclasses
    override the hooks to apply a real CPU binding.
    """

    def assign_controller(self):
        """
        Apply the policy for the Ansible top-level process to the calling
        process.
        """

    def assign_muxprocess(self):
        """
        Apply the policy for MuxProcess to the calling process.
        """

    def assign_worker(self):
        """
        Apply the policy for WorkerProcess to the calling process.
        """

    def assign_subprocess(self):
        """
        Apply the policy for helper subprocesses to the calling process.
        """
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class LinuxPolicy(Policy):
|
|
|
|
|
"""
|
|
|
|
|
Bind this process to a randomly selected CPU. If done prior to starting
|
|
|
|
|
threads, all threads will be bound to the same CPU. This call is a no-op on
|
|
|
|
|
systems other than Linux.
|
|
|
|
|
|
|
|
|
|
A hook is installed that causes `reset_affinity(clear=True)` to run in the
|
|
|
|
|
child of any process created with :func:`mitogen.parent.detach_popen`,
|
|
|
|
|
ensuring CPU-intensive children like SSH are not forced to share the same
|
|
|
|
|
core as the (otherwise potentially very busy) parent.
|
|
|
|
|
|
|
|
|
|
Threads bound to the same CPU share cache and experience the lowest
|
|
|
|
|
possible inter-thread roundtrip latency, for example ensuring the minimum
|
|
|
|
|
possible time required for :class:`mitogen.service.Pool` to interact with
|
|
|
|
|
:class:`mitogen.core.Broker`, as required for every message transmitted or
|
|
|
|
|
received.
|
|
|
|
|
|
|
|
|
|
Binding threads of a Python process to one CPU makes sense, as they are
|
|
|
|
|
otherwise unable to operate in parallel, and all must acquire the same lock
|
|
|
|
|
prior to executing.
|
|
|
|
|
:class:`Policy` for Linux machines. The scheme here was tested on an
|
|
|
|
|
otherwise idle 16 thread machine.
|
|
|
|
|
|
|
|
|
|
- The connection multiplexer is pinned to CPU 0.
|
|
|
|
|
- The Ansible top-level (strategy) is pinned to CPU 1.
|
|
|
|
|
- WorkerProcesses are pinned sequentially to 2..N, wrapping around when no
|
|
|
|
|
more CPUs exist.
|
|
|
|
|
- Children such as SSH may be scheduled on any CPU except 0/1.
|
|
|
|
|
|
|
|
|
|
This could at least be improved by having workers pinned to independent
|
|
|
|
|
cores, before reusing the second hyperthread of an existing core.
|
|
|
|
|
|
|
|
|
|
A hook is installed that causes :meth:`_clear` to run in the child of any
|
|
|
|
|
process created with :func:`mitogen.parent.detach_popen`, ensuring
|
|
|
|
|
CPU-intensive children like SSH are not forced to share the same core as
|
|
|
|
|
the (otherwise potentially very busy) parent.
|
|
|
|
|
"""
|
|
|
|
|
def __init__(self):
|
|
|
|
|
self.mem = mmap.mmap(-1, 4096)
|
|
|
|
@ -99,26 +182,14 @@ class Manager(object):
|
|
|
|
|
self.state.lock.init()
|
|
|
|
|
|
|
|
|
|
def _set_affinity(self, mask):
|
|
|
|
|
mitogen.parent._preexec_hook = self.clear
|
|
|
|
|
mitogen.parent._preexec_hook = self._clear
|
|
|
|
|
s = struct.pack('L', mask)
|
|
|
|
|
_sched_setaffinity(os.getpid(), len(s), s)
|
|
|
|
|
|
|
|
|
|
def cpu_count(self):
|
|
|
|
|
def _cpu_count(self):
|
|
|
|
|
return multiprocessing.cpu_count()
|
|
|
|
|
|
|
|
|
|
def clear(self):
|
|
|
|
|
"""
|
|
|
|
|
Clear any prior binding, except for reserved CPUs.
|
|
|
|
|
"""
|
|
|
|
|
self._set_affinity(0xffffffff & ~3)
|
|
|
|
|
|
|
|
|
|
def set_cpu(self, cpu):
|
|
|
|
|
"""
|
|
|
|
|
Bind to 0-based `cpu`.
|
|
|
|
|
"""
|
|
|
|
|
self._set_affinity(1 << cpu)
|
|
|
|
|
|
|
|
|
|
def assign(self):
|
|
|
|
|
def _balance(self):
|
|
|
|
|
self.state.lock.acquire()
|
|
|
|
|
try:
|
|
|
|
|
n = self.state.counter
|
|
|
|
@ -126,7 +197,28 @@ class Manager(object):
|
|
|
|
|
finally:
|
|
|
|
|
self.state.lock.release()
|
|
|
|
|
|
|
|
|
|
self.set_cpu(2 + (n % max(1, (self.cpu_count() - 2))))
|
|
|
|
|
self._set_cpu(2 + (n % max(1, (self._cpu_count() - 2))))
|
|
|
|
|
|
|
|
|
|
def _set_cpu(self, cpu):
|
|
|
|
|
self._set_affinity(1 << cpu)
|
|
|
|
|
|
|
|
|
|
def _clear(self):
|
|
|
|
|
self._set_affinity(0xffffffff & ~3)
|
|
|
|
|
|
|
|
|
|
def assign_controller(self):
|
|
|
|
|
self._set_cpu(1)
|
|
|
|
|
|
|
|
|
|
def assign_muxprocess(self):
|
|
|
|
|
self._set_cpu(0)
|
|
|
|
|
|
|
|
|
|
def assign_worker(self):
|
|
|
|
|
self._balance()
|
|
|
|
|
|
|
|
|
|
def assign_subprocess(self):
|
|
|
|
|
self._clear()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
manager = Manager()
|
|
|
|
|
# Fall back to the no-op base policy when sched_setaffinity() could not be
# resolved from libc; otherwise use the Linux pinning policy.
policy = (Policy if _sched_setaffinity is None else LinuxPolicy)()
|
|
|
|
|