diff --git a/docs/howitworks.rst b/docs/howitworks.rst index f23ac9b4..5b9a4dc0 100644 --- a/docs/howitworks.rst +++ b/docs/howitworks.rst @@ -209,28 +209,32 @@ and :py:meth:`call_async() `. Shutdown ######## -When the master signals the :py:data:`CALL_FUNCTION -` :py:class:`Channel ` is -closed, the child calls :py:meth:`shutdown() ` -followed by :py:meth:`wait() ` on its own broker, -triggering graceful shutdown. - -During shutdown, the master will wait a few seconds for children to disconnect -gracefully before force disconnecting them, while the children will use that -time to call :py:meth:`socket.shutdown(SHUT_WR) ` on -their :py:class:`IoLogger ` socket's write ends before -draining any remaining data buffered on the read ends. - -An alternative approach is to wait until the socket is completely closed, with -some hard timeout, but this necessitates greater discipline than is common in -infrastructure code (how often have you forgotten to redirect stderr to -``/dev/null``?), so needless irritating delays would often be experienced -during program termination. - -If the main thread (responsible for function call dispatch) fails to trigger -shutdown (because some user function is hanging), then the eventual force -disconnection by the master will cause the IO multiplexer thread to enter -shutdown by itself. +When a context receives :py:data:`SHUTDOWN ` from its +immediate parent, it closes its own :py:data:`CALL_FUNCTION +` :py:class:`Channel ` before +sending ``SHUTDOWN`` to any directly connected children. Closing the channel +has the effect of causing :py:meth:`ExternalContext._dispatch_calls() +` to exit and begin joining on +the broker thread. 
+ +During shutdown, the master waits up to 5 seconds for children to disconnect +gracefully before force disconnecting them, while children use that time +to call :py:meth:`socket.shutdown(SHUT_WR) <socket.socket.shutdown>` on their +:py:class:`IoLogger <mitogen.core.IoLogger>` socket's write ends before +draining any remaining data buffered on the read ends, and ensuring any +deferred broker function callbacks have had a chance to complete, which is +necessary, for example, to forward any remaining :py:mod:`logging` records. + +An alternative approach is to wait until the IoLogger socket is completely +closed, with some hard timeout, but this necessitates greater discipline than +is common in infrastructure code (how often have you forgotten to redirect +stderr to ``/dev/null`` when starting a daemon process?), so needless +irritating delays would often be experienced during program termination. + +If the main thread (responsible for function call dispatch) fails to shut down +gracefully, because some user function is hanging, it will still be cleaned up, +since as the final step of its shutdown the broker sends +:py:data:`signal.SIGTERM` to its own process. .. _stream-protocol: @@ -291,10 +295,22 @@ Children listen on the following handles: When this channel is closed (by way of sending ``_DEAD`` to it), the child's main thread begins graceful shutdown of its own `Broker` and - `Router`. Each child is responsible for sending ``_DEAD`` to each of its - directly connected children in response to the master sending ``_DEAD`` to - it, and arranging for the connection to its parent context to be closed - shortly thereafter. + `Router`. + +.. data:: mitogen.core.SHUTDOWN + + When received from a child's immediate parent, causes the broker thread to + enter graceful shutdown, including writing ``_DEAD`` to the child's main + thread, causing it to join on the exit of the broker thread.
+ + As the final step of its shutdown process, a child's broker sends + :py:data:`signal.SIGTERM` to its own process, ensuring the process dies even if the + main thread was hung executing user code. + + Each context is responsible for sending ``SHUTDOWN`` to each of its + directly connected children in response to its immediate parent sending ``SHUTDOWN`` + to it, and arranging for the connection to its parent to be closed shortly + thereafter. .. data:: mitogen.core.ADD_ROUTE diff --git a/mitogen/core.py b/mitogen/core.py index 1d39f894..c0492eea 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -10,6 +10,7 @@ import itertools import logging import os import select +import signal import socket import struct import sys @@ -28,6 +29,7 @@ CALL_FUNCTION = 101 FORWARD_LOG = 102 ADD_ROUTE = 103 ALLOCATE_ID = 104 +SHUTDOWN = 105 CHUNK_SIZE = 16384 @@ -1071,7 +1073,8 @@ class Broker(object): attribute is ``True``, or any :py:class:`Context` is still registered that is not the master. Used to delay shutdown while some important work is in progress (e.g. log draining).""" - return sum((side.keep_alive for side in self._readers), 0) + return (sum((side.keep_alive for side in self._readers), 0) + + (not self._queue.empty())) def _broker_main(self): """Handle events until :py:meth:`shutdown`.
On shutdown, invoke @@ -1104,6 +1107,8 @@ class Broker(object): except Exception: LOG.exception('_broker_main() crashed') + fire(self, 'exit') + def shutdown(self): """Request broker gracefully disconnect streams and stop.""" LOG.debug('%r.shutdown()', self) @@ -1123,11 +1128,22 @@ class ExternalContext(object): def _on_broker_shutdown(self): self.channel.close() + def _on_broker_exit(self): + os.kill(os.getpid(), signal.SIGTERM) + + def _on_shutdown_msg(self, msg): + LOG.debug('_on_shutdown_msg(%r)', msg) + if msg.src_id != mitogen.parent_id: + LOG.warning('Ignoring SHUTDOWN from non-parent: %r', msg) + return + self.broker.shutdown() + def _setup_master(self, profiling, parent_id, context_id, in_fd, out_fd): if profiling: enable_profiling() self.broker = Broker() self.router = Router(self.broker) + self.router.add_handler(self._on_shutdown_msg, SHUTDOWN) self.master = Context(self.router, 0, 'master') if parent_id == 0: self.parent = self.master @@ -1141,6 +1157,7 @@ class ExternalContext(object): self.stream.receive_side.keep_alive = False listen(self.broker, 'shutdown', self._on_broker_shutdown) + listen(self.broker, 'exit', self._on_broker_exit) os.close(in_fd) try: @@ -1224,6 +1241,7 @@ class ExternalContext(object): self.router.route( Message.pickled(e, dst_id=msg.src_id, handle=msg.reply_to) ) + self.dispatch_stopped = True def main(self, parent_ids, context_id, debug, profiling, log_level, in_fd=100, out_fd=1, core_src_fd=101, setup_stdio=True): diff --git a/mitogen/master.py b/mitogen/master.py index 7ead0557..c1af3eb9 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -710,11 +710,10 @@ class Stream(mitogen.core.Stream): """Request the slave gracefully shut itself down.""" LOG.debug('%r closing CALL_FUNCTION channel', self) self.send( - mitogen.core.Message.pickled( - mitogen.core._DEAD, + mitogen.core.Message( src_id=mitogen.context_id, dst_id=self.remote_id, - handle=mitogen.core.CALL_FUNCTION + handle=mitogen.core.SHUTDOWN, ) ) diff --git 
a/tests/call_function_test.py b/tests/call_function_test.py index 131e4079..c8c390f3 100644 --- a/tests/call_function_test.py +++ b/tests/call_function_test.py @@ -74,7 +74,7 @@ class CallFunctionTest(testlib.RouterMixin, testlib.TestCase): def test_aborted_on_local_broker_shutdown(self): stream = self.router._stream_by_id[self.local.context_id] recv = self.local.call_async(time.sleep, 120) - time.sleep(0.1) # Ensure GIL is released + time.sleep(0.05) # Ensure GIL is released self.broker.shutdown() exc = self.assertRaises(mitogen.core.ChannelError, lambda: recv.get())