diff --git a/econtext/core.py b/econtext/core.py index 6026e424..b6202d38 100644 --- a/econtext/core.py +++ b/econtext/core.py @@ -176,11 +176,11 @@ class Message(object): class Channel(object): - def __init__(self, context, handle): + def __init__(self, context, handle=None): self._context = context - self._handle = handle self._queue = Queue.Queue() - self._context.add_handler(self._receive, handle) + self.handle = handle # Avoid __repr__ crash in add_handler() + self.handle = context.add_handler(self._receive, handle) def _receive(self, msg): """Callback from the Stream; appends data to the internal queue.""" @@ -190,19 +190,27 @@ class Channel(object): def close(self): """Indicate this channel is closed to the remote side.""" IOLOG.debug('%r.close()', self) - self._context.send(self._handle, _DEAD) + self._context.send(self.handle, _DEAD) def put(self, data): """Send `data` to the remote.""" IOLOG.debug('%r.send(%r)', self, data) - self._context.send(self._handle, data) + self._context.send(self.handle, data) def get(self, timeout=None): """Receive an object, or ``None`` if `timeout` is reached.""" IOLOG.debug('%r.on_receive(timeout=%r)', self, timeout) - try: - msg = self._queue.get(True, timeout) - except Queue.Empty: + if timeout: + timeout += time.time() + + msg = None + while msg is None and (timeout is None or timeout < time.time()): + try: + msg = self._queue.get(True, 0.5) + except Queue.Empty: + continue + + if msg is None: return IOLOG.debug('%r.on_receive() got %r', self, msg) @@ -228,7 +236,7 @@ class Channel(object): return def __repr__(self): - return 'Channel(%r, %r)' % (self._context, self._handle) + return 'Channel(%r, %r)' % (self._context, self.handle) class Importer(object):