|
|
|
@ -248,9 +248,9 @@ class BasicStream(object):
|
|
|
|
|
self.read_side.close()
|
|
|
|
|
self.write_side.close()
|
|
|
|
|
|
|
|
|
|
def shutdown(self):
|
|
|
|
|
def on_shutdown(self):
|
|
|
|
|
"""Disconnect gracefully. Base implementation calls on_disconnect()."""
|
|
|
|
|
LOG.debug('%r.shutdown()', self)
|
|
|
|
|
LOG.debug('%r.on_shutdown()', self)
|
|
|
|
|
self.on_disconnect()
|
|
|
|
|
|
|
|
|
|
def has_output(self):
|
|
|
|
@ -367,7 +367,7 @@ class Stream(BasicStream):
|
|
|
|
|
LOG.debug('%r.on_disconnect(): killing %r: %r', self, handle, fn)
|
|
|
|
|
fn(_DEAD)
|
|
|
|
|
|
|
|
|
|
def shutdown(self):
|
|
|
|
|
def on_shutdown(self):
|
|
|
|
|
"""Override BasicStream behaviour of immediately disconnecting."""
|
|
|
|
|
|
|
|
|
|
def accept(self, rfd, wfd):
|
|
|
|
@ -412,8 +412,8 @@ class Context(object):
|
|
|
|
|
self._lock = threading.Lock()
|
|
|
|
|
self.add_handle_cb(self._shutdown, SHUTDOWN)
|
|
|
|
|
|
|
|
|
|
def shutdown(self):
|
|
|
|
|
"""Slave does nothing, _broker_main() will .shutdown its streams."""
|
|
|
|
|
def on_shutdown(self):
|
|
|
|
|
"""Slave does nothing, _broker_main() will shutdown its streams."""
|
|
|
|
|
|
|
|
|
|
def _shutdown(self, data):
|
|
|
|
|
if data != _DEAD and self.stream:
|
|
|
|
@ -545,8 +545,8 @@ class IoLogger(BasicStream):
|
|
|
|
|
line, _, self._buf = self._buf.partition('\n')
|
|
|
|
|
self._log.info('%s', line.rstrip('\n'))
|
|
|
|
|
|
|
|
|
|
def shutdown(self):
|
|
|
|
|
LOG.debug('%r.shutdown()', self)
|
|
|
|
|
def on_shutdown(self):
|
|
|
|
|
LOG.debug('%r.on_shutdown()', self)
|
|
|
|
|
self._wsock.shutdown(socket.SHUT_WR)
|
|
|
|
|
self._wsock.close()
|
|
|
|
|
|
|
|
|
@ -642,7 +642,7 @@ class Broker(object):
|
|
|
|
|
self._loop_once()
|
|
|
|
|
|
|
|
|
|
for side in self._readers | self._writers:
|
|
|
|
|
self._call_and_update(side.stream, side.stream.shutdown)
|
|
|
|
|
self._call_and_update(side.stream, side.stream.on_shutdown)
|
|
|
|
|
|
|
|
|
|
deadline = time.time() + self.graceful_timeout
|
|
|
|
|
while ((self._readers or self._writers) and
|
|
|
|
@ -661,16 +661,16 @@ class Broker(object):
|
|
|
|
|
except Exception:
|
|
|
|
|
LOG.exception('_broker_main() crashed')
|
|
|
|
|
|
|
|
|
|
def wait(self):
|
|
|
|
|
"""Wait for the broker to stop."""
|
|
|
|
|
self._thread.join()
|
|
|
|
|
|
|
|
|
|
def shutdown(self):
|
|
|
|
|
"""Gracefully disconnect streams and wait for broker to stop."""
|
|
|
|
|
"""Request broker gracefully disconnect streams and stop."""
|
|
|
|
|
LOG.debug('%r.shutdown()', self)
|
|
|
|
|
self._alive = False
|
|
|
|
|
self._waker.wake()
|
|
|
|
|
|
|
|
|
|
def wait(self):
|
|
|
|
|
"""Wait for the broker to stop."""
|
|
|
|
|
self._thread.join()
|
|
|
|
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
|
return 'Broker()'
|
|
|
|
|
|
|
|
|
|