diff --git a/econtext/core.py b/econtext/core.py index 82a2a34b..3506b383 100644 --- a/econtext/core.py +++ b/econtext/core.py @@ -357,7 +357,7 @@ class Side(object): streams implemented using unidirectional (e.g. UNIX pipe) and bidirectional (e.g. UNIX socket) file descriptors to operate identically. """ - def __init__(self, stream, fd, keep_alive=False): + def __init__(self, stream, fd, keep_alive=True): #: The :py:class:`Stream` for which this is a read or write side. self.stream = stream #: Integer file descriptor to perform IO on. @@ -708,7 +708,7 @@ class IoLogger(BasicStream): set_cloexec(self._rsock.fileno()) set_cloexec(self._wsock.fileno()) - self.receive_side = Side(self, self._rsock.fileno(), keep_alive=True) + self.receive_side = Side(self, self._rsock.fileno()) self.transmit_side = Side(self, dest_fd) self._broker.start_receive(self) @@ -758,13 +758,14 @@ class Router(object): def on_disconnect(self, stream, broker): """Invoked by Stream.on_disconnect().""" for context in self._context_by_id.itervalues(): - if self._stream_by_id[context.context_id] is stream: + stream_ = self._stream_by_id.get(context.context_id) + if stream_ is stream: del self._stream_by_id[context.context_id] context.on_disconnect(broker) - def on_shutdown(self): + def on_shutdown(self, broker): for context in self._context_by_id.itervalues(): - context.on_shutdown() + context.on_shutdown(broker) def add_route(self, target_id, via_id): LOG.debug('%r.add_route(%r, %r)', self, target_id, via_id) @@ -816,8 +817,8 @@ class Broker(object): #: gracefully before force-disconnecting them during :py:meth:`shutdown`. shutdown_timeout = 3.0 - def __init__(self, on_shutdown=[]): - self._on_shutdown = on_shutdown + def __init__(self): + self.on_shutdown = [] self._alive = True self._queue = Queue.Queue() self._readers = set() @@ -906,6 +907,9 @@ class Broker(object): while self._alive: self._loop_once() + for func in self.on_shutdown: + func(self) + for side in self._readers | self._writers: self._call(side.stream, side.stream.on_shutdown) @@ -976,6 +980,7 @@ class ExternalContext(object): def _setup_master(self, parent_id, context_id, key, in_fd, out_fd): self.broker = Broker() self.router = Router(self.broker) + self.broker.on_shutdown.append(self.router.on_shutdown) self.master = Context(self.router, 0, 'master') if parent_id == 0: self.parent = self.master @@ -986,6 +991,7 @@ class ExternalContext(object): self.stream = Stream(self.router, parent_id, key) self.stream.name = 'parent' self.stream.accept(in_fd, out_fd) + self.stream.receive_side.keep_alive = False os.close(in_fd) try: