From 001e0163fe544eb1a3b2803740b2b4e18df91fd9 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Tue, 20 Mar 2018 12:22:25 +0545 Subject: [PATCH] issue #156: handle multiple _put() before wake of first sleeper - If latch.get() is called and the queue is empty, a thread is put to sleep. - If Latch.put() from another thread then appends an item to the queue and wakes the sleeping thread, and - If a subsequent Latch.put() from the same or another thread manages to acquire `lock` before the sleeping thread is scheduled, - The sleeping thread's wake socket would have multiple bytes written to it. Therefore create a new _pending variable to track the only item assigned to each thread (keyed by its write socket), and remove the socket from `sleeping` from within put. --- mitogen/core.py | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/mitogen/core.py b/mitogen/core.py index ad9511cc..9cd317b6 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -911,13 +911,14 @@ class Latch(object): def __init__(self): self._lock = threading.Lock() self._queue = [] - self._waiters = [] + self._sleeping = [] + self._pending = {} def close(self): self._lock.acquire() try: self.closed = True - for wsock in self._waiters: + for wsock in self._sleeping: self._wake(wsock) finally: self._lock.release() @@ -939,13 +940,13 @@ class Latch(object): try: if self.closed: raise LatchError() - if self._queue and not self._waiters: + if self._queue: _vv and IOLOG.debug('%r.get() -> %r', self, self._queue[0]) return self._queue.pop(0) if not block: raise TimeoutError() self._tls_init() - self._waiters.append(_tls.wsock) + self._sleeping.append(_tls.wsock) finally: self._lock.release() @@ -955,15 +956,16 @@ class Latch(object): self._lock.acquire() try: - self._waiters.remove(_tls.wsock) if self.closed: raise LatchError() if not rfds: raise TimeoutError() - assert _tls.rsock.recv(1) == '\x7f' + if _tls.rsock.recv(2) != '\x7f': + raise LatchError('internal error: received >1 wakeups') try: - _vv and IOLOG.debug('%r.get() wake -> %r', self, self._queue[0]) - return self._queue.pop(0) + obj = self._pending.pop(_tls.wsock) + _vv and IOLOG.debug('%r.get() wake -> %r', self, obj) + return obj except IndexError: IOLOG.exception('%r.get() INDEX ERROR', self) raise @@ -976,11 +978,14 @@ class Latch(object): try: if self.closed: raise LatchError() - self._queue.append(obj) - if self._waiters: + if self._sleeping: + sock = self._sleeping.pop(0) + self._pending[sock] = obj _vv and IOLOG.debug('%r.put() -> waking wfd=%r', - self, self._waiters[0].fileno()) - self._wake(self._waiters[0]) + self, sock.fileno()) + self._wake(sock) + else: + self._queue.append(obj) finally: self._lock.release()