Support building Channels from existing handle, make .get() interruptible.

pull/35/head
David Wilson 7 years ago
parent 30d838edf9
commit 174b3242fc

@ -176,11 +176,11 @@ class Message(object):
class Channel(object): class Channel(object):
def __init__(self, context, handle): def __init__(self, context, handle=None):
self._context = context self._context = context
self._handle = handle
self._queue = Queue.Queue() self._queue = Queue.Queue()
self._context.add_handler(self._receive, handle) self.handle = handle # Avoid __repr__ crash in add_handler()
self.handle = context.add_handler(self._receive, handle)
def _receive(self, msg): def _receive(self, msg):
"""Callback from the Stream; appends data to the internal queue.""" """Callback from the Stream; appends data to the internal queue."""
@ -190,19 +190,27 @@ class Channel(object):
def close(self): def close(self):
"""Indicate this channel is closed to the remote side.""" """Indicate this channel is closed to the remote side."""
IOLOG.debug('%r.close()', self) IOLOG.debug('%r.close()', self)
self._context.send(self._handle, _DEAD) self._context.send(self.handle, _DEAD)
def put(self, data): def put(self, data):
"""Send `data` to the remote.""" """Send `data` to the remote."""
IOLOG.debug('%r.send(%r)', self, data) IOLOG.debug('%r.send(%r)', self, data)
self._context.send(self._handle, data) self._context.send(self.handle, data)
def get(self, timeout=None): def get(self, timeout=None):
"""Receive an object, or ``None`` if `timeout` is reached.""" """Receive an object, or ``None`` if `timeout` is reached."""
IOLOG.debug('%r.on_receive(timeout=%r)', self, timeout) IOLOG.debug('%r.on_receive(timeout=%r)', self, timeout)
try: if timeout:
msg = self._queue.get(True, timeout) timeout += time.time()
except Queue.Empty:
msg = None
while msg is None and (timeout is None or timeout < time.time()):
try:
msg = self._queue.get(True, 0.5)
except Queue.Empty:
continue
if msg is None:
return return
IOLOG.debug('%r.on_receive() got %r', self, msg) IOLOG.debug('%r.on_receive() got %r', self, msg)
@ -228,7 +236,7 @@ class Channel(object):
return return
def __repr__(self): def __repr__(self):
return 'Channel(%r, %r)' % (self._context, self._handle) return 'Channel(%r, %r)' % (self._context, self.handle)
class Importer(object): class Importer(object):

Loading…
Cancel
Save