diff --git a/econtext/core.py b/econtext/core.py index 195dbe18..70033573 100644 --- a/econtext/core.py +++ b/econtext/core.py @@ -224,6 +224,9 @@ class Receiver(object): IOLOG.debug('%r._on_receive(%r)', self, msg) self._queue.put(msg) + def close(self): + self._queue.put(_DEAD) + def get(self, timeout=None): """Receive an object, or ``None`` if `timeout` is reached.""" IOLOG.debug('%r.on_receive(timeout=%r)', self, timeout) @@ -1029,6 +1032,9 @@ class ExternalContext(object): The :py:class:`IoLogger` connected to ``stderr``. """ + def _on_broker_shutdown(self): + self.channel.close() + def _setup_master(self, parent_id, context_id, key, in_fd, out_fd): self.broker = Broker() self.router = Router(self.broker) @@ -1044,6 +1050,8 @@ class ExternalContext(object): self.stream.accept(in_fd, out_fd) self.stream.receive_side.keep_alive = False + listen(self.broker, 'shutdown', self._on_broker_shutdown) + os.close(in_fd) try: os.wait() # Reap first stage.