diff --git a/docs/howitworks.rst b/docs/howitworks.rst index 0e48a538..5c8bb018 100644 --- a/docs/howitworks.rst +++ b/docs/howitworks.rst @@ -445,6 +445,20 @@ also listen on the following handles: In this way, the master need never re-send a module it has already sent to a direct descendant. +.. currentmodule:: mitogen.core +.. data:: DETACHING + + Sent to inform a parent that user code has invoked + :meth:`ExternalContext.detach` to decouple the lifecycle of a directly + connected context and its subtree from the running program. + + A child usually shuts down immediately if it loses its parent connection, + and parents usually terminate any related Python/SSH subprocess on + disconnection. Receiving :data:`DETACHING` informs the parent the + connection will soon drop, but the process intends to continue life + independently, and to avoid terminating the related subprocess if that + subprocess is the child itself. + Additional handles are created to receive the result of every function call triggered by :py:meth:`call_async() `. diff --git a/docs/images/disconnected-subtree.graphml b/docs/images/detached-subtree.graphml similarity index 100% rename from docs/images/disconnected-subtree.graphml rename to docs/images/detached-subtree.graphml diff --git a/docs/images/disconnected-subtree.png b/docs/images/detached-subtree.png similarity index 100% rename from docs/images/disconnected-subtree.png rename to docs/images/detached-subtree.png diff --git a/docs/index.rst b/docs/index.rst index a2c9c9c1..a693fc15 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -237,6 +237,17 @@ uptime')** without further need to capture or manage output. 18:17:56 I mitogen.ctx.k3: stdout: 17:37:10 up 562 days, 2:25, 5 users, load average: 1.24, 1.13, 1.14 +Detached Subtrees +################# + +.. image:: images/detached-subtree.png + +It is possible to dynamically construct and decouple individual contexts from +the lifecycle of the running program without terminating them, while enabling +communication with any descendents in the subtree to be maintained. This is +intended to support implementing background tasks. + + Blocking Code Friendly ###################### diff --git a/mitogen/core.py b/mitogen/core.py index 56fc3266..f47b8380 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -75,6 +75,7 @@ DEL_ROUTE = 104 ALLOCATE_ID = 105 SHUTDOWN = 106 LOAD_MODULE = 107 +DETACHING = 108 IS_DEAD = 999 PY3 = sys.version_info > (3,) @@ -953,6 +954,7 @@ class Context(object): raise SystemError('Cannot making blocking call on broker thread') receiver = Receiver(self.router, persist=persist, respondent=self) + msg.dst_id = self.context_id msg.reply_to = receiver.handle _v and LOG.debug('%r.send_async(%r)', self, msg) @@ -1277,6 +1279,10 @@ class Router(object): self.broker.start_receive(stream) listen(stream, 'disconnect', lambda: self.on_stream_disconnect(stream)) + def stream_by_id(self, dst_id): + return self._stream_by_id.get(dst_id, + self._stream_by_id.get(mitogen.parent_id)) + def del_handler(self, handle): del self._handle_map[handle] @@ -1501,6 +1507,8 @@ class Broker(object): class ExternalContext(object): + detached = False + def _on_broker_shutdown(self): self.channel.close() @@ -1514,8 +1522,34 @@ class ExternalContext(object): self.broker.shutdown() def _on_parent_disconnect(self): - _v and LOG.debug('%r: parent stream is gone, dying.', self) - self.broker.shutdown() + if self.detached: + mitogen.parent_ids = [] + mitogen.parent_id = None + LOG.info('Detachment complete') + else: + _v and LOG.debug('%r: parent stream is gone, dying.', self) + self.broker.shutdown() + + def _sync(self, func): + latch = Latch() + self.broker.defer(lambda: latch.put(func())) + return latch.get() + + def detach(self): + self.detached = True + stream = self.router.stream_by_id(mitogen.parent_id) + if stream: # not double-detach()'d + os.setsid() + self.parent.send_await(Message(handle=DETACHING)) + LOG.info('Detaching from %r; parent is %s', stream, self.parent) + for x in range(20): + pending = self._sync(lambda: stream.pending_bytes()) + if not pending: + break + time.sleep(0.05) + if pending: + LOG.error('Stream had %d bytes after 2000ms', pending) + self.broker.defer(stream.on_disconnect, self.broker) def _setup_master(self, max_message_size, profiling, parent_id, context_id, in_fd, out_fd): diff --git a/mitogen/docker.py b/mitogen/docker.py index c265dc4f..148c132b 100644 --- a/mitogen/docker.py +++ b/mitogen/docker.py @@ -36,6 +36,8 @@ LOG = logging.getLogger(__name__) class Stream(mitogen.parent.Stream): + child_is_immediate_subprocess = False + container = None image = None username = None diff --git a/mitogen/fork.py b/mitogen/fork.py index fdf40c39..70737fc8 100644 --- a/mitogen/fork.py +++ b/mitogen/fork.py @@ -81,6 +81,8 @@ def handle_child_crash(): class Stream(mitogen.parent.Stream): + child_is_immediate_subprocess = True + #: Reference to the importer, if any, recovered from the parent. importer = None diff --git a/mitogen/jail.py b/mitogen/jail.py index 37c89483..ed04da00 100644 --- a/mitogen/jail.py +++ b/mitogen/jail.py @@ -36,6 +36,7 @@ LOG = logging.getLogger(__name__) class Stream(mitogen.parent.Stream): + child_is_immediate_subprocess = False create_child_args = { 'merge_stdio': True } diff --git a/mitogen/lxc.py b/mitogen/lxc.py index 8a8e4b78..eb8ad173 100644 --- a/mitogen/lxc.py +++ b/mitogen/lxc.py @@ -36,6 +36,7 @@ LOG = logging.getLogger(__name__) class Stream(mitogen.parent.Stream): + child_is_immediate_subprocess = False create_child_args = { # If lxc-attach finds any of stdin, stdout, stderr connected to a TTY, # to prevent input injection it creates a proxy pty, forcing all IO to diff --git a/mitogen/master.py b/mitogen/master.py index d0bc55a8..95202e35 100644 --- a/mitogen/master.py +++ b/mitogen/master.py @@ -690,6 +690,11 @@ class Router(mitogen.parent.Router): self.responder = ModuleResponder(self) self.log_forwarder = LogForwarder(self) self.route_monitor = mitogen.parent.RouteMonitor(router=self) + self.add_handler( # TODO: cutpaste. + fn=self._on_detaching, + handle=mitogen.core.DETACHING, + persist=True, + ) def enable_debug(self): mitogen.core.enable_debug_logging() diff --git a/mitogen/parent.py b/mitogen/parent.py index 7352712e..feac28a8 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -599,12 +599,23 @@ class Stream(mitogen.core.Stream): ) ) + #: If :data:`True`, indicates the subprocess managed by us should not be + #: killed during graceful detachment, as it the actual process implementing + #: the child context. In all other cases, the subprocess is SSH, sudo, or a + #: similar tool that should be reminded to quit during disconnection. + child_is_immediate_subprocess = True + + detached = False _reaped = False def _reap_child(self): """ Reap the child process during disconnection. """ + if self.detached and self.child_is_immediate_subprocess: + LOG.debug('%r: immediate child is detached, won\'t reap it', self) + return + if self._reaped: # on_disconnect() may be invoked more than once, for example, if # there is still a pending message to be sent after the first @@ -929,10 +940,22 @@ class Router(mitogen.core.Router): importer=importer, ) self.route_monitor = RouteMonitor(self, parent) + self.add_handler( + fn=self._on_detaching, + handle=mitogen.core.DETACHING, + persist=True, + ) - def stream_by_id(self, dst_id): - return self._stream_by_id.get(dst_id, - self._stream_by_id.get(mitogen.parent_id)) + def _on_detaching(self, msg): + if msg.is_dead: + return + stream = self.stream_by_id(msg.src_id) + if stream.remote_id != msg.src_id or stream.detached: + LOG.warning('bad DETACHING received on %r: %r', stream, msg) + return + LOG.debug('%r: marking as detached', stream) + stream.detached = True + msg.reply(None) def add_route(self, target_id, stream): LOG.debug('%r.add_route(%r, %r)', self, target_id, stream) diff --git a/mitogen/setns.py b/mitogen/setns.py index 979e6bf4..c95b3217 100644 --- a/mitogen/setns.py +++ b/mitogen/setns.py @@ -105,6 +105,8 @@ def get_machinectl_pid(path, name): class Stream(mitogen.parent.Stream): + child_is_immediate_subprocess = False + container = None username = None kind = None diff --git a/mitogen/ssh.py b/mitogen/ssh.py index 0fe18f75..76fad2a0 100644 --- a/mitogen/ssh.py +++ b/mitogen/ssh.py @@ -59,6 +59,7 @@ class HostKeyError(mitogen.core.StreamError): class Stream(mitogen.parent.Stream): create_child = staticmethod(mitogen.parent.hybrid_tty_create_child) + child_is_immediate_subprocess = False python_path = 'python2.7' #: Once connected, points to the corresponding TtyLogStream, allowing it to diff --git a/mitogen/su.py b/mitogen/su.py index bfaada11..2cc3406b 100644 --- a/mitogen/su.py +++ b/mitogen/su.py @@ -46,6 +46,7 @@ class Stream(mitogen.parent.Stream): # for hybrid_tty_create_child(), there just needs to be either a shell # snippet or bootstrap support for fixing things up afterwards. create_child = staticmethod(mitogen.parent.tty_create_child) + child_is_immediate_subprocess = False #: Once connected, points to the corresponding TtyLogStream, allowing it to #: be disconnected at the same time this stream is being torn down. diff --git a/mitogen/sudo.py b/mitogen/sudo.py index 9377c960..5d2911fc 100644 --- a/mitogen/sudo.py +++ b/mitogen/sudo.py @@ -104,6 +104,7 @@ class PasswordError(mitogen.core.StreamError): class Stream(mitogen.parent.Stream): create_child = staticmethod(mitogen.parent.hybrid_tty_create_child) + child_is_immediate_subprocess = False #: Once connected, points to the corresponding TtyLogStream, allowing it to #: be disconnected at the same time this stream is being torn down.