diff --git a/mitogen/core.py b/mitogen/core.py index 4d8ece60..b6936a40 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1199,24 +1199,41 @@ class Latch(object): See :ref:`waking-sleeping-threads` for further discussion. """ poller_class = Poller - closed = False - _waking = 0 - _sockets = [] - _allsockets = [] + + # The _cls_ prefixes here are to make it crystal clear in the code which + # state mutation isn't covered by :attr:`_lock`. + + #: List of reusable :func:`socket.socketpair` tuples. The list is from + #: multiple threads, the only safe operations are `append()` and `pop()`. + _cls_idle_socketpairs = [] + + #: List of every socket object that must be closed by :meth:`_on_fork`. + #: Inherited descriptors cannot be reused, as the duplicated handles + #: reference the same underlying kernel-side sockets still in use by + #: the parent process. + _cls_all_sockets = [] def __init__(self): + self.closed = False self._lock = threading.Lock() + #: List of unconsumed enqueued items. self._queue = [] + #: List of `(wsock, cookie)` awaiting an element, where `wsock` is the + #: socketpair's write side, and `cookie` is the string to write. self._sleeping = [] + #: Number of elements of :attr:`_sleeping` that have already been + #: woken, and have a corresponding element index from :attr:`_queue` + #: assigned to them. + self._waking = 0 @classmethod def _on_fork(cls): """ Clean up any files belonging to the parent process after a fork. """ - cls._sockets = [] - while cls._allsockets: - cls._allsockets.pop().close() + cls._cls_idle_socketpairs = [] + while cls._cls_all_sockets: + cls._cls_all_sockets.pop().close() def close(self): """ @@ -1227,7 +1244,8 @@ class Latch(object): try: self.closed = True while self._waking < len(self._sleeping): - self._wake(self._sleeping[self._waking]) + wsock, cookie = self._sleeping[self._waking] + self._wake(wsock, cookie) self._waking += 1 finally: self._lock.release() @@ -1252,16 +1270,26 @@ class Latch(object): """ Return an unused socketpair, creating one if none exist. """ - # pop() must be atomic, which is true for GIL-equipped interpreters. try: - return self._sockets.pop() + return self._cls_idle_socketpairs.pop() # pop() must be atomic except IndexError: rsock, wsock = socket.socketpair() set_cloexec(rsock.fileno()) set_cloexec(wsock.fileno()) - self._allsockets.extend((rsock, wsock)) + self._cls_all_sockets.extend((rsock, wsock)) return rsock, wsock + COOKIE_SIZE = 33 + + def _make_cookie(self): + """ + Return a 33-byte string encoding the ID of the instance and the current + thread. This disambiguates legitimate wake-ups, accidental writes to + the FD, and buggy internal FD sharing. + """ + ident = threading.currentThread().ident + return b(u'%016x-%016x' % (int(id(self)), ident)) + def get(self, timeout=None, block=True): """ Return the next enqueued object, or sleep waiting for one. @@ -1295,25 +1323,28 @@ class Latch(object): if not block: raise TimeoutError() rsock, wsock = self._get_socketpair() - self._sleeping.append(wsock) + cookie = self._make_cookie() + self._sleeping.append((wsock, cookie)) finally: self._lock.release() poller = self.poller_class() poller.start_receive(rsock.fileno()) try: - return self._get_sleep(poller, timeout, block, rsock, wsock) + return self._get_sleep(poller, timeout, block, rsock, wsock, cookie) finally: poller.close() - def _get_sleep(self, poller, timeout, block, rsock, wsock): + def _get_sleep(self, poller, timeout, block, rsock, wsock, cookie): """ When a result is not immediately available, sleep waiting for :meth:`put` to write a byte to our socket pair. """ - _vv and IOLOG.debug('%r._get_sleep(timeout=%r, block=%r, rfd=%d, wfd=%d)', - self, timeout, block, rsock.fileno(), - wsock.fileno()) + _vv and IOLOG.debug( + '%r._get_sleep(timeout=%r, block=%r, rfd=%d, wfd=%d)', + self, timeout, block, rsock.fileno(), wsock.fileno() + ) + e = None woken = None try: @@ -1323,20 +1354,22 @@ class Latch(object): self._lock.acquire() try: - i = self._sleeping.index(wsock) + i = self._sleeping.index((wsock, cookie)) del self._sleeping[i] - self._sockets.append((rsock, wsock)) - if i >= self._waking: - recv = rsock.recv(10) if woken else None - s = '%r: woken=%r, recv=%r' % (self, woken, recv) - raise e or TimeoutError(s) + if not woken: + raise e or TimeoutError() + + got_cookie = rsock.recv(self.COOKIE_SIZE) + self._cls_idle_socketpairs.append((rsock, wsock)) + + assert cookie == got_cookie, ( + "Cookie incorrect; got %r, expected %r" \ + % (got_cookie, cookie) + ) + assert i < self._waking, ( + "Cookie correct, but no queue element assigned." + ) self._waking -= 1 - byte = rsock.recv(10) - if byte != b('\x7f'): - raise LatchError('internal error: received >1 wakeups: %r' - % (byte,)) - if e: - raise e if self.closed: raise LatchError() _vv and IOLOG.debug('%r.get() wake -> %r', self, self._queue[i]) @@ -1360,17 +1393,17 @@ class Latch(object): self._queue.append(obj) if self._waking < len(self._sleeping): - sock = self._sleeping[self._waking] + wsock, cookie = self._sleeping[self._waking] self._waking += 1 _vv and IOLOG.debug('%r.put() -> waking wfd=%r', self, sock.fileno()) - self._wake(sock) + self._wake(wsock, cookie) finally: self._lock.release() - def _wake(self, sock): + def _wake(self, wsock, cookie): try: - os.write(sock.fileno(), b('\x7f')) + os.write(wsock.fileno(), cookie) except OSError: e = sys.exc_info()[1] if e.args[0] != errno.EBADF: