diff --git a/docs/api.rst b/docs/api.rst index 56bb5c1f..4ad1db5e 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -833,12 +833,21 @@ Context Class .. currentmodule:: mitogen.parent +.. autoclass:: CallChain + :members: + .. class:: Context Extend :class:`mitogen.core.Router` with functionality useful to masters, and child contexts who later become parents. Currently when this class is required, the target context's router is upgraded at runtime. + .. attribute:: default_call_chain + + A :class:`CallChain` instance constructed by default, with pipelining + disabled. :meth:`call`, :meth:`call_async` and :meth:`call_no_reply` + use this instance. + .. method:: shutdown (wait=False) Arrange for the context to receive a ``SHUTDOWN`` message, triggering @@ -858,130 +867,15 @@ Context Class .. method:: call_async (fn, \*args, \*\*kwargs) - Arrange for the context's ``CALL_FUNCTION`` handle to receive a - message that causes `fn(\*args, \**kwargs)` to be invoked on the - context's main thread. - - :param fn: - A free function in module scope or a class method of a class - directly reachable from module scope: - - .. code-block:: python - - # mymodule.py - - def my_func(): - """A free function reachable as mymodule.my_func""" - - class MyClass: - @classmethod - def my_classmethod(cls): - """Reachable as mymodule.MyClass.my_classmethod""" - - def my_instancemethod(self): - """Unreachable: requires a class instance!""" - - class MyEmbeddedClass: - @classmethod - def my_classmethod(cls): - """Not directly reachable from module scope!""" - - :param tuple args: - Function arguments, if any. See :ref:`serialization-rules` for - permitted types. - :param dict kwargs: - Function keyword arguments, if any. See :ref:`serialization-rules` - for permitted types. - :param str mitogen_chain: - Optional cancellation key for threading unrelated asynchronous - requests to one context. If any prior call in the chain raised an - exception, subsequent calls with the same key immediately produce - the same exception. - - This permits a sequence of :meth:`no-reply ` or - pipelined asynchronous calls to be made without wasting network - round-trips to discover if prior calls succeeded, while allowing - such chains to overlap concurrently from multiple unrelated source - contexts. The chain is cancelled on first exception, enabling - patterns like:: - - # Must be distinct for each overlapping sequence, and cannot be - # reused. - chain = 'make-dirs-and-do-stuff-%s-%s-%s-%s' % ( - socket.gethostname(), - os.getpid(), - threading.currentThread().id, - time.time(), - ) - context.call_no_reply(os.mkdir, '/tmp/foo', - mitogen_chain=chain) - - # If os.mkdir() fails, this never runs: - context.call_no_reply(os.mkdir, '/tmp/foo/bar', - mitogen_chain=chain) - - # If either os.mkdir() fails, this never runs, and returns the - # exception. - recv = context.call_async(subprocess.check_output, '/tmp/foo', - mitogen_chain=chain) - - # If os.mkdir() or check_call() failed, this never runs, and - # the exception that occurred is raised. - context.call(do_something, mitogen_chain=chain) - - # The receiver also got a copy of the exception, so if this - # code was executed, the exception would also be raised. - if recv.get().unpickle() == 'baz': - pass - - It is necessary to explicitly clean up the chain history on a - target, otherwise unbounded memory usage is possible. See - :meth:`forget_chain`. - - :returns: - :class:`mitogen.core.Receiver` configured to receive the result - of the invocation: - - .. code-block:: python - - recv = context.call_async(os.check_output, 'ls /tmp/') - try: - # Prints output once it is received. - msg = recv.get() - print(msg.unpickle()) - except mitogen.core.CallError, e: - print('Call failed:', str(e)) - - Asynchronous calls may be dispatched in parallel to multiple - contexts and consumed as they complete using - :class:`mitogen.select.Select`. + See :meth:`CallChain.call_async`. .. method:: call (fn, \*args, \*\*kwargs) - Equivalent to :meth:`call_async(fn, \*args, \**kwargs).get().unpickle() - `. - - :returns: - The function's return value. - - :raises mitogen.core.CallError: - An exception was raised in the remote context during execution. + See :meth:`CallChain.call`. .. method:: call_no_reply (fn, \*args, \*\*kwargs) - Like :meth:`call_async`, but do not wait for a return value, and inform - the target context no such reply is expected. If the call fails, the - full exception will be logged to the target context's logging - framework, unless the `mitogen_chain` argument was present. - - :raises mitogen.core.CallError: - An exception was raised in the remote context during execution. - - .. method:: forget_chain (chain_id) - - Instruct the target to forget any exception related to `chain_id`, a - key previously used as the `mitogen_chain` parameter to - :meth:`call_async`. + See :meth:`CallChain.call_no_reply`. Receiver Class diff --git a/docs/howitworks.rst b/docs/howitworks.rst index 1d45647f..1e3d2768 100644 --- a/docs/howitworks.rst +++ b/docs/howitworks.rst @@ -373,11 +373,9 @@ Children listen on the following handles: .. currentmodule:: mitogen.core .. data:: CALL_FUNCTION - Receives `(mod_name, class_name, func_name, args, kwargs)` - 5-tuples from - :py:meth:`call_async() `, - imports ``mod_name``, then attempts to execute - `class_name.func_name(\*args, \**kwargs)`. + Receives `(chain_id, mod_name, class_name, func_name, args, kwargs)` + 6-tuples from :class:`mitogen.parent.CallChain`, imports ``mod_name``, then + attempts to execute `class_name.func_name(\*args, \**kwargs)`. When this channel is closed (by way of receiving a dead message), the child's main thread begins graceful shutdown of its own :py:class:`Broker` diff --git a/mitogen/core.py b/mitogen/core.py index 0142be7c..07cfcf37 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1968,7 +1968,7 @@ class Dispatcher(object): data = msg.unpickle(throw=False) _v and LOG.debug('_dispatch_one(%r)', data) - modname, klass, func, args, kwargs = data + chain_id, modname, klass, func, args, kwargs = data obj = import_module(modname) if klass: obj = getattr(obj, klass) @@ -1978,15 +1978,14 @@ class Dispatcher(object): if getattr(fn, 'mitogen_takes_router', None): kwargs.setdefault('router', self.econtext.router) - return fn, args, kwargs + return chain_id, fn, args, kwargs def _dispatch_one(self, msg): try: - fn, args, kwargs = self._parse_request(msg) + chain_id, fn, args, kwargs = self._parse_request(msg) except Exception: return None, CallError(sys.exc_info()[1]) - chain_id = kwargs.pop('mitogen_chain', None) if chain_id in self._error_by_chain_id: return chain_id, self._error_by_chain_id[chain_id] diff --git a/mitogen/parent.py b/mitogen/parent.py index 29decbc1..556afb22 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -523,22 +523,6 @@ def upgrade_router(econtext): ) -def make_call_msg(fn, *args, **kwargs): - if inspect.ismethod(fn) and inspect.isclass(fn.__self__): - klass = mitogen.core.to_text(fn.__self__.__name__) - else: - klass = None - - tup = ( - mitogen.core.to_text(fn.__module__), - klass, - mitogen.core.to_text(fn.__name__), - args, - mitogen.core.Kwargs(kwargs) - ) - return mitogen.core.Message.pickled(tup, handle=mitogen.core.CALL_FUNCTION) - - def stream_by_method_name(name): """ Given the name of a Mitogen connection method, import its implementation @@ -1137,9 +1121,216 @@ class ChildIdAllocator(object): return self.allocate() +class CallChain(object): + """ + Construct :data:`mitogen.core.CALL_FUNCTION` messages and deliver them to a + target context, optionally threading related calls such that an exception + in an earlier call cancels subsequent calls. + + :param mitogen.core.Context context: + Target context. + :param bool pipelined: + Enable pipelined mode. + + By default, :meth:`call`, :meth:`call_no_reply` and :meth:`call_async` + issue calls and produce responses, with no memory of prior exceptions. If a + call made with :meth:`call_no_reply` fails, the traceback is logged to the + target context's logging framework. + + **Pipelined Mode** + + When `pipelined=True`, if an exception occurs in a call, + all subsequent calls made by the same :class:`CallChain` instance will fail + with the same exception, including those already in-flight on the network, + and no further calls will execute until :meth:`reset` is invoked. + + No traceback is logged for calls made with :meth:`call_no_reply`, instead + the exception is saved and reported as the result of subsequent + :meth:`call` or :meth:`call_async` calls. + + A sequence of pipelined asynchronous calls can be made without wasting + network round-trips to discover if prior calls succeeded, while allowing + such chains to overlap concurrently at a target context from multiple + unrelated source contexts. This enables many calls to safely progress in + one network round-trip, like:: + + chain = mitogen.parent.CallChain(context, pipelined=True) + chain.call_no_reply(os.mkdir, '/tmp/foo') + + # If previous mkdir() failed, this never runs: + chain.call_no_reply(os.mkdir, '/tmp/foo/bar') + + # If either mkdir() failed, this never runs, and returns the exception. + recv = chain.call_async(subprocess.check_output, '/tmp/foo') + + # If mkdir() or check_call() failed, this never runs, and returns the + # exception. + chain.call(do_something) + + # The receiver also got a copy of the exception, so if this + # code was executed, the exception would also be raised. + if recv.get().unpickle() == 'baz': + pass + + When pipelining is enabled, :meth:`reset` must be called to ensure the last + exception is discarded, otherwise unbounded memory usage is possible in + long-running programs. :class:`CallChain` supports the context manager + protocol to ensure :meth:`reset` is always invoked:: + + with mitogen.parent.CallChain(context, pipelined=True) as chain: + chain.call_no_reply(...) + chain.call_no_reply(...) + chain.call_no_reply(...) + chain.call(...) + + # chain.reset() automatically invoked. + """ + def __init__(self, context, pipelined=False): + self.context = context + if pipelined: + self.chain_id = self.make_chain_id() + else: + self.chain_id = None + + @classmethod + def make_chain_id(cls): + return '%s-%s-%s-%s' % ( + socket.gethostname(), + os.getpid(), + threading.currentThread().ident, + int(1e6 * time.time()), + ) + + def __enter__(self): + return self + + def __exit__(self, _1, _2, _3): + self.reset() + + def reset(self): + """ + Instruct the target to forget any related exception. + """ + if not self.chain_id: + return + + saved, self.chain_id = self.chain_id, None + try: + self.call_no_reply(mitogen.core.Dispatcher.forget_chain, saved) + finally: + self.chain_id = saved + + def make_msg(self, fn, *args, **kwargs): + if inspect.ismethod(fn) and inspect.isclass(fn.__self__): + klass = mitogen.core.to_text(fn.__self__.__name__) + else: + klass = None + + tup = ( + self.chain_id, + mitogen.core.to_text(fn.__module__), + klass, + mitogen.core.to_text(fn.__name__), + args, + mitogen.core.Kwargs(kwargs) + ) + return mitogen.core.Message.pickled(tup, + handle=mitogen.core.CALL_FUNCTION) + + def call_no_reply(self, fn, *args, **kwargs): + """ + Like :meth:`call_async`, but do not wait for a return value, and inform + the target context no such reply is expected. If the call fails and + pipelining is disabled, the full exception will be logged to the target + context's logging framework. + + :raises mitogen.core.CallError: + An exception was raised in the remote context during execution. + """ + LOG.debug('%r.call_no_reply(%r, *%r, **%r)', + self, fn, args, kwargs) + self.context.send(self.make_msg(fn, *args, **kwargs)) + + def call_async(self, fn, *args, **kwargs): + """ + Arrange for the context's ``CALL_FUNCTION`` handle to receive a message + that causes `fn(\*args, \**kwargs)` to be invoked on the context's main + thread. + + :param fn: + A free function in module scope or a class method of a class + directly reachable from module scope: + + .. code-block:: python + + # mymodule.py + + def my_func(): + '''A free function reachable as mymodule.my_func''' + + class MyClass: + @classmethod + def my_classmethod(cls): + '''Reachable as mymodule.MyClass.my_classmethod''' + + def my_instancemethod(self): + '''Unreachable: requires a class instance!''' + + class MyEmbeddedClass: + @classmethod + def my_classmethod(cls): + '''Not directly reachable from module scope!''' + + :param tuple args: + Function arguments, if any. See :ref:`serialization-rules` for + permitted types. + :param dict kwargs: + Function keyword arguments, if any. See :ref:`serialization-rules` + for permitted types. + :returns: + :class:`mitogen.core.Receiver` configured to receive the result of + the invocation: + + .. code-block:: python + + recv = context.call_async(os.check_output, 'ls /tmp/') + try: + # Prints output once it is received. + msg = recv.get() + print(msg.unpickle()) + except mitogen.core.CallError, e: + print('Call failed:', str(e)) + + Asynchronous calls may be dispatched in parallel to multiple + contexts and consumed as they complete using + :class:`mitogen.select.Select`. + """ + LOG.debug('%r.call_async(): %r', self, CallSpec(fn, args, kwargs)) + return self.context.send_async(self.make_msg(fn, *args, **kwargs)) + + def call(self, fn, *args, **kwargs): + """ + Equivalent to :meth:`call_async(fn, \*args, \**kwargs).get().unpickle() + `. + + :returns: + The function's return value. + + :raises mitogen.core.CallError: + An exception was raised in the remote context during execution. + """ + receiver = self.call_async(fn, *args, **kwargs) + return receiver.get().unpickle(throw_dead=False) + + class Context(mitogen.core.Context): + call_chain_class = CallChain via = None + def __init__(self, *args, **kwargs): + super(Context, self).__init__(*args, **kwargs) + self.default_call_chain = self.call_chain_class(self) + def __eq__(self, other): return (isinstance(other, mitogen.core.Context) and (other.context_id == self.context_id) and @@ -1149,20 +1340,13 @@ class Context(mitogen.core.Context): return hash((self.router, self.context_id)) def call_async(self, fn, *args, **kwargs): - LOG.debug('%r.call_async(): %r', self, CallSpec(fn, args, kwargs)) - return self.send_async(make_call_msg(fn, *args, **kwargs)) + return self.default_call_chain.call_async(fn, *args, **kwargs) def call(self, fn, *args, **kwargs): - receiver = self.call_async(fn, *args, **kwargs) - return receiver.get().unpickle(throw_dead=False) + return self.default_call_chain.call(fn, *args, **kwargs) def call_no_reply(self, fn, *args, **kwargs): - LOG.debug('%r.call_no_reply(%r, *%r, **%r)', - self, fn, args, kwargs) - self.send(make_call_msg(fn, *args, **kwargs)) - - def forget_chain(self, chain_id): - self.call_no_reply(mitogen.core.Dispatcher.forget_chain, chain_id) + self.default_call_chain.call_no_reply(fn, *args, **kwargs) def shutdown(self, wait=False): LOG.debug('%r.shutdown() sending SHUTDOWN', self) diff --git a/tests/call_function_test.py b/tests/call_function_test.py index ca56f07a..dc9a2298 100644 --- a/tests/call_function_test.py +++ b/tests/call_function_test.py @@ -4,6 +4,7 @@ import time import unittest2 import mitogen.core +import mitogen.parent import mitogen.master import testlib @@ -120,36 +121,37 @@ class CallFunctionTest(testlib.RouterMixin, testlib.TestCase): class ChainTest(testlib.RouterMixin, testlib.TestCase): # Verify mitogen_chain functionality. + klass = mitogen.parent.CallChain def setUp(self): super(ChainTest, self).setUp() self.local = self.router.fork() def test_subsequent_calls_produce_same_error(self): - self.assertEquals('xx', - self.local.call(func_returns_arg, 'xx', mitogen_chain='c1')) - self.local.call_no_reply(function_that_fails, 'x1', mitogen_chain='c1') + chain = self.klass(self.local, pipelined=True) + self.assertEquals('xx', chain.call(func_returns_arg, 'xx')) + chain.call_no_reply(function_that_fails, 'x1') e1 = self.assertRaises(mitogen.core.CallError, - lambda: self.local.call(function_that_fails, 'x2', mitogen_chain='c1')) + lambda: chain.call(function_that_fails, 'x2')) e2 = self.assertRaises(mitogen.core.CallError, - lambda: self.local.call(func_returns_arg, 'x3', mitogen_chain='c1')) + lambda: chain.call(func_returns_arg, 'x3')) self.assertEquals(str(e1), str(e2)) def test_unrelated_overlapping_failed_chains(self): - self.local.call_no_reply(function_that_fails, 'c1', mitogen_chain='c1') - self.assertEquals('yes', - self.local.call(func_returns_arg, 'yes', mitogen_chain='c2')) + c1 = self.klass(self.local, pipelined=True) + c2 = self.klass(self.local, pipelined=True) + c1.call_no_reply(function_that_fails, 'c1') + self.assertEquals('yes', c2.call(func_returns_arg, 'yes')) self.assertRaises(mitogen.core.CallError, - lambda: self.local.call(func_returns_arg, 'yes', mitogen_chain='c1')) - self.local.call_no_reply(function_that_fails, 'c2', mitogen_chain='c2') + lambda: c1.call(func_returns_arg, 'yes')) - def test_forget(self): - self.local.call_no_reply(function_that_fails, 'x1', mitogen_chain='c1') + def test_reset(self): + c1 = self.klass(self.local, pipelined=True) + c1.call_no_reply(function_that_fails, 'x1') e1 = self.assertRaises(mitogen.core.CallError, - lambda: self.local.call(function_that_fails, 'x2', mitogen_chain='c1')) - self.local.forget_chain('c1') - self.assertEquals('x3', - self.local.call(func_returns_arg, 'x3', mitogen_chain='c1')) + lambda: c1.call(function_that_fails, 'x2')) + c1.reset() + self.assertEquals('x3', c1.call(func_returns_arg, 'x3')) if __name__ == '__main__':