fork: support on_start= argument.

pull/255/head
David Wilson 8 years ago
parent 00edf0d66d
commit b0ce6eecd7

@ -576,7 +576,7 @@ Router Class
**Context Factories** **Context Factories**
.. method:: fork (new_stack=False, on_fork=None, debug=False, profiling=False, via=None) .. method:: fork (on_fork=None, on_start=None, debug=False, profiling=False, via=None)
Construct a context on the local machine by forking the current Construct a context on the local machine by forking the current
process. The forked child receives a new identity, sets up a new broker process. The forked child receives a new identity, sets up a new broker
@ -650,18 +650,19 @@ Router Class
The associated stream implementation is The associated stream implementation is
:py:class:`mitogen.fork.Stream`. :py:class:`mitogen.fork.Stream`.
:param bool new_stack:
If :py:data:`True`, arrange for the local thread stack to be
discarded, by forking from a new thread. Aside from clean
tracebacks, this has the effect of causing objects referenced by
the stack to cease existing in the child.
:param function on_fork: :param function on_fork:
Function invoked as `on_fork()` from within the child process. This Function invoked as `on_fork()` from within the child process. This
permits supplying a program-specific cleanup function to break permits supplying a program-specific cleanup function to break
locks and close file descriptors belonging to the parent from locks and close file descriptors belonging to the parent from
within the child. within the child.
:param function on_start:
Invoked as `on_start(econtext)` from within the child process after
it has been set up, but before the function dispatch loop starts.
This permits supplying a custom child main function that inherits
rich data structures that cannot normally be passed via a
serialization.
:param Context via: :param Context via:
Same as the `via` parameter for :py:meth:`local`. Same as the `via` parameter for :py:meth:`local`.

@ -1786,6 +1786,9 @@ class ExternalContext(object):
return fn(*args, **kwargs) return fn(*args, **kwargs)
def _dispatch_calls(self): def _dispatch_calls(self):
if self.config.get('on_start'):
self.config['on_start'](self)
for msg in self.recv: for msg in self.recv:
try: try:
msg.reply(self._dispatch_one(msg)) msg.reply(self._dispatch_one(msg))

@ -90,12 +90,14 @@ class Stream(mitogen.parent.Stream):
on_fork = None on_fork = None
def construct(self, old_router, max_message_size, on_fork=None, def construct(self, old_router, max_message_size, on_fork=None,
debug=False, profiling=False, unidirectional=False): debug=False, profiling=False, unidirectional=False,
on_start=None):
# fork method only supports a tiny subset of options. # fork method only supports a tiny subset of options.
super(Stream, self).construct(max_message_size=max_message_size, super(Stream, self).construct(max_message_size=max_message_size,
debug=debug, profiling=profiling, debug=debug, profiling=profiling,
unidirectional=False) unidirectional=False)
self.on_fork = on_fork self.on_fork = on_fork
self.on_start = on_start
responder = getattr(old_router, 'responder', None) responder = getattr(old_router, 'responder', None)
if isinstance(responder, mitogen.parent.ModuleForwarder): if isinstance(responder, mitogen.parent.ModuleForwarder):
@ -152,6 +154,8 @@ class Stream(mitogen.parent.Stream):
config['core_src_fd'] = None config['core_src_fd'] = None
config['importer'] = self.importer config['importer'] = self.importer
config['setup_package'] = False config['setup_package'] = False
if self.on_start:
config['on_start'] = self.on_start
try: try:
mitogen.core.ExternalContext(config).main() mitogen.core.ExternalContext(config).main()
finally: finally:

@ -70,6 +70,13 @@ class ForkTest(testlib.RouterMixin, unittest2.TestCase):
context = self.router.fork() context = self.router.fork()
self.assertEqual(2, context.call(exercise_importer, 1)) self.assertEqual(2, context.call(exercise_importer, 1))
def test_on_start(self):
recv = mitogen.core.Receiver(self.router)
def on_start(econtext):
sender = mitogen.core.Sender(econtext.parent, recv.handle)
sender.send(123)
context = self.router.fork(on_start=on_start)
self.assertEquals(123, recv.get().unpickle())
class DoubleChildTest(testlib.RouterMixin, unittest2.TestCase): class DoubleChildTest(testlib.RouterMixin, unittest2.TestCase):
def test_okay(self): def test_okay(self):

Loading…
Cancel
Save