core: fix Latch socket sharing race.

If thread A is about to wake as thread B is about to sleep, and A loses
the GIL at an inopportune moment, it was possible for two latches to
share the same socketpair, causing wakeups routed to the wrong latch.

The pair was returned to the 'idle sockets' list before .recv() had been
called. This manifested as TimeoutError() thrown rarely with many active
threads and the host is heavily loaded (such as Travis CI).

Add more documentation and stop writing single wake bytes. Instead the
recipient's identity is written instead, making it simpler to detect
future bugs.
pull/295/head
David Wilson 6 years ago
parent a53f2e7137
commit 484d4fdb74

@ -1199,24 +1199,41 @@ class Latch(object):
See :ref:`waking-sleeping-threads` for further discussion.
"""
poller_class = Poller
closed = False
_waking = 0
_sockets = []
_allsockets = []
# The _cls_ prefixes here are to make it crystal clear in the code which
# state mutation isn't covered by :attr:`_lock`.
#: List of reusable :func:`socket.socketpair` tuples. The list is from
#: multiple threads, the only safe operations are `append()` and `pop()`.
_cls_idle_socketpairs = []
#: List of every socket object that must be closed by :meth:`_on_fork`.
#: Inherited descriptors cannot be reused, as the duplicated handles
#: reference the same underlying kernel-side sockets still in use by
#: the parent process.
_cls_all_sockets = []
def __init__(self):
self.closed = False
self._lock = threading.Lock()
#: List of unconsumed enqueued items.
self._queue = []
#: List of `(wsock, cookie)` awaiting an element, where `wsock` is the
#: socketpair's write side, and `cookie` is the string to write.
self._sleeping = []
#: Number of elements of :attr:`_sleeping` that have already been
#: woken, and have a corresponding element index from :attr:`_queue`
#: assigned to them.
self._waking = 0
@classmethod
def _on_fork(cls):
"""
Clean up any files belonging to the parent process after a fork.
"""
cls._sockets = []
while cls._allsockets:
cls._allsockets.pop().close()
cls._cls_idle_socketpairs = []
while cls._cls_all_sockets:
cls._cls_all_sockets.pop().close()
def close(self):
"""
@ -1227,7 +1244,8 @@ class Latch(object):
try:
self.closed = True
while self._waking < len(self._sleeping):
self._wake(self._sleeping[self._waking])
wsock, cookie = self._sleeping[self._waking]
self._wake(wsock, cookie)
self._waking += 1
finally:
self._lock.release()
@ -1252,16 +1270,26 @@ class Latch(object):
"""
Return an unused socketpair, creating one if none exist.
"""
# pop() must be atomic, which is true for GIL-equipped interpreters.
try:
return self._sockets.pop()
return self._cls_idle_socketpairs.pop() # pop() must be atomic
except IndexError:
rsock, wsock = socket.socketpair()
set_cloexec(rsock.fileno())
set_cloexec(wsock.fileno())
self._allsockets.extend((rsock, wsock))
self._cls_all_sockets.extend((rsock, wsock))
return rsock, wsock
COOKIE_SIZE = 33
def _make_cookie(self):
"""
Return a 33-byte string encoding the ID of the instance and the current
thread. This disambiguates legitimate wake-ups, accidental writes to
the FD, and buggy internal FD sharing.
"""
ident = threading.currentThread().ident
return b(u'%016x-%016x' % (int(id(self)), ident))
def get(self, timeout=None, block=True):
"""
Return the next enqueued object, or sleep waiting for one.
@ -1295,25 +1323,28 @@ class Latch(object):
if not block:
raise TimeoutError()
rsock, wsock = self._get_socketpair()
self._sleeping.append(wsock)
cookie = self._make_cookie()
self._sleeping.append((wsock, cookie))
finally:
self._lock.release()
poller = self.poller_class()
poller.start_receive(rsock.fileno())
try:
return self._get_sleep(poller, timeout, block, rsock, wsock)
return self._get_sleep(poller, timeout, block, rsock, wsock, cookie)
finally:
poller.close()
def _get_sleep(self, poller, timeout, block, rsock, wsock):
def _get_sleep(self, poller, timeout, block, rsock, wsock, cookie):
"""
When a result is not immediately available, sleep waiting for
:meth:`put` to write a byte to our socket pair.
"""
_vv and IOLOG.debug('%r._get_sleep(timeout=%r, block=%r, rfd=%d, wfd=%d)',
self, timeout, block, rsock.fileno(),
wsock.fileno())
_vv and IOLOG.debug(
'%r._get_sleep(timeout=%r, block=%r, rfd=%d, wfd=%d)',
self, timeout, block, rsock.fileno(), wsock.fileno()
)
e = None
woken = None
try:
@ -1323,20 +1354,22 @@ class Latch(object):
self._lock.acquire()
try:
i = self._sleeping.index(wsock)
i = self._sleeping.index((wsock, cookie))
del self._sleeping[i]
self._sockets.append((rsock, wsock))
if i >= self._waking:
recv = rsock.recv(10) if woken else None
s = '%r: woken=%r, recv=%r' % (self, woken, recv)
raise e or TimeoutError(s)
if not woken:
raise e or TimeoutError()
got_cookie = rsock.recv(self.COOKIE_SIZE)
self._cls_idle_socketpairs.append((rsock, wsock))
assert cookie == got_cookie, (
"Cookie incorrect; got %r, expected %r" \
% (got_cookie, cookie)
)
assert i < self._waking, (
"Cookie correct, but no queue element assigned."
)
self._waking -= 1
byte = rsock.recv(10)
if byte != b('\x7f'):
raise LatchError('internal error: received >1 wakeups: %r'
% (byte,))
if e:
raise e
if self.closed:
raise LatchError()
_vv and IOLOG.debug('%r.get() wake -> %r', self, self._queue[i])
@ -1360,17 +1393,17 @@ class Latch(object):
self._queue.append(obj)
if self._waking < len(self._sleeping):
sock = self._sleeping[self._waking]
wsock, cookie = self._sleeping[self._waking]
self._waking += 1
_vv and IOLOG.debug('%r.put() -> waking wfd=%r',
self, sock.fileno())
self._wake(sock)
self._wake(wsock, cookie)
finally:
self._lock.release()
def _wake(self, sock):
def _wake(self, wsock, cookie):
try:
os.write(sock.fileno(), b('\x7f'))
os.write(wsock.fileno(), cookie)
except OSError:
e = sys.exc_info()[1]
if e.args[0] != errno.EBADF:

Loading…
Cancel
Save