From 6905dc4e8d61f5753348f15d7636a70b68755c64 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Thu, 15 Feb 2018 05:06:43 +0545 Subject: [PATCH] master: use queue-like Latch in Select() too. --- mitogen/core.py | 17 +++++++++-------- mitogen/master.py | 9 +++------ 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/mitogen/core.py b/mitogen/core.py index 964bcd25..2df898ef 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -78,7 +78,7 @@ else: class Error(Exception): - def __init__(self, fmt, *args): + def __init__(self, fmt=None, *args): if args: fmt %= args Exception.__init__(self, fmt) @@ -113,7 +113,7 @@ class StreamError(Error): pass -class TimeoutError(StreamError): +class TimeoutError(Error): pass @@ -805,24 +805,25 @@ class Latch(object): if self.queue: return self.queue.pop(0) if not block: - return + raise TimeoutError() self._tls_init() self.wake_socks.append(_tls.wsock) finally: self.lock.release() rfds, _, _ = select.select([_tls.rsock], [], [], timeout) - assert len(rfds) or timeout is None + assert len(rfds) or timeout is not None self.lock.acquire() try: if _tls.wsock in self.wake_socks: # Nothing woke us, remove stale entry. self.wake_socks.remove(_tls.wsock) - return - if _tls.rsock in rfds: - _tls.rsock.recv(1) - return self.queue.pop(0) + raise TimeoutError() + + assert _tls.rsock in rfds + _tls.rsock.recv(1) + return self.queue.pop(0) finally: self.lock.release() diff --git a/mitogen/master.py b/mitogen/master.py index 7905ed7d..bbec3b38 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -126,14 +126,12 @@ class Select(object): def __init__(self, receivers=(), oneshot=True): self._receivers = [] self._oneshot = oneshot - self._queue = Queue.Queue() self._latch = mitogen.core.Latch() for recv in receivers: self.add(recv) def _put(self, value): - self._queue.put(value) - self._latch.wake() + self._latch.put(value) if self.notify: self.notify(self) @@ -193,7 +191,7 @@ class Select(object): self.remove(recv) def empty(self): - return self._queue.empty() + return self._latch.empty() empty_msg = 'Cannot get(), Select instance is empty' @@ -202,8 +200,7 @@ class Select(object): raise SelectError(self.empty_msg) while True: - self._latch.wait() - recv = self._queue.get() + recv = self._latch.get(timeout=timeout) try: msg = recv.get(block=False) if self._oneshot: