Kindof working graceful shutdown again.

pull/35/head
David Wilson 7 years ago
parent 9d356736ca
commit 88eea78282

@ -357,7 +357,7 @@ class Side(object):
streams implemented using unidirectional (e.g. UNIX pipe) and bidirectional streams implemented using unidirectional (e.g. UNIX pipe) and bidirectional
(e.g. UNIX socket) file descriptors to operate identically. (e.g. UNIX socket) file descriptors to operate identically.
""" """
def __init__(self, stream, fd, keep_alive=False): def __init__(self, stream, fd, keep_alive=True):
#: The :py:class:`Stream` for which this is a read or write side. #: The :py:class:`Stream` for which this is a read or write side.
self.stream = stream self.stream = stream
#: Integer file descriptor to perform IO on. #: Integer file descriptor to perform IO on.
@ -708,7 +708,7 @@ class IoLogger(BasicStream):
set_cloexec(self._rsock.fileno()) set_cloexec(self._rsock.fileno())
set_cloexec(self._wsock.fileno()) set_cloexec(self._wsock.fileno())
self.receive_side = Side(self, self._rsock.fileno(), keep_alive=True) self.receive_side = Side(self, self._rsock.fileno())
self.transmit_side = Side(self, dest_fd) self.transmit_side = Side(self, dest_fd)
self._broker.start_receive(self) self._broker.start_receive(self)
@ -758,13 +758,14 @@ class Router(object):
def on_disconnect(self, stream, broker): def on_disconnect(self, stream, broker):
"""Invoked by Stream.on_disconnect().""" """Invoked by Stream.on_disconnect()."""
for context in self._context_by_id.itervalues(): for context in self._context_by_id.itervalues():
if self._stream_by_id[context.context_id] is stream: stream_ = self._stream_by_id.get(context.context_id)
if stream_ is stream:
del self._stream_by_id[context.context_id] del self._stream_by_id[context.context_id]
context.on_disconnect(broker) context.on_disconnect(broker)
def on_shutdown(self): def on_shutdown(self, broker):
for context in self._context_by_id.itervalues(): for context in self._context_by_id.itervalues():
context.on_shutdown() context.on_shutdown(broker)
def add_route(self, target_id, via_id): def add_route(self, target_id, via_id):
LOG.debug('%r.add_route(%r, %r)', self, target_id, via_id) LOG.debug('%r.add_route(%r, %r)', self, target_id, via_id)
@ -816,8 +817,8 @@ class Broker(object):
#: gracefully before force-disconnecting them during :py:meth:`shutdown`. #: gracefully before force-disconnecting them during :py:meth:`shutdown`.
shutdown_timeout = 3.0 shutdown_timeout = 3.0
def __init__(self, on_shutdown=[]): def __init__(self):
self._on_shutdown = on_shutdown self.on_shutdown = []
self._alive = True self._alive = True
self._queue = Queue.Queue() self._queue = Queue.Queue()
self._readers = set() self._readers = set()
@ -906,6 +907,9 @@ class Broker(object):
while self._alive: while self._alive:
self._loop_once() self._loop_once()
for func in self.on_shutdown:
func(self)
for side in self._readers | self._writers: for side in self._readers | self._writers:
self._call(side.stream, side.stream.on_shutdown) self._call(side.stream, side.stream.on_shutdown)
@ -976,6 +980,7 @@ class ExternalContext(object):
def _setup_master(self, parent_id, context_id, key, in_fd, out_fd): def _setup_master(self, parent_id, context_id, key, in_fd, out_fd):
self.broker = Broker() self.broker = Broker()
self.router = Router(self.broker) self.router = Router(self.broker)
self.broker.on_shutdown.append(self.router.on_shutdown)
self.master = Context(self.router, 0, 'master') self.master = Context(self.router, 0, 'master')
if parent_id == 0: if parent_id == 0:
self.parent = self.master self.parent = self.master
@ -986,6 +991,7 @@ class ExternalContext(object):
self.stream = Stream(self.router, parent_id, key) self.stream = Stream(self.router, parent_id, key)
self.stream.name = 'parent' self.stream.name = 'parent'
self.stream.accept(in_fd, out_fd) self.stream.accept(in_fd, out_fd)
self.stream.receive_side.keep_alive = False
os.close(in_fd) os.close(in_fd)
try: try:

Loading…
Cancel
Save