|
|
|
@ -172,35 +172,50 @@ class Message(object):
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Channel(object):
|
|
|
|
|
def __init__(self, router, dst_id=None, handle=None):
|
|
|
|
|
self._router = router
|
|
|
|
|
self._queue = Queue.Queue()
|
|
|
|
|
self._dst_id = dst_id
|
|
|
|
|
self.handle = handle # Avoid __repr__ crash in add_handler()
|
|
|
|
|
self.handle = router.add_handler(self._on_receive, handle)
|
|
|
|
|
class Sender(object):
|
|
|
|
|
def __init__(self, context, dst_handle):
|
|
|
|
|
self.context = context
|
|
|
|
|
self.dst_handle = dst_handle
|
|
|
|
|
|
|
|
|
|
def _on_receive(self, msg):
|
|
|
|
|
"""Callback from the Stream; appends data to the internal queue."""
|
|
|
|
|
IOLOG.debug('%r._on_receive(%r)', self, msg)
|
|
|
|
|
self._queue.put(msg)
|
|
|
|
|
def __repr__(self):
|
|
|
|
|
return 'Sender(%r, %r)' % (self.context, self.dst_handle)
|
|
|
|
|
|
|
|
|
|
def close(self):
|
|
|
|
|
"""Indicate this channel is closed to the remote side."""
|
|
|
|
|
IOLOG.debug('%r.close()', self)
|
|
|
|
|
self._context.send(self.handle, _DEAD)
|
|
|
|
|
self.context.send(
|
|
|
|
|
Message.pickled(
|
|
|
|
|
_DEAD,
|
|
|
|
|
handle=self.dst_handle
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def put(self, data):
|
|
|
|
|
"""Send `data` to the remote."""
|
|
|
|
|
IOLOG.debug('%r.send(%r)', self, data)
|
|
|
|
|
self._router.send(
|
|
|
|
|
self.context.send(
|
|
|
|
|
Message.pickled(
|
|
|
|
|
data,
|
|
|
|
|
dst_id=self._dst_id,
|
|
|
|
|
handle=self.handle
|
|
|
|
|
handle=self.dst_handle
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Receiver(object):
|
|
|
|
|
def __init__(self, router, handle=None):
|
|
|
|
|
self.router = router
|
|
|
|
|
self.handle = handle # Avoid __repr__ crash in add_handler()
|
|
|
|
|
self.handle = router.add_handler(self._on_receive, handle)
|
|
|
|
|
self._queue = Queue.Queue()
|
|
|
|
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
|
return 'Receiver(%r, %r)' % (self.router, self.handle)
|
|
|
|
|
|
|
|
|
|
def _on_receive(self, msg):
|
|
|
|
|
"""Callback from the Stream; appends data to the internal queue."""
|
|
|
|
|
IOLOG.debug('%r._on_receive(%r)', self, msg)
|
|
|
|
|
self._queue.put(msg)
|
|
|
|
|
|
|
|
|
|
def get(self, timeout=None):
|
|
|
|
|
"""Receive an object, or ``None`` if `timeout` is reached."""
|
|
|
|
|
IOLOG.debug('%r.on_receive(timeout=%r)', self, timeout)
|
|
|
|
@ -239,8 +254,17 @@ class Channel(object):
|
|
|
|
|
except ChannelError:
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Channel(Sender, Receiver):
|
|
|
|
|
def __init__(self, router, context, dst_id, handle=None):
|
|
|
|
|
Sender.__init_(self, context, dst_id)
|
|
|
|
|
Receiver.__init__(self, router, handle)
|
|
|
|
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
|
return 'Channel(%r, %r)' % (self._router, self.handle)
|
|
|
|
|
return 'Channel(%s, %s)' % (
|
|
|
|
|
Sender.__repr__(self),
|
|
|
|
|
Receiver.__repr__(self)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Importer(object):
|
|
|
|
@ -992,7 +1016,7 @@ class ExternalContext(object):
|
|
|
|
|
else:
|
|
|
|
|
self.parent = Context(self.router, parent_id, 'parent')
|
|
|
|
|
|
|
|
|
|
self.channel = Channel(self.router, handle=CALL_FUNCTION)
|
|
|
|
|
self.channel = Receiver(self.router, CALL_FUNCTION)
|
|
|
|
|
self.stream = Stream(self.router, parent_id, key)
|
|
|
|
|
self.stream.name = 'parent'
|
|
|
|
|
self.stream.accept(in_fd, out_fd)
|
|
|
|
|