core: support mitogen_chain dispatcher option.

pull/372/head
David Wilson 6 years ago
parent 92c092d27b
commit 42b1b3d286

@ -892,6 +892,52 @@ Context Class
:param dict kwargs: :param dict kwargs:
Function keyword arguments, if any. See :ref:`serialization-rules` Function keyword arguments, if any. See :ref:`serialization-rules`
for permitted types. for permitted types.
:param str mitogen_chain:
Optional cancellation key for threading unrelated asynchronous
requests to one context. If any prior call in the chain raised an
exception, subsequent calls with the same key immediately produce
the same exception.
This permits a sequence of :meth:`no-reply <call_no_reply>` or
pipelined asynchronous calls to be made without wasting network
round-trips to discover if prior calls succeeded, while allowing
such chains to overlap concurrently from multiple unrelated source
contexts. The chain is cancelled on first exception, enabling
patterns like::
# Must be distinct for each overlapping sequence, and cannot be
# reused.
chain = 'make-dirs-and-do-stuff-%s-%s-%s-%s' % (
socket.gethostname(),
os.getpid(),
threading.currentThread().id,
time.time(),
)
context.call_no_reply(os.mkdir, '/tmp/foo',
mitogen_chain=chain)
# If os.mkdir() fails, this never runs:
context.call_no_reply(os.mkdir, '/tmp/foo/bar',
mitogen_chain=chain)
# If either os.mkdir() fails, this never runs, and returns the
# exception.
recv = context.call_async(subprocess.check_output, '/tmp/foo',
mitogen_chain=chain)
# If os.mkdir() or check_call() failed, this never runs, and
# the exception that occurred is raised.
context.call(do_something, mitogen_chain=chain)
# The receiver also got a copy of the exception, so if this
# code was executed, the exception would also be raised.
if recv.get().unpickle() == 'baz':
pass
Note that for long-lived programs, there is presently no mechanism
for clearing the chain history on a target. This will be addressed
in future.
:returns: :returns:
:class:`mitogen.core.Receiver` configured to receive the result :class:`mitogen.core.Receiver` configured to receive the result
of the invocation: of the invocation:
@ -923,8 +969,10 @@ Context Class
.. method:: call_no_reply (fn, \*args, \*\*kwargs) .. method:: call_no_reply (fn, \*args, \*\*kwargs)
Send a function call, but expect no return value. If the call fails, Like :meth:`call_async`, but do not wait for a return value, and inform
the full exception will be logged to the target context's logging framework. the target context no such reply is expected. If the call fails, the
full exception will be logged to the target context's logging
framework, unless the `mitogen_chain` argument was present.
:raises mitogen.core.CallError: :raises mitogen.core.CallError:
An exception was raised in the remote context during execution. An exception was raised in the remote context during execution.

@ -1952,15 +1952,14 @@ class Broker(object):
class Dispatcher(object): class Dispatcher(object):
def __init__(self, econtext): def __init__(self, econtext):
self.econtext = econtext self.econtext = econtext
#: Chain ID -> CallError if prior call failed.
self._error_by_chain_id = {}
self.recv = Receiver(router=econtext.router, self.recv = Receiver(router=econtext.router,
handle=CALL_FUNCTION, handle=CALL_FUNCTION,
policy=has_parent_authority) policy=has_parent_authority)
listen(econtext.broker, 'shutdown', self._on_broker_shutdown) listen(econtext.broker, 'shutdown', self.recv.close)
def _on_broker_shutdown(self): def _parse_request(self, msg):
self.recv.close()
def _dispatch_one(self, msg):
data = msg.unpickle(throw=False) data = msg.unpickle(throw=False)
_v and LOG.debug('_dispatch_one(%r)', data) _v and LOG.debug('_dispatch_one(%r)', data)
@ -1973,22 +1972,35 @@ class Dispatcher(object):
kwargs.setdefault('econtext', self.econtext) kwargs.setdefault('econtext', self.econtext)
if getattr(fn, 'mitogen_takes_router', None): if getattr(fn, 'mitogen_takes_router', None):
kwargs.setdefault('router', self.econtext.router) kwargs.setdefault('router', self.econtext.router)
return fn(*args, **kwargs)
return fn, args, kwargs
def _dispatch_one(self, msg):
try:
fn, args, kwargs = self._parse_request(msg)
except Exception:
return None, CallError(sys.exc_info()[1])
chain_id = kwargs.pop('mitogen_chain', None)
if chain_id in self._error_by_chain_id:
return chain_id, self._error_by_chain_id[chain_id]
try:
return chain_id, fn(*args, **kwargs)
except Exception:
e = CallError(sys.exc_info()[1])
if chain_id is not None:
self._error_by_chain_id[chain_id] = e
return chain_id, e
def _dispatch_calls(self): def _dispatch_calls(self):
for msg in self.recv: for msg in self.recv:
try: chain_id, ret = self._dispatch_one(msg)
ret = self._dispatch_one(msg) _v and LOG.debug('_dispatch_calls: %r -> %r', msg, ret)
_v and LOG.debug('_dispatch_calls: %r -> %r', msg, ret) if msg.reply_to:
if msg.reply_to: msg.reply(ret)
msg.reply(ret) elif isinstance(ret, CallError) and chain_id is None:
except Exception: LOG.error('No-reply function call failed: %s', ret)
e = sys.exc_info()[1]
if msg.reply_to:
_v and LOG.debug('_dispatch_calls: %s', e)
msg.reply(CallError(e))
else:
LOG.exception('_dispatch_calls: %r', msg)
def run(self): def run(self):
if self.econtext.config.get('on_start'): if self.econtext.config.get('on_start'):

@ -18,15 +18,15 @@ def function_that_adds_numbers(x, y):
return x + y return x + y
def function_that_fails(): def function_that_fails(s=''):
raise plain_old_module.MyError('exception text') raise plain_old_module.MyError('exception text'+s)
def func_with_bad_return_value(): def func_with_bad_return_value():
return CrazyType() return CrazyType()
def func_accepts_returns_context(context): def func_returns_arg(context):
return context return context
@ -101,7 +101,7 @@ class CallFunctionTest(testlib.RouterMixin, testlib.TestCase):
self.assertEquals(exc.args[0], mitogen.core.ChannelError.local_msg) self.assertEquals(exc.args[0], mitogen.core.ChannelError.local_msg)
def test_accepts_returns_context(self): def test_accepts_returns_context(self):
context = self.local.call(func_accepts_returns_context, self.local) context = self.local.call(func_returns_arg, self.local)
self.assertIsNot(context, self.local) self.assertIsNot(context, self.local)
self.assertEqual(context.context_id, self.local.context_id) self.assertEqual(context.context_id, self.local.context_id)
self.assertEqual(context.name, self.local.name) self.assertEqual(context.name, self.local.name)
@ -118,5 +118,31 @@ class CallFunctionTest(testlib.RouterMixin, testlib.TestCase):
lambda: recv.get().unpickle()) lambda: recv.get().unpickle())
class ChainTest(testlib.RouterMixin, testlib.TestCase):
# Verify mitogen_chain functionality.
def setUp(self):
super(ChainTest, self).setUp()
self.local = self.router.fork()
def test_subsequent_calls_produce_same_error(self):
self.assertEquals('xx',
self.local.call(func_returns_arg, 'xx', mitogen_chain='c1'))
self.local.call_no_reply(function_that_fails, 'x1', mitogen_chain='c1')
e1 = self.assertRaises(mitogen.core.CallError,
lambda: self.local.call(function_that_fails, 'x2', mitogen_chain='c1'))
e2 = self.assertRaises(mitogen.core.CallError,
lambda: self.local.call(func_returns_arg, 'x3', mitogen_chain='c1'))
self.assertEquals(str(e1), str(e2))
def test_unrelated_overlapping_failed_chains(self):
self.local.call_no_reply(function_that_fails, 'c1', mitogen_chain='c1')
self.assertEquals('yes',
self.local.call(func_returns_arg, 'yes', mitogen_chain='c2'))
self.assertRaises(mitogen.core.CallError,
lambda: self.local.call(func_returns_arg, 'yes', mitogen_chain='c1'))
self.local.call_no_reply(function_that_fails, 'c2', mitogen_chain='c2')
if __name__ == '__main__': if __name__ == '__main__':
unittest2.main() unittest2.main()

Loading…
Cancel
Save