issue #605: ansible: share a sem_t instead of a pthread_mutex_t

The previous version quite reliably causes worker deadlocks within 10
minutes running:

    # 100 times:
    - import_playbook: integration/async/runner_one_job.yml
    # 100 times:
    - import_playbook: integration/module_utils/adjacent_to_playbook.yml

via .ci/soak/mitogen.sh with PLAYBOOK= set to the above playbook.

Attaching to the worker with gdb reveals it in an instruction
immediately following a futex() call, which likely returned EINTR due to
attaching gdb. Examining the pthread_mutex_t state reveals it to be
completely unlocked.

pthread_mutex_t on Linux should have zero trouble living in shmem, so
it's not clear how this deadlock is happening. Meanwhile POSIX
semaphores are explicitly designed for cross-process use and have a
completely different internal implementation, so try those instead. 1
hour of soaking reveals no deadlock.

This is about avoiding managing a lockable temporary file on disk to
contain our counter, and somehow communicating a reference to it into
subprocesses (despite the subprocess module closing inherited fds, etc),
somehow deleting it reliably at exit, and somehow avoiding concurrent
Ansible runs stepping on the same file. For now ctypes is still less
pain.

A final possibility would be to abandon a shared counter and instead
pick a CPU based on the hash of e.g. the new child's process ID. That
would likely balance equally well, and might be worth exploring when
making this code work on BSD.
pull/618/head
David Wilson 5 years ago
parent 4fa760cd21
commit f78a5f08c6

@ -92,37 +92,37 @@ try:
_libc = ctypes.CDLL(None, use_errno=True) _libc = ctypes.CDLL(None, use_errno=True)
_strerror = _libc.strerror _strerror = _libc.strerror
_strerror.restype = ctypes.c_char_p _strerror.restype = ctypes.c_char_p
_pthread_mutex_init = _libc.pthread_mutex_init _sem_init = _libc.sem_init
_pthread_mutex_lock = _libc.pthread_mutex_lock _sem_wait = _libc.sem_wait
_pthread_mutex_unlock = _libc.pthread_mutex_unlock _sem_post = _libc.sem_post
_sched_setaffinity = _libc.sched_setaffinity _sched_setaffinity = _libc.sched_setaffinity
except (OSError, AttributeError): except (OSError, AttributeError):
_libc = None _libc = None
_strerror = None _strerror = None
_pthread_mutex_init = None _sem_init = None
_pthread_mutex_lock = None _sem_wait = None
_pthread_mutex_unlock = None _sem_post = None
_sched_setaffinity = None _sched_setaffinity = None
class pthread_mutex_t(ctypes.Structure): class sem_t(ctypes.Structure):
""" """
Wrap pthread_mutex_t to allow storing a lock in shared memory. Wrap sem_t to allow storing a lock in shared memory.
""" """
_fields_ = [ _fields_ = [
('data', ctypes.c_uint8 * 512), ('data', ctypes.c_uint8 * 128),
] ]
def init(self): def init(self):
if _pthread_mutex_init(self.data, 0): if _sem_init(self.data, 1, 1):
raise Exception(_strerror(ctypes.get_errno())) raise Exception(_strerror(ctypes.get_errno()))
def acquire(self): def acquire(self):
if _pthread_mutex_lock(self.data): if _sem_wait(self.data):
raise Exception(_strerror(ctypes.get_errno())) raise Exception(_strerror(ctypes.get_errno()))
def release(self): def release(self):
if _pthread_mutex_unlock(self.data): if _sem_post(self.data):
raise Exception(_strerror(ctypes.get_errno())) raise Exception(_strerror(ctypes.get_errno()))
@ -133,7 +133,7 @@ class State(ctypes.Structure):
the context of the new child process. the context of the new child process.
""" """
_fields_ = [ _fields_ = [
('lock', pthread_mutex_t), ('lock', sem_t),
('counter', ctypes.c_uint8), ('counter', ctypes.c_uint8),
] ]

Loading…
Cancel
Save