diff --git a/docs/internals.rst b/docs/internals.rst index e4f7bd91..71c6273d 100644 --- a/docs/internals.rst +++ b/docs/internals.rst @@ -218,6 +218,10 @@ Child Implementation Process Management ================== +.. currentmodule:: mitogen.parent +.. autoclass:: Reaper + :members: + .. currentmodule:: mitogen.parent .. autoclass:: Process :members: diff --git a/mitogen/parent.py b/mitogen/parent.py index 59ba15d9..5a525c15 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -1364,7 +1364,22 @@ class Connection(object): #: Prefix given to default names generated by :meth:`connect`. name_prefix = u'local' - timer = None + #: :class:`Timer` that runs :meth:`_on_timer_expired` when connection + #: timeout occurs. + _timer = None + + #: When disconnection completes, instance of :class:`Reaper` used to wait + #: on the exit status of the subprocess. + _reaper = None + + #: On failure, the exception object that should be propagated back to the + #: user. + exception = None + + #: Extra text appended to :class:`EofError` if that exception is raised on + #: a failed connection attempt. May be used in subclasses to hint at common + #: problems with a particular connection method. + eof_error_hint = None def __init__(self, options, router): #: :class:`Options` @@ -1499,8 +1514,6 @@ class Connection(object): msg = 'Child start failed: %s. Command was: %s' % (e, Argv(args)) raise mitogen.core.StreamError(msg) - eof_error_hint = None - def _adorn_eof_error(self, e): """ Subclasses may provide additional information in the case of a failed @@ -1509,10 +1522,8 @@ class Connection(object): if self.eof_error_hint: e.args = ('%s\n\n%s' % (e.args[0], self.eof_error_hint),) - exception = None - def _complete_connection(self): - self.timer.cancel() + self._timer.cancel() if not self.exception: mitogen.core.unlisten(self._router.broker, 'shutdown', self._on_broker_shutdown) @@ -1569,19 +1580,36 @@ class Connection(object): def _on_streams_disconnected(self): """ - When disconnection has been detected for both our streams, cancel the + When disconnection has been detected for both streams, cancel the connection timer, mark the connection failed, and reap the child process. Do nothing if the timer has already been cancelled, indicating some existing failure has already been noticed. """ - if not self.timer.cancelled: - self.timer.cancel() + if self._timer.active: + self._timer.cancel() self._fail_connection(EofError( self.eof_error_msg + get_history( [self.stdio_stream, self.stderr_stream] ) )) - self.proc._async_reap(self, self._router) + + if self._reaper: + return + + self._reaper = Reaper( + broker=self._router.broker, + proc=self.proc, + kill=not ( + (self.detached and self.child_is_immediate_subprocess) or + # Avoid killing so child has chance to write cProfile data + self._router.profiling + ), + # Don't delay shutdown waiting for a detached child, since the + # detached child may expect to live indefinitely after its parent + # exited. + wait_on_shutdown=(not self.detached), + ) + self._reaper.reap() def _on_broker_shutdown(self): """ @@ -1590,20 +1618,6 @@ class Connection(object): """ self._fail_connection(CancelledError(BROKER_SHUTDOWN_MSG)) - def _start_timer(self): - self.timer = self._router.broker.timers.schedule( - when=self.options.connect_deadline, - func=self._on_timer_expired, - ) - - def _on_timer_expired(self): - self._fail_connection( - mitogen.core.TimeoutError( - 'Failed to setup connection after %.2f seconds', - self.options.connect_timeout, - ) - ) - def stream_factory(self): return self.stream_protocol_class.build_stream( broker=self._router.broker, @@ -1632,12 +1646,23 @@ class Connection(object): self._router.broker.start_receive(stream) return stream + def _on_timer_expired(self): + self._fail_connection( + mitogen.core.TimeoutError( + 'Failed to setup connection after %.2f seconds', + self.options.connect_timeout, + ) + ) + def _async_connect(self): LOG.debug('creating connection to context %d using %s', self.context.context_id, self.__class__.__module__) mitogen.core.listen(self._router.broker, 'shutdown', self._on_broker_shutdown) - self._start_timer() + self._timer = self._router.broker.timers.schedule( + when=self.options.connect_deadline, + func=self._on_timer_expired, + ) try: self.proc = self.start_child() @@ -2464,12 +2489,121 @@ class Router(mitogen.core.Router): return self.connect(u'ssh', **kwargs) -class Process(object): +class Reaper(object): + """ + Asynchronous logic for reaping :class:`Process` objects. This is necessary + to prevent uncontrolled buildup of zombie processes in long-lived parents + that will eventually reach an OS limit, preventing creation of new threads + and processes, and to log the exit status of the child in the case of an + error. + + To avoid modifying process-global state such as with + :func:`signal.set_wakeup_fd` or installing a :data:`signal.SIGCHLD` handler + that might interfere with the user's ability to use those facilities, + Reaper polls for exit with backoff using timers installed on an associated + :class:`Broker`. + + :param mitogen.core.Broker broker: + The :class:`Broker` on which to install timers + :param Process proc: + The process to reap. + :param bool kill: + If :data:`True`, send ``SIGTERM`` and ``SIGKILL`` to the process. + :param bool wait_on_shutdown: + If :data:`True`, delay :class:`Broker` shutdown if child has not yet + exited. If :data:`False` simply forget the child. """ - Process objects contain asynchronous logic for reaping children, and - keeping track of their stdio descriptors. + #: :class:`Timer` that invokes :meth:`reap` after some polling delay. + _timer = None + + def __init__(self, broker, proc, kill, wait_on_shutdown): + self.broker = broker + self.proc = proc + self.kill = kill + self.wait_on_shutdown = wait_on_shutdown + self._tries = 0 + + def _signal_child(self, signum): + # For processes like sudo we cannot actually send sudo a signal, + # because it is setuid, so this is best-effort only. + LOG.debug('%r: sending %s', self.proc, SIGNAL_BY_NUM[signum]) + try: + os.kill(self.proc.pid, signum) + except OSError: + e = sys.exc_info()[1] + if e.args[0] != errno.EPERM: + raise - This base class is extended by :class:`PopenProcess` and + def _calc_delay(self, count): + """ + Calculate a poll delay given `count` attempts have already been made. + These constants have no principle, they just produce rapid but still + relatively conservative retries. + """ + delay = 0.05 + for _ in xrange(count): + delay *= 1.72 + return delay + + def _on_broker_shutdown(self): + """ + Respond to :class:`Broker` shutdown by cancelling the reap timer if + :attr:`Router.await_children_at_shutdown` is disabled. Otherwise + shutdown is delayed for up to :attr:`Broker.shutdown_timeout` for + subprocesses may have no intention of exiting any time soon. + """ + if not self.wait_on_shutdown: + self._timer.cancel() + + def _install_timer(self, delay): + new = self._timer is None + self._timer = self.broker.timers.schedule( + when=time.time() + delay, + func=self.reap, + ) + if new: + mitogen.core.listen(self.broker, 'shutdown', + self._on_broker_shutdown) + + def _remove_timer(self): + if self._timer and self._timer.active: + self._timer.cancel() + mitogen.core.unlisten(self.broker, 'shutdown', + self._on_broker_shutdown) + + def reap(self): + """ + Reap the child process during disconnection. + """ + status = self.proc.poll() + if status is not None: + LOG.debug('%r: %s', self.proc, returncode_to_str(status)) + self._remove_timer() + return + + self._tries += 1 + if self._tries > 20: + LOG.warning('%r: child will not exit, giving up', self) + self._remove_timer() + return + + delay = self._calc_delay(self._tries - 1) + LOG.debug('%r still running after IO disconnect, recheck in %.03fs', + self.proc, delay) + self._install_timer(delay) + + if not self.kill: + pass + elif self._tries == 1: + self._signal_child(signal.SIGTERM) + elif self._tries == 5: # roughly 4 seconds + self._signal_child(signal.SIGKILL) + + +class Process(object): + """ + Process objects provide a uniform interface to the :mod:`subprocess` and + :mod:`mitogen.fork`. This class is extended by :class:`PopenProcess` and :class:`mitogen.fork.Process`. :param int pid: @@ -2481,16 +2615,19 @@ class Process(object): :param file stderr: File object attached to standard error, or :data:`None`. """ - _delays = [0.05, 0.15, 0.3, 1.0, 5.0, 10.0] + #: Name of the process used in logs. Set to the stream/context name by + #: :class:`Connection`. name = None def __init__(self, pid, stdin, stdout, stderr=None): + #: The process ID. self.pid = pid + #: File object attached to standard input. self.stdin = stdin + #: File object attached to standard output. self.stdout = stdout + #: File object attached to standard error. self.stderr = stderr - self._returncode = None - self._reap_count = 0 def __repr__(self): return '%s %s pid %d' % ( @@ -2510,56 +2647,6 @@ class Process(object): """ raise NotImplementedError() - def _signal_child(self, signum): - # For processes like sudo we cannot actually send sudo a signal, - # because it is setuid, so this is best-effort only. - LOG.debug('%r: child process still alive, sending %s', - self, SIGNAL_BY_NUM[signum]) - try: - os.kill(self.pid, signum) - except OSError: - e = sys.exc_info()[1] - if e.args[0] != errno.EPERM: - raise - - def _async_reap(self, conn, router): - """ - Reap the child process during disconnection. - """ - if self._returncode is not None: - # on_disconnect() may be invoked more than once, for example, if - # there is still a pending message to be sent after the first - # on_disconnect() call. - return - - if conn.detached and conn.child_is_immediate_subprocess: - LOG.debug('%r: immediate child is detached, won\'t reap it', self) - return - - if router.profiling: - LOG.info('%r: wont kill child because profiling=True', self) - return - - self._reap_count += 1 - status = self.poll() - if status is not None: - LOG.debug('%r: %s', self, returncode_to_str(status)) - return - - i = self._reap_count - 1 - if i >= len(self._delays): - LOG.warning('%r: child will not die, abandoning it', self) - return - elif i == 0: - self._signal_child(signal.SIGTERM) - elif i == 1: - self._signal_child(signal.SIGKILL) - - router.broker.timers.schedule( - when=time.time() + self._delays[i], - func=lambda: self._async_reap(conn, router), - ) - class PopenProcess(Process): """ diff --git a/tests/connection_test.py b/tests/connection_test.py index a66428e4..619594d9 100644 --- a/tests/connection_test.py +++ b/tests/connection_test.py @@ -1,9 +1,10 @@ -import time -import tempfile -import sys import os +import signal +import sys +import tempfile import threading +import time import unittest2 import testlib @@ -44,5 +45,33 @@ class ConnectionTest(testlib.RouterMixin, testlib.TestCase): self.assertEquals(mitogen.parent.BROKER_SHUTDOWN_MSG, exc.args[0]) +@mitogen.core.takes_econtext +def do_detach(econtext): + econtext.detach() + while 1: + time.sleep(1) + logging.getLogger('mitogen').error('hi') + + +class DetachReapTest(testlib.RouterMixin, testlib.TestCase): + def test_subprocess_preserved_on_shutdown(self): + c1 = self.router.local() + pid = c1.call(os.getpid) + + l = mitogen.core.Latch() + mitogen.core.listen(c1, 'disconnect', l.put) + c1.call_no_reply(do_detach) + l.get() + + self.broker.shutdown() + self.broker.join() + + os.kill(pid, 0) # succeeds if process still alive + + # now clean up + os.kill(pid, signal.SIGTERM) + os.waitpid(pid, 0) + + if __name__ == '__main__': unittest2.main() diff --git a/tests/testlib.py b/tests/testlib.py index e26c6544..f856bfc1 100644 --- a/tests/testlib.py +++ b/tests/testlib.py @@ -192,7 +192,7 @@ def sync_with_broker(broker, timeout=10.0): """ sem = mitogen.core.Latch() broker.defer(sem.put, None) - sem.get(timeout=10.0) + sem.get(timeout=timeout) def log_fd_calls():