master: use queue-like Latch in Select() too.

pull/87/head
David Wilson 7 years ago
parent 371a259a5e
commit 0d57afc914

@ -78,7 +78,7 @@ else:
class Error(Exception): class Error(Exception):
def __init__(self, fmt, *args): def __init__(self, fmt=None, *args):
if args: if args:
fmt %= args fmt %= args
Exception.__init__(self, fmt) Exception.__init__(self, fmt)
@ -113,7 +113,7 @@ class StreamError(Error):
pass pass
class TimeoutError(StreamError): class TimeoutError(Error):
pass pass
@ -805,24 +805,25 @@ class Latch(object):
if self.queue: if self.queue:
return self.queue.pop(0) return self.queue.pop(0)
if not block: if not block:
return raise TimeoutError()
self._tls_init() self._tls_init()
self.wake_socks.append(_tls.wsock) self.wake_socks.append(_tls.wsock)
finally: finally:
self.lock.release() self.lock.release()
rfds, _, _ = select.select([_tls.rsock], [], [], timeout) rfds, _, _ = select.select([_tls.rsock], [], [], timeout)
assert len(rfds) or timeout is None assert len(rfds) or timeout is not None
self.lock.acquire() self.lock.acquire()
try: try:
if _tls.wsock in self.wake_socks: if _tls.wsock in self.wake_socks:
# Nothing woke us, remove stale entry. # Nothing woke us, remove stale entry.
self.wake_socks.remove(_tls.wsock) self.wake_socks.remove(_tls.wsock)
return raise TimeoutError()
if _tls.rsock in rfds:
_tls.rsock.recv(1) assert _tls.rsock in rfds
return self.queue.pop(0) _tls.rsock.recv(1)
return self.queue.pop(0)
finally: finally:
self.lock.release() self.lock.release()

@ -126,14 +126,12 @@ class Select(object):
def __init__(self, receivers=(), oneshot=True): def __init__(self, receivers=(), oneshot=True):
self._receivers = [] self._receivers = []
self._oneshot = oneshot self._oneshot = oneshot
self._queue = Queue.Queue()
self._latch = mitogen.core.Latch() self._latch = mitogen.core.Latch()
for recv in receivers: for recv in receivers:
self.add(recv) self.add(recv)
def _put(self, value): def _put(self, value):
self._queue.put(value) self._latch.put(value)
self._latch.wake()
if self.notify: if self.notify:
self.notify(self) self.notify(self)
@ -193,7 +191,7 @@ class Select(object):
self.remove(recv) self.remove(recv)
def empty(self): def empty(self):
return self._queue.empty() return self._latch.empty()
empty_msg = 'Cannot get(), Select instance is empty' empty_msg = 'Cannot get(), Select instance is empty'
@ -202,8 +200,7 @@ class Select(object):
raise SelectError(self.empty_msg) raise SelectError(self.empty_msg)
while True: while True:
self._latch.wait() recv = self._latch.get(timeout=timeout)
recv = self._queue.get()
try: try:
msg = recv.get(block=False) msg = recv.get(block=False)
if self._oneshot: if self._oneshot:

Loading…
Cancel
Save