diff --git a/mitogen/core.py b/mitogen/core.py index ad9511cc..9cd317b6 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -911,13 +911,14 @@ class Latch(object): def __init__(self): self._lock = threading.Lock() self._queue = [] - self._waiters = [] + self._sleeping = [] + self._pending = {} def close(self): self._lock.acquire() try: self.closed = True - for wsock in self._waiters: + for wsock in self._sleeping: self._wake(wsock) finally: self._lock.release() @@ -939,13 +940,13 @@ class Latch(object): try: if self.closed: raise LatchError() - if self._queue and not self._waiters: + if self._queue: _vv and IOLOG.debug('%r.get() -> %r', self, self._queue[0]) return self._queue.pop(0) if not block: raise TimeoutError() self._tls_init() - self._waiters.append(_tls.wsock) + self._sleeping.append(_tls.wsock) finally: self._lock.release() @@ -955,15 +956,16 @@ class Latch(object): self._lock.acquire() try: - self._waiters.remove(_tls.wsock) if self.closed: raise LatchError() if not rfds: raise TimeoutError() - assert _tls.rsock.recv(1) == '\x7f' + if _tls.rsock.recv(2) != '\x7f': + raise LatchError('internal error: received >1 wakeups') try: - _vv and IOLOG.debug('%r.get() wake -> %r', self, self._queue[0]) - return self._queue.pop(0) + obj = self._pending.pop(_tls.wsock) + _vv and IOLOG.debug('%r.get() wake -> %r', self, obj) + return obj except IndexError: IOLOG.exception('%r.get() INDEX ERROR', self) raise @@ -976,11 +978,14 @@ class Latch(object): try: if self.closed: raise LatchError() - self._queue.append(obj) - if self._waiters: + if self._sleeping: + sock = self._sleeping.pop(0) + self._pending[sock] = obj _vv and IOLOG.debug('%r.put() -> waking wfd=%r', - self, self._waiters[0].fileno()) - self._wake(self._waiters[0]) + self, sock.fileno()) + self._wake(sock) + else: + self._queue.append(obj) finally: self._lock.release()