From 299d4a2e05f5fa86c32b8336a0b791a682052eaf Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sat, 9 Sep 2017 00:37:56 +0530 Subject: [PATCH] Teach Router.add_handler() to kill a handler on context disconnect When a context's Stream is disconnected, now any reply_to handlers waiting for that specific context will be cancelled, rather than hanging until all pending handelrs are cancelled during Broker is torn down. This is groundwork for a bunch of things, including moving connect() to the Broker thread --- econtext/core.py | 37 +++++++++++++++++++++++++++++++------ econtext/master.py | 6 +++++- 2 files changed, 36 insertions(+), 7 deletions(-) diff --git a/econtext/core.py b/econtext/core.py index 0770c568..75d5340b 100644 --- a/econtext/core.py +++ b/econtext/core.py @@ -93,6 +93,16 @@ class Dead(object): _DEAD = Dead() +def listen(obj, name, func): + signals = vars(obj).setdefault('_signals', {}) + signals.setdefault(name, []).append(func) + + +def fire(obj, name, **kwargs): + signals = vars(obj).get('_signals', {}) + return [func(**kwargs) for func in signals.get(name, ())] + + def set_cloexec(fd): flags = fcntl.fcntl(fd, fcntl.F_GETFD) fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC) @@ -479,9 +489,11 @@ class BasicStream(object): broker.stop_transmit(self) self.receive_side.close() self.transmit_side.close() + fire(self, 'disconnect') def on_shutdown(self, broker): LOG.debug('%r.on_shutdown()', self) + fire(self, 'shutdown') self.on_disconnect(broker) @@ -615,6 +627,7 @@ class Context(object): def on_disconnect(self, broker): LOG.debug('Parent stream is gone, dying.') + fire(self, 'disconnect') broker.shutdown() def send(self, msg): @@ -631,7 +644,9 @@ class Context(object): raise SystemError('Cannot making blocking call on broker thread') queue = Queue.Queue() - msg.reply_to = self.router.add_handler(queue.put, persist=False) + msg.reply_to = self.router.add_handler(queue.put, + persist=False, + respondent=self) LOG.debug('%r.send_await(%r)', self, msg) self.send(msg) @@ -741,6 +756,8 @@ class Router(object): """ def __init__(self, broker): self.broker = broker + listen(broker, 'shutdown', self.on_broker_shutdown) + #: context ID -> Stream self._stream_by_id = {} #: List of contexts to notify of shutdown. @@ -762,7 +779,7 @@ class Router(object): del self._stream_by_id[context.context_id] context.on_disconnect(broker) - def on_shutdown(self, broker): + def on_broker_shutdown(self, broker): for context in self._context_by_id.itervalues(): context.on_shutdown(broker) @@ -785,19 +802,29 @@ class Router(object): self._context_by_id[context.context_id] = context self.broker.start_receive(stream) - def add_handler(self, fn, handle=None, persist=True): + + def add_handler(self, fn, handle=None, persist=True, respondent=None): """Invoke `fn(msg)` for each Message sent to `handle` from this context. Unregister after one invocation if `persist` is ``False``. If `handle` is ``None``, a new handle is allocated and returned.""" handle = handle or self._last_handle.next() IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist) self._handle_map[handle] = persist, fn + + if respondent: + def on_disconnect(): + if handle in self._handle_map: + fn(_DEAD) + del self._handle_map[handle] + listen(respondent, 'disconnect', on_disconnect) + return handle def on_shutdown(self, broker): """Called during :py:meth:`Broker.shutdown`, informs callbacks registered with :py:meth:`add_handle_cb` the connection is dead.""" LOG.debug('%r.on_shutdown(%r)', self, broker) + fire(self, 'shutdown') for handle, (persist, fn) in self._handle_map.iteritems(): LOG.debug('%r.on_shutdown(): killing %r: %r', self, handle, fn) fn(_DEAD) @@ -947,8 +974,7 @@ class Broker(object): while self._alive: self._loop_once() - for func in self.on_shutdown: - func(self) + fire(self, 'shutdown') for side in self._readers | self._writers: self._call(side.stream, side.stream.on_shutdown) @@ -1020,7 +1046,6 @@ class ExternalContext(object): def _setup_master(self, parent_id, context_id, key, in_fd, out_fd): self.broker = Broker() self.router = Router(self.broker) - self.broker.on_shutdown.append(self.router.on_shutdown) self.master = Context(self.router, 0, 'master') if parent_id == 0: self.parent = self.master diff --git a/econtext/master.py b/econtext/master.py index 53d13681..82184f16 100644 --- a/econtext/master.py +++ b/econtext/master.py @@ -439,7 +439,11 @@ class Context(econtext.core.Context): via = None def on_disconnect(self, broker): - pass + """ + Override base behaviour of triggering Broker shutdown on parent stream + disconnection. + """ + econtext.core.fire(self, 'disconnect') def _discard_result(self, msg): data = msg.unpickle()