issue #155: parent: support Context.shutdown(), reap children on exit.

This permits graceful shutdown of individual contexts, without tearing
down everything.

Update mitogen.parent.Stream to also wait for the child to exit, to
prevent the buildup of zombie processes. This introduces a blocking wait
for process exit on the Broker thread, let's see if we can get away with
it. Chances are reasonable that it'll cause needless hangs on heavily
loaded machines.
pull/167/head
David Wilson 6 years ago
parent 6a74edce6b
commit 48351a1889

@ -740,6 +740,18 @@ Context Class
masters, and child contexts who later become parents. Currently when this
class is required, the target context's router is upgraded at runtime.
.. method:: shutdown (wait=False)
Arrange for the context to receive a ``SHUTDOWN`` message, triggering
graceful shutdown.
Due to a lack of support for timers, no attempt is made yet to force
terminate a hung context using this method. This will be fixed shortly.
:param bool wait:
If :py:data:`True`, block the calling thread until the context has
completely terminated.
.. method:: call_async (fn, \*args, \*\*kwargs)
Arrange for the context's ``CALL_FUNCTION`` handle to receive a

@ -308,6 +308,9 @@ class Stream(mitogen.core.Stream):
#: True to cause context to write /tmp/mitogen.stats.<pid>.<thread>.log.
profiling = False
#: Set to the child's PID by connect().
pid = None
def __init__(self, *args, **kwargs):
super(Stream, self).__init__(*args, **kwargs)
self.sent_modules = set(['mitogen', 'mitogen.core'])
@ -351,6 +354,16 @@ class Stream(mitogen.core.Stream):
)
)
def on_disconnect(self, broker):
pid, status = os.waitpid(self.pid, os.WNOHANG)
if pid:
LOG.debug('%r: child process exit status was %d', self, status)
else:
LOG.debug('%r: child process still alive, sending SIGTERM', self)
os.kill(self.pid, signal.SIGTERM)
pid, status = os.waitpid(self.pid, 0)
super(Stream, self).on_disconnect(broker)
# Minimised, gzipped, base64'd and passed to 'python -c'. It forks, dups
# file descriptor 0 as 100, creates a pipe, then execs a new interpreter
# with a custom argv.
@ -425,8 +438,8 @@ class Stream(mitogen.core.Stream):
def connect(self):
LOG.debug('%r.connect()', self)
pid, fd = self.create_child(*self.get_boot_command())
self.name = 'local.%s' % (pid,)
self.pid, fd = self.create_child(*self.get_boot_command())
self.name = 'local.%s' % (self.pid,)
self.receive_side = mitogen.core.Side(self, fd)
self.transmit_side = mitogen.core.Side(self, os.dup(fd))
LOG.debug('%r.connect(): child process stdin/stdout=%r',
@ -492,6 +505,20 @@ class Context(mitogen.core.Context):
receiver = self.call_async(fn, *args, **kwargs)
return receiver.get().unpickle(throw_dead=False)
def shutdown(self, wait=False):
LOG.debug('%r.shutdown() sending SHUTDOWN', self)
latch = mitogen.core.Latch()
mitogen.core.listen(self, 'disconnect', lambda: latch.put(None))
self.send(
mitogen.core.Message(
handle=mitogen.core.SHUTDOWN,
)
)
if wait:
latch.get()
class RouteMonitor(object):
def __init__(self, router, parent=None):

@ -1,3 +1,4 @@
import os
import subprocess
import time
@ -7,6 +8,14 @@ import mitogen.parent
import testlib
class ContextTest(testlib.RouterMixin, unittest2.TestCase):
def test_context_shutdown(self):
local = self.router.local()
pid = local.call(os.getpid)
local.shutdown(wait=True)
self.assertRaises(OSError, lambda: os.kill(pid, 0))
class IterReadTest(unittest2.TestCase):
func = staticmethod(mitogen.parent.iter_read)

Loading…
Cancel
Save