parent: zombie reaping v3

Improvements:

- Refactored off Process, separately testable without a connection
- Don't delay Broker shutdown indefinitely for detached children
pull/612/head
David Wilson 5 years ago
parent 709a0c013f
commit f4cee16526

@ -218,6 +218,10 @@ Child Implementation
Process Management Process Management
================== ==================
.. currentmodule:: mitogen.parent
.. autoclass:: Reaper
:members:
.. currentmodule:: mitogen.parent .. currentmodule:: mitogen.parent
.. autoclass:: Process .. autoclass:: Process
:members: :members:

@ -1364,7 +1364,22 @@ class Connection(object):
#: Prefix given to default names generated by :meth:`connect`. #: Prefix given to default names generated by :meth:`connect`.
name_prefix = u'local' name_prefix = u'local'
timer = None #: :class:`Timer` that runs :meth:`_on_timer_expired` when connection
#: timeout occurs.
_timer = None
#: When disconnection completes, instance of :class:`Reaper` used to wait
#: on the exit status of the subprocess.
_reaper = None
#: On failure, the exception object that should be propagated back to the
#: user.
exception = None
#: Extra text appended to :class:`EofError` if that exception is raised on
#: a failed connection attempt. May be used in subclasses to hint at common
#: problems with a particular connection method.
eof_error_hint = None
def __init__(self, options, router): def __init__(self, options, router):
#: :class:`Options` #: :class:`Options`
@ -1499,8 +1514,6 @@ class Connection(object):
msg = 'Child start failed: %s. Command was: %s' % (e, Argv(args)) msg = 'Child start failed: %s. Command was: %s' % (e, Argv(args))
raise mitogen.core.StreamError(msg) raise mitogen.core.StreamError(msg)
eof_error_hint = None
def _adorn_eof_error(self, e): def _adorn_eof_error(self, e):
""" """
Subclasses may provide additional information in the case of a failed Subclasses may provide additional information in the case of a failed
@ -1509,10 +1522,8 @@ class Connection(object):
if self.eof_error_hint: if self.eof_error_hint:
e.args = ('%s\n\n%s' % (e.args[0], self.eof_error_hint),) e.args = ('%s\n\n%s' % (e.args[0], self.eof_error_hint),)
exception = None
def _complete_connection(self): def _complete_connection(self):
self.timer.cancel() self._timer.cancel()
if not self.exception: if not self.exception:
mitogen.core.unlisten(self._router.broker, 'shutdown', mitogen.core.unlisten(self._router.broker, 'shutdown',
self._on_broker_shutdown) self._on_broker_shutdown)
@ -1569,19 +1580,36 @@ class Connection(object):
def _on_streams_disconnected(self): def _on_streams_disconnected(self):
""" """
When disconnection has been detected for both our streams, cancel the When disconnection has been detected for both streams, cancel the
connection timer, mark the connection failed, and reap the child connection timer, mark the connection failed, and reap the child
process. Do nothing if the timer has already been cancelled, indicating process. Do nothing if the timer has already been cancelled, indicating
some existing failure has already been noticed. some existing failure has already been noticed.
""" """
if not self.timer.cancelled: if self._timer.active:
self.timer.cancel() self._timer.cancel()
self._fail_connection(EofError( self._fail_connection(EofError(
self.eof_error_msg + get_history( self.eof_error_msg + get_history(
[self.stdio_stream, self.stderr_stream] [self.stdio_stream, self.stderr_stream]
) )
)) ))
self.proc._async_reap(self, self._router)
if self._reaper:
return
self._reaper = Reaper(
broker=self._router.broker,
proc=self.proc,
kill=not (
(self.detached and self.child_is_immediate_subprocess) or
# Avoid killing so child has chance to write cProfile data
self._router.profiling
),
# Don't delay shutdown waiting for a detached child, since the
# detached child may expect to live indefinitely after its parent
# exited.
wait_on_shutdown=(not self.detached),
)
self._reaper.reap()
def _on_broker_shutdown(self): def _on_broker_shutdown(self):
""" """
@ -1590,20 +1618,6 @@ class Connection(object):
""" """
self._fail_connection(CancelledError(BROKER_SHUTDOWN_MSG)) self._fail_connection(CancelledError(BROKER_SHUTDOWN_MSG))
def _start_timer(self):
self.timer = self._router.broker.timers.schedule(
when=self.options.connect_deadline,
func=self._on_timer_expired,
)
def _on_timer_expired(self):
self._fail_connection(
mitogen.core.TimeoutError(
'Failed to setup connection after %.2f seconds',
self.options.connect_timeout,
)
)
def stream_factory(self): def stream_factory(self):
return self.stream_protocol_class.build_stream( return self.stream_protocol_class.build_stream(
broker=self._router.broker, broker=self._router.broker,
@ -1632,12 +1646,23 @@ class Connection(object):
self._router.broker.start_receive(stream) self._router.broker.start_receive(stream)
return stream return stream
def _on_timer_expired(self):
self._fail_connection(
mitogen.core.TimeoutError(
'Failed to setup connection after %.2f seconds',
self.options.connect_timeout,
)
)
def _async_connect(self): def _async_connect(self):
LOG.debug('creating connection to context %d using %s', LOG.debug('creating connection to context %d using %s',
self.context.context_id, self.__class__.__module__) self.context.context_id, self.__class__.__module__)
mitogen.core.listen(self._router.broker, 'shutdown', mitogen.core.listen(self._router.broker, 'shutdown',
self._on_broker_shutdown) self._on_broker_shutdown)
self._start_timer() self._timer = self._router.broker.timers.schedule(
when=self.options.connect_deadline,
func=self._on_timer_expired,
)
try: try:
self.proc = self.start_child() self.proc = self.start_child()
@ -2464,12 +2489,121 @@ class Router(mitogen.core.Router):
return self.connect(u'ssh', **kwargs) return self.connect(u'ssh', **kwargs)
class Process(object): class Reaper(object):
"""
Asynchronous logic for reaping :class:`Process` objects. This is necessary
to prevent uncontrolled buildup of zombie processes in long-lived parents
that will eventually reach an OS limit, preventing creation of new threads
and processes, and to log the exit status of the child in the case of an
error.
To avoid modifying process-global state such as with
:func:`signal.set_wakeup_fd` or installing a :data:`signal.SIGCHLD` handler
that might interfere with the user's ability to use those facilities,
Reaper polls for exit with backoff using timers installed on an associated
:class:`Broker`.
:param mitogen.core.Broker broker:
The :class:`Broker` on which to install timers
:param Process proc:
The process to reap.
:param bool kill:
If :data:`True`, send ``SIGTERM`` and ``SIGKILL`` to the process.
:param bool wait_on_shutdown:
If :data:`True`, delay :class:`Broker` shutdown if child has not yet
exited. If :data:`False` simply forget the child.
""" """
Process objects contain asynchronous logic for reaping children, and #: :class:`Timer` that invokes :meth:`reap` after some polling delay.
keeping track of their stdio descriptors. _timer = None
def __init__(self, broker, proc, kill, wait_on_shutdown):
self.broker = broker
self.proc = proc
self.kill = kill
self.wait_on_shutdown = wait_on_shutdown
self._tries = 0
def _signal_child(self, signum):
# For processes like sudo we cannot actually send sudo a signal,
# because it is setuid, so this is best-effort only.
LOG.debug('%r: sending %s', self.proc, SIGNAL_BY_NUM[signum])
try:
os.kill(self.proc.pid, signum)
except OSError:
e = sys.exc_info()[1]
if e.args[0] != errno.EPERM:
raise
This base class is extended by :class:`PopenProcess` and def _calc_delay(self, count):
"""
Calculate a poll delay given `count` attempts have already been made.
These constants have no principle, they just produce rapid but still
relatively conservative retries.
"""
delay = 0.05
for _ in xrange(count):
delay *= 1.72
return delay
def _on_broker_shutdown(self):
"""
Respond to :class:`Broker` shutdown by cancelling the reap timer if
:attr:`Router.await_children_at_shutdown` is disabled. Otherwise
shutdown is delayed for up to :attr:`Broker.shutdown_timeout` for
subprocesses may have no intention of exiting any time soon.
"""
if not self.wait_on_shutdown:
self._timer.cancel()
def _install_timer(self, delay):
new = self._timer is None
self._timer = self.broker.timers.schedule(
when=time.time() + delay,
func=self.reap,
)
if new:
mitogen.core.listen(self.broker, 'shutdown',
self._on_broker_shutdown)
def _remove_timer(self):
if self._timer and self._timer.active:
self._timer.cancel()
mitogen.core.unlisten(self.broker, 'shutdown',
self._on_broker_shutdown)
def reap(self):
"""
Reap the child process during disconnection.
"""
status = self.proc.poll()
if status is not None:
LOG.debug('%r: %s', self.proc, returncode_to_str(status))
self._remove_timer()
return
self._tries += 1
if self._tries > 20:
LOG.warning('%r: child will not exit, giving up', self)
self._remove_timer()
return
delay = self._calc_delay(self._tries - 1)
LOG.debug('%r still running after IO disconnect, recheck in %.03fs',
self.proc, delay)
self._install_timer(delay)
if not self.kill:
pass
elif self._tries == 1:
self._signal_child(signal.SIGTERM)
elif self._tries == 5: # roughly 4 seconds
self._signal_child(signal.SIGKILL)
class Process(object):
"""
Process objects provide a uniform interface to the :mod:`subprocess` and
:mod:`mitogen.fork`. This class is extended by :class:`PopenProcess` and
:class:`mitogen.fork.Process`. :class:`mitogen.fork.Process`.
:param int pid: :param int pid:
@ -2481,16 +2615,19 @@ class Process(object):
:param file stderr: :param file stderr:
File object attached to standard error, or :data:`None`. File object attached to standard error, or :data:`None`.
""" """
_delays = [0.05, 0.15, 0.3, 1.0, 5.0, 10.0] #: Name of the process used in logs. Set to the stream/context name by
#: :class:`Connection`.
name = None name = None
def __init__(self, pid, stdin, stdout, stderr=None): def __init__(self, pid, stdin, stdout, stderr=None):
#: The process ID.
self.pid = pid self.pid = pid
#: File object attached to standard input.
self.stdin = stdin self.stdin = stdin
#: File object attached to standard output.
self.stdout = stdout self.stdout = stdout
#: File object attached to standard error.
self.stderr = stderr self.stderr = stderr
self._returncode = None
self._reap_count = 0
def __repr__(self): def __repr__(self):
return '%s %s pid %d' % ( return '%s %s pid %d' % (
@ -2510,56 +2647,6 @@ class Process(object):
""" """
raise NotImplementedError() raise NotImplementedError()
def _signal_child(self, signum):
# For processes like sudo we cannot actually send sudo a signal,
# because it is setuid, so this is best-effort only.
LOG.debug('%r: child process still alive, sending %s',
self, SIGNAL_BY_NUM[signum])
try:
os.kill(self.pid, signum)
except OSError:
e = sys.exc_info()[1]
if e.args[0] != errno.EPERM:
raise
def _async_reap(self, conn, router):
"""
Reap the child process during disconnection.
"""
if self._returncode is not None:
# on_disconnect() may be invoked more than once, for example, if
# there is still a pending message to be sent after the first
# on_disconnect() call.
return
if conn.detached and conn.child_is_immediate_subprocess:
LOG.debug('%r: immediate child is detached, won\'t reap it', self)
return
if router.profiling:
LOG.info('%r: wont kill child because profiling=True', self)
return
self._reap_count += 1
status = self.poll()
if status is not None:
LOG.debug('%r: %s', self, returncode_to_str(status))
return
i = self._reap_count - 1
if i >= len(self._delays):
LOG.warning('%r: child will not die, abandoning it', self)
return
elif i == 0:
self._signal_child(signal.SIGTERM)
elif i == 1:
self._signal_child(signal.SIGKILL)
router.broker.timers.schedule(
when=time.time() + self._delays[i],
func=lambda: self._async_reap(conn, router),
)
class PopenProcess(Process): class PopenProcess(Process):
""" """

@ -1,9 +1,10 @@
import time
import tempfile
import sys
import os import os
import signal
import sys
import tempfile
import threading import threading
import time
import unittest2 import unittest2
import testlib import testlib
@ -44,5 +45,33 @@ class ConnectionTest(testlib.RouterMixin, testlib.TestCase):
self.assertEquals(mitogen.parent.BROKER_SHUTDOWN_MSG, exc.args[0]) self.assertEquals(mitogen.parent.BROKER_SHUTDOWN_MSG, exc.args[0])
@mitogen.core.takes_econtext
def do_detach(econtext):
econtext.detach()
while 1:
time.sleep(1)
logging.getLogger('mitogen').error('hi')
class DetachReapTest(testlib.RouterMixin, testlib.TestCase):
def test_subprocess_preserved_on_shutdown(self):
c1 = self.router.local()
pid = c1.call(os.getpid)
l = mitogen.core.Latch()
mitogen.core.listen(c1, 'disconnect', l.put)
c1.call_no_reply(do_detach)
l.get()
self.broker.shutdown()
self.broker.join()
os.kill(pid, 0) # succeeds if process still alive
# now clean up
os.kill(pid, signal.SIGTERM)
os.waitpid(pid, 0)
if __name__ == '__main__': if __name__ == '__main__':
unittest2.main() unittest2.main()

@ -192,7 +192,7 @@ def sync_with_broker(broker, timeout=10.0):
""" """
sem = mitogen.core.Latch() sem = mitogen.core.Latch()
broker.defer(sem.put, None) broker.defer(sem.put, None)
sem.get(timeout=10.0) sem.get(timeout=timeout)
def log_fd_calls(): def log_fd_calls():

Loading…
Cancel
Save