Teach Router.add_handler() to kill a handler on context disconnect

When a context's Stream is disconnected, now any reply_to handlers
waiting for that specific context will be cancelled, rather than hanging
until all pending handelrs are cancelled during Broker is torn down.

This is groundwork for a bunch of things, including moving connect() to
the Broker thread
pull/35/head
David Wilson 7 years ago
parent ffe86a882d
commit 299d4a2e05

@ -93,6 +93,16 @@ class Dead(object):
_DEAD = Dead() _DEAD = Dead()
def listen(obj, name, func):
signals = vars(obj).setdefault('_signals', {})
signals.setdefault(name, []).append(func)
def fire(obj, name, **kwargs):
signals = vars(obj).get('_signals', {})
return [func(**kwargs) for func in signals.get(name, ())]
def set_cloexec(fd): def set_cloexec(fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFD) flags = fcntl.fcntl(fd, fcntl.F_GETFD)
fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC) fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
@ -479,9 +489,11 @@ class BasicStream(object):
broker.stop_transmit(self) broker.stop_transmit(self)
self.receive_side.close() self.receive_side.close()
self.transmit_side.close() self.transmit_side.close()
fire(self, 'disconnect')
def on_shutdown(self, broker): def on_shutdown(self, broker):
LOG.debug('%r.on_shutdown()', self) LOG.debug('%r.on_shutdown()', self)
fire(self, 'shutdown')
self.on_disconnect(broker) self.on_disconnect(broker)
@ -615,6 +627,7 @@ class Context(object):
def on_disconnect(self, broker): def on_disconnect(self, broker):
LOG.debug('Parent stream is gone, dying.') LOG.debug('Parent stream is gone, dying.')
fire(self, 'disconnect')
broker.shutdown() broker.shutdown()
def send(self, msg): def send(self, msg):
@ -631,7 +644,9 @@ class Context(object):
raise SystemError('Cannot making blocking call on broker thread') raise SystemError('Cannot making blocking call on broker thread')
queue = Queue.Queue() queue = Queue.Queue()
msg.reply_to = self.router.add_handler(queue.put, persist=False) msg.reply_to = self.router.add_handler(queue.put,
persist=False,
respondent=self)
LOG.debug('%r.send_await(%r)', self, msg) LOG.debug('%r.send_await(%r)', self, msg)
self.send(msg) self.send(msg)
@ -741,6 +756,8 @@ class Router(object):
""" """
def __init__(self, broker): def __init__(self, broker):
self.broker = broker self.broker = broker
listen(broker, 'shutdown', self.on_broker_shutdown)
#: context ID -> Stream #: context ID -> Stream
self._stream_by_id = {} self._stream_by_id = {}
#: List of contexts to notify of shutdown. #: List of contexts to notify of shutdown.
@ -762,7 +779,7 @@ class Router(object):
del self._stream_by_id[context.context_id] del self._stream_by_id[context.context_id]
context.on_disconnect(broker) context.on_disconnect(broker)
def on_shutdown(self, broker): def on_broker_shutdown(self, broker):
for context in self._context_by_id.itervalues(): for context in self._context_by_id.itervalues():
context.on_shutdown(broker) context.on_shutdown(broker)
@ -785,19 +802,29 @@ class Router(object):
self._context_by_id[context.context_id] = context self._context_by_id[context.context_id] = context
self.broker.start_receive(stream) self.broker.start_receive(stream)
def add_handler(self, fn, handle=None, persist=True):
def add_handler(self, fn, handle=None, persist=True, respondent=None):
"""Invoke `fn(msg)` for each Message sent to `handle` from this """Invoke `fn(msg)` for each Message sent to `handle` from this
context. Unregister after one invocation if `persist` is ``False``. If context. Unregister after one invocation if `persist` is ``False``. If
`handle` is ``None``, a new handle is allocated and returned.""" `handle` is ``None``, a new handle is allocated and returned."""
handle = handle or self._last_handle.next() handle = handle or self._last_handle.next()
IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist) IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist)
self._handle_map[handle] = persist, fn self._handle_map[handle] = persist, fn
if respondent:
def on_disconnect():
if handle in self._handle_map:
fn(_DEAD)
del self._handle_map[handle]
listen(respondent, 'disconnect', on_disconnect)
return handle return handle
def on_shutdown(self, broker): def on_shutdown(self, broker):
"""Called during :py:meth:`Broker.shutdown`, informs callbacks """Called during :py:meth:`Broker.shutdown`, informs callbacks
registered with :py:meth:`add_handle_cb` the connection is dead.""" registered with :py:meth:`add_handle_cb` the connection is dead."""
LOG.debug('%r.on_shutdown(%r)', self, broker) LOG.debug('%r.on_shutdown(%r)', self, broker)
fire(self, 'shutdown')
for handle, (persist, fn) in self._handle_map.iteritems(): for handle, (persist, fn) in self._handle_map.iteritems():
LOG.debug('%r.on_shutdown(): killing %r: %r', self, handle, fn) LOG.debug('%r.on_shutdown(): killing %r: %r', self, handle, fn)
fn(_DEAD) fn(_DEAD)
@ -947,8 +974,7 @@ class Broker(object):
while self._alive: while self._alive:
self._loop_once() self._loop_once()
for func in self.on_shutdown: fire(self, 'shutdown')
func(self)
for side in self._readers | self._writers: for side in self._readers | self._writers:
self._call(side.stream, side.stream.on_shutdown) self._call(side.stream, side.stream.on_shutdown)
@ -1020,7 +1046,6 @@ class ExternalContext(object):
def _setup_master(self, parent_id, context_id, key, in_fd, out_fd): def _setup_master(self, parent_id, context_id, key, in_fd, out_fd):
self.broker = Broker() self.broker = Broker()
self.router = Router(self.broker) self.router = Router(self.broker)
self.broker.on_shutdown.append(self.router.on_shutdown)
self.master = Context(self.router, 0, 'master') self.master = Context(self.router, 0, 'master')
if parent_id == 0: if parent_id == 0:
self.parent = self.master self.parent = self.master

@ -439,7 +439,11 @@ class Context(econtext.core.Context):
via = None via = None
def on_disconnect(self, broker): def on_disconnect(self, broker):
pass """
Override base behaviour of triggering Broker shutdown on parent stream
disconnection.
"""
econtext.core.fire(self, 'disconnect')
def _discard_result(self, msg): def _discard_result(self, msg):
data = msg.unpickle() data = msg.unpickle()

Loading…
Cancel
Save