From 484d4fdb74a89c9153bd71ea03194477c14383ec Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 7 Jul 2018 07:31:07 +0000 Subject: [PATCH] core: fix Latch socket sharing race. If thread A is about to wake as thread B is about to sleep, and A loses the GIL at an inopportune moment, it was possible for two latches to share the same socketpair, causing wakeups to be routed to the wrong latch. The pair was returned to the 'idle sockets' list before .recv() had been called. This manifested as a rarely thrown TimeoutError() when many threads are active and the host is heavily loaded (such as on Travis CI). Add more documentation and stop writing single wake bytes. Instead, the recipient's identity is written, making it simpler to detect future bugs. --- mitogen/core.py | 99 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 66 insertions(+), 33 deletions(-) diff --git a/mitogen/core.py b/mitogen/core.py index 4d8ece60..b6936a40 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1199,24 +1199,41 @@ class Latch(object): See :ref:`waking-sleeping-threads` for further discussion. """ poller_class = Poller - closed = False - _waking = 0 - _sockets = [] - _allsockets = [] + + # The _cls_ prefixes here are to make it crystal clear in the code which + # state mutation isn't covered by :attr:`_lock`. + + #: List of reusable :func:`socket.socketpair` tuples. The list is used from + #: multiple threads; the only safe operations are `append()` and `pop()`. + _cls_idle_socketpairs = [] + + #: List of every socket object that must be closed by :meth:`_on_fork`. + #: Inherited descriptors cannot be reused, as the duplicated handles + #: reference the same underlying kernel-side sockets still in use by + #: the parent process. + _cls_all_sockets = [] def __init__(self): + self.closed = False self._lock = threading.Lock() + #: List of unconsumed enqueued items. 
self._queue = [] + #: List of `(wsock, cookie)` awaiting an element, where `wsock` is the + #: socketpair's write side, and `cookie` is the string to write. self._sleeping = [] + #: Number of elements of :attr:`_sleeping` that have already been + #: woken, and have a corresponding element index from :attr:`_queue` + #: assigned to them. + self._waking = 0 @classmethod def _on_fork(cls): """ Clean up any files belonging to the parent process after a fork. """ - cls._sockets = [] - while cls._allsockets: - cls._allsockets.pop().close() + cls._cls_idle_socketpairs = [] + while cls._cls_all_sockets: + cls._cls_all_sockets.pop().close() def close(self): """ @@ -1227,7 +1244,8 @@ class Latch(object): try: self.closed = True while self._waking < len(self._sleeping): - self._wake(self._sleeping[self._waking]) + wsock, cookie = self._sleeping[self._waking] + self._wake(wsock, cookie) self._waking += 1 finally: self._lock.release() @@ -1252,16 +1270,26 @@ class Latch(object): """ Return an unused socketpair, creating one if none exist. """ - # pop() must be atomic, which is true for GIL-equipped interpreters. try: - return self._sockets.pop() + return self._cls_idle_socketpairs.pop() # pop() must be atomic except IndexError: rsock, wsock = socket.socketpair() set_cloexec(rsock.fileno()) set_cloexec(wsock.fileno()) - self._allsockets.extend((rsock, wsock)) + self._cls_all_sockets.extend((rsock, wsock)) return rsock, wsock + COOKIE_SIZE = 33 + + def _make_cookie(self): + """ + Return a 33-byte string encoding the ID of the instance and the current + thread. This disambiguates legitimate wake-ups, accidental writes to + the FD, and buggy internal FD sharing. + """ + ident = threading.currentThread().ident + return b(u'%016x-%016x' % (int(id(self)), ident)) + def get(self, timeout=None, block=True): """ Return the next enqueued object, or sleep waiting for one. 
@@ -1295,25 +1323,28 @@ class Latch(object): if not block: raise TimeoutError() rsock, wsock = self._get_socketpair() - self._sleeping.append(wsock) + cookie = self._make_cookie() + self._sleeping.append((wsock, cookie)) finally: self._lock.release() poller = self.poller_class() poller.start_receive(rsock.fileno()) try: - return self._get_sleep(poller, timeout, block, rsock, wsock) + return self._get_sleep(poller, timeout, block, rsock, wsock, cookie) finally: poller.close() - def _get_sleep(self, poller, timeout, block, rsock, wsock): + def _get_sleep(self, poller, timeout, block, rsock, wsock, cookie): """ When a result is not immediately available, sleep waiting for :meth:`put` to write a byte to our socket pair. """ - _vv and IOLOG.debug('%r._get_sleep(timeout=%r, block=%r, rfd=%d, wfd=%d)', - self, timeout, block, rsock.fileno(), - wsock.fileno()) + _vv and IOLOG.debug( + '%r._get_sleep(timeout=%r, block=%r, rfd=%d, wfd=%d)', + self, timeout, block, rsock.fileno(), wsock.fileno() + ) + e = None woken = None try: @@ -1323,20 +1354,22 @@ class Latch(object): self._lock.acquire() try: - i = self._sleeping.index(wsock) + i = self._sleeping.index((wsock, cookie)) del self._sleeping[i] - self._sockets.append((rsock, wsock)) - if i >= self._waking: - recv = rsock.recv(10) if woken else None - s = '%r: woken=%r, recv=%r' % (self, woken, recv) - raise e or TimeoutError(s) + if not woken: + raise e or TimeoutError() + + got_cookie = rsock.recv(self.COOKIE_SIZE) + self._cls_idle_socketpairs.append((rsock, wsock)) + + assert cookie == got_cookie, ( + "Cookie incorrect; got %r, expected %r" \ + % (got_cookie, cookie) + ) + assert i < self._waking, ( + "Cookie correct, but no queue element assigned." 
+ ) self._waking -= 1 - byte = rsock.recv(10) - if byte != b('\x7f'): - raise LatchError('internal error: received >1 wakeups: %r' - % (byte,)) - if e: - raise e if self.closed: raise LatchError() _vv and IOLOG.debug('%r.get() wake -> %r', self, self._queue[i]) @@ -1360,17 +1393,17 @@ class Latch(object): self._queue.append(obj) if self._waking < len(self._sleeping): - sock = self._sleeping[self._waking] + wsock, cookie = self._sleeping[self._waking] self._waking += 1 _vv and IOLOG.debug('%r.put() -> waking wfd=%r', self, sock.fileno()) - self._wake(sock) + self._wake(wsock, cookie) finally: self._lock.release() - def _wake(self, sock): + def _wake(self, wsock, cookie): try: - os.write(sock.fileno(), b('\x7f')) + os.write(wsock.fileno(), cookie) except OSError: e = sys.exc_info()[1] if e.args[0] != errno.EBADF: