core: call chains v3: abstract it into a new CallChain class.

pull/372/head
David Wilson 6 years ago
parent b254eb3399
commit 4d3873c784

@ -833,12 +833,21 @@ Context Class
.. currentmodule:: mitogen.parent .. currentmodule:: mitogen.parent
.. autoclass:: CallChain
:members:
.. class:: Context .. class:: Context
Extend :class:`mitogen.core.Router` with functionality useful to Extend :class:`mitogen.core.Router` with functionality useful to
masters, and child contexts who later become parents. Currently when this masters, and child contexts who later become parents. Currently when this
class is required, the target context's router is upgraded at runtime. class is required, the target context's router is upgraded at runtime.
.. attribute:: default_call_chain
A :class:`CallChain` instance constructed by default, with pipelining
disabled. :meth:`call`, :meth:`call_async` and :meth:`call_no_reply`
use this instance.
.. method:: shutdown (wait=False) .. method:: shutdown (wait=False)
Arrange for the context to receive a ``SHUTDOWN`` message, triggering Arrange for the context to receive a ``SHUTDOWN`` message, triggering
@ -858,130 +867,15 @@ Context Class
.. method:: call_async (fn, \*args, \*\*kwargs) .. method:: call_async (fn, \*args, \*\*kwargs)
Arrange for the context's ``CALL_FUNCTION`` handle to receive a See :meth:`CallChain.call_async`.
message that causes `fn(\*args, \**kwargs)` to be invoked on the
context's main thread.
:param fn:
A free function in module scope or a class method of a class
directly reachable from module scope:
.. code-block:: python
# mymodule.py
def my_func():
"""A free function reachable as mymodule.my_func"""
class MyClass:
@classmethod
def my_classmethod(cls):
"""Reachable as mymodule.MyClass.my_classmethod"""
def my_instancemethod(self):
"""Unreachable: requires a class instance!"""
class MyEmbeddedClass:
@classmethod
def my_classmethod(cls):
"""Not directly reachable from module scope!"""
:param tuple args:
Function arguments, if any. See :ref:`serialization-rules` for
permitted types.
:param dict kwargs:
Function keyword arguments, if any. See :ref:`serialization-rules`
for permitted types.
:param str mitogen_chain:
Optional cancellation key for threading unrelated asynchronous
requests to one context. If any prior call in the chain raised an
exception, subsequent calls with the same key immediately produce
the same exception.
This permits a sequence of :meth:`no-reply <call_no_reply>` or
pipelined asynchronous calls to be made without wasting network
round-trips to discover if prior calls succeeded, while allowing
such chains to overlap concurrently from multiple unrelated source
contexts. The chain is cancelled on first exception, enabling
patterns like::
# Must be distinct for each overlapping sequence, and cannot be
# reused.
chain = 'make-dirs-and-do-stuff-%s-%s-%s-%s' % (
socket.gethostname(),
os.getpid(),
threading.currentThread().id,
time.time(),
)
context.call_no_reply(os.mkdir, '/tmp/foo',
mitogen_chain=chain)
# If os.mkdir() fails, this never runs:
context.call_no_reply(os.mkdir, '/tmp/foo/bar',
mitogen_chain=chain)
# If either os.mkdir() fails, this never runs, and returns the
# exception.
recv = context.call_async(subprocess.check_output, '/tmp/foo',
mitogen_chain=chain)
# If os.mkdir() or check_call() failed, this never runs, and
# the exception that occurred is raised.
context.call(do_something, mitogen_chain=chain)
# The receiver also got a copy of the exception, so if this
# code was executed, the exception would also be raised.
if recv.get().unpickle() == 'baz':
pass
It is necessary to explicitly clean up the chain history on a
target, otherwise unbounded memory usage is possible. See
:meth:`forget_chain`.
:returns:
:class:`mitogen.core.Receiver` configured to receive the result
of the invocation:
.. code-block:: python
recv = context.call_async(os.check_output, 'ls /tmp/')
try:
# Prints output once it is received.
msg = recv.get()
print(msg.unpickle())
except mitogen.core.CallError, e:
print('Call failed:', str(e))
Asynchronous calls may be dispatched in parallel to multiple
contexts and consumed as they complete using
:class:`mitogen.select.Select`.
.. method:: call (fn, \*args, \*\*kwargs) .. method:: call (fn, \*args, \*\*kwargs)
Equivalent to :meth:`call_async(fn, \*args, \**kwargs).get().unpickle() See :meth:`CallChain.call`.
<call_async>`.
:returns:
The function's return value.
:raises mitogen.core.CallError:
An exception was raised in the remote context during execution.
.. method:: call_no_reply (fn, \*args, \*\*kwargs) .. method:: call_no_reply (fn, \*args, \*\*kwargs)
Like :meth:`call_async`, but do not wait for a return value, and inform See :meth:`CallChain.call_no_reply`.
the target context no such reply is expected. If the call fails, the
full exception will be logged to the target context's logging
framework, unless the `mitogen_chain` argument was present.
:raises mitogen.core.CallError:
An exception was raised in the remote context during execution.
.. method:: forget_chain (chain_id)
Instruct the target to forget any exception related to `chain_id`, a
key previously used as the `mitogen_chain` parameter to
:meth:`call_async`.
Receiver Class Receiver Class

@ -373,11 +373,9 @@ Children listen on the following handles:
.. currentmodule:: mitogen.core .. currentmodule:: mitogen.core
.. data:: CALL_FUNCTION .. data:: CALL_FUNCTION
Receives `(mod_name, class_name, func_name, args, kwargs)` Receives `(chain_id, mod_name, class_name, func_name, args, kwargs)`
5-tuples from 6-tuples from :class:`mitogen.parent.CallChain`, imports ``mod_name``, then
:py:meth:`call_async() <mitogen.parent.Context.call_async>`, attempts to execute `class_name.func_name(\*args, \**kwargs)`.
imports ``mod_name``, then attempts to execute
`class_name.func_name(\*args, \**kwargs)`.
When this channel is closed (by way of receiving a dead message), the When this channel is closed (by way of receiving a dead message), the
child's main thread begins graceful shutdown of its own :py:class:`Broker` child's main thread begins graceful shutdown of its own :py:class:`Broker`

@ -1968,7 +1968,7 @@ class Dispatcher(object):
data = msg.unpickle(throw=False) data = msg.unpickle(throw=False)
_v and LOG.debug('_dispatch_one(%r)', data) _v and LOG.debug('_dispatch_one(%r)', data)
modname, klass, func, args, kwargs = data chain_id, modname, klass, func, args, kwargs = data
obj = import_module(modname) obj = import_module(modname)
if klass: if klass:
obj = getattr(obj, klass) obj = getattr(obj, klass)
@ -1978,15 +1978,14 @@ class Dispatcher(object):
if getattr(fn, 'mitogen_takes_router', None): if getattr(fn, 'mitogen_takes_router', None):
kwargs.setdefault('router', self.econtext.router) kwargs.setdefault('router', self.econtext.router)
return fn, args, kwargs return chain_id, fn, args, kwargs
def _dispatch_one(self, msg): def _dispatch_one(self, msg):
try: try:
fn, args, kwargs = self._parse_request(msg) chain_id, fn, args, kwargs = self._parse_request(msg)
except Exception: except Exception:
return None, CallError(sys.exc_info()[1]) return None, CallError(sys.exc_info()[1])
chain_id = kwargs.pop('mitogen_chain', None)
if chain_id in self._error_by_chain_id: if chain_id in self._error_by_chain_id:
return chain_id, self._error_by_chain_id[chain_id] return chain_id, self._error_by_chain_id[chain_id]

@ -523,22 +523,6 @@ def upgrade_router(econtext):
) )
def make_call_msg(fn, *args, **kwargs):
if inspect.ismethod(fn) and inspect.isclass(fn.__self__):
klass = mitogen.core.to_text(fn.__self__.__name__)
else:
klass = None
tup = (
mitogen.core.to_text(fn.__module__),
klass,
mitogen.core.to_text(fn.__name__),
args,
mitogen.core.Kwargs(kwargs)
)
return mitogen.core.Message.pickled(tup, handle=mitogen.core.CALL_FUNCTION)
def stream_by_method_name(name): def stream_by_method_name(name):
""" """
Given the name of a Mitogen connection method, import its implementation Given the name of a Mitogen connection method, import its implementation
@ -1137,9 +1121,216 @@ class ChildIdAllocator(object):
return self.allocate() return self.allocate()
class CallChain(object):
"""
Construct :data:`mitogen.core.CALL_FUNCTION` messages and deliver them to a
target context, optionally threading related calls such that an exception
in an earlier call cancels subsequent calls.
:param mitogen.core.Context context:
Target context.
:param bool pipelined:
Enable pipelined mode.
By default, :meth:`call`, :meth:`call_no_reply` and :meth:`call_async`
issue calls and produce responses, with no memory of prior exceptions. If a
call made with :meth:`call_no_reply` fails, the traceback is logged to the
target context's logging framework.
**Pipelined Mode**
When `pipelined=True`, if an exception occurs in a call,
all subsequent calls made by the same :class:`CallChain` instance will fail
with the same exception, including those already in-flight on the network,
and no further calls will execute until :meth:`reset` is invoked.
No traceback is logged for calls made with :meth:`call_no_reply`, instead
the exception is saved and reported as the result of subsequent
:meth:`call` or :meth:`call_async` calls.
A sequence of pipelined asynchronous calls can be made without wasting
network round-trips to discover if prior calls succeeded, while allowing
such chains to overlap concurrently at a target context from multiple
unrelated source contexts. This enables many calls to safely progress in
one network round-trip, like::
chain = mitogen.parent.CallChain(context, pipelined=True)
chain.call_no_reply(os.mkdir, '/tmp/foo')
# If previous mkdir() failed, this never runs:
chain.call_no_reply(os.mkdir, '/tmp/foo/bar')
# If either mkdir() failed, this never runs, and returns the exception.
recv = chain.call_async(subprocess.check_output, '/tmp/foo')
# If mkdir() or check_call() failed, this never runs, and returns the
# exception.
chain.call(do_something)
# The receiver also got a copy of the exception, so if this
# code was executed, the exception would also be raised.
if recv.get().unpickle() == 'baz':
pass
When pipelining is enabled, :meth:`reset` must be called to ensure the last
exception is discarded, otherwise unbounded memory usage is possible in
long-running programs. :class:`CallChain` supports the context manager
protocol to ensure :meth:`reset` is always invoked::
with mitogen.parent.CallChain(context, pipelined=True) as chain:
chain.call_no_reply(...)
chain.call_no_reply(...)
chain.call_no_reply(...)
chain.call(...)
# chain.reset() automatically invoked.
"""
def __init__(self, context, pipelined=False):
self.context = context
if pipelined:
self.chain_id = self.make_chain_id()
else:
self.chain_id = None
@classmethod
def make_chain_id(cls):
return '%s-%s-%s-%s' % (
socket.gethostname(),
os.getpid(),
threading.currentThread().ident,
int(1e6 * time.time()),
)
def __enter__(self):
return self
def __exit__(self, _1, _2, _3):
self.reset()
def reset(self):
"""
Instruct the target to forget any related exception.
"""
if not self.chain_id:
return
saved, self.chain_id = self.chain_id, None
try:
self.call_no_reply(mitogen.core.Dispatcher.forget_chain, saved)
finally:
self.chain_id = saved
def make_msg(self, fn, *args, **kwargs):
if inspect.ismethod(fn) and inspect.isclass(fn.__self__):
klass = mitogen.core.to_text(fn.__self__.__name__)
else:
klass = None
tup = (
self.chain_id,
mitogen.core.to_text(fn.__module__),
klass,
mitogen.core.to_text(fn.__name__),
args,
mitogen.core.Kwargs(kwargs)
)
return mitogen.core.Message.pickled(tup,
handle=mitogen.core.CALL_FUNCTION)
def call_no_reply(self, fn, *args, **kwargs):
"""
Like :meth:`call_async`, but do not wait for a return value, and inform
the target context no such reply is expected. If the call fails and
pipelining is disabled, the full exception will be logged to the target
context's logging framework.
:raises mitogen.core.CallError:
An exception was raised in the remote context during execution.
"""
LOG.debug('%r.call_no_reply(%r, *%r, **%r)',
self, fn, args, kwargs)
self.context.send(self.make_msg(fn, *args, **kwargs))
def call_async(self, fn, *args, **kwargs):
"""
Arrange for the context's ``CALL_FUNCTION`` handle to receive a message
that causes `fn(\*args, \**kwargs)` to be invoked on the context's main
thread.
:param fn:
A free function in module scope or a class method of a class
directly reachable from module scope:
.. code-block:: python
# mymodule.py
def my_func():
'''A free function reachable as mymodule.my_func'''
class MyClass:
@classmethod
def my_classmethod(cls):
'''Reachable as mymodule.MyClass.my_classmethod'''
def my_instancemethod(self):
'''Unreachable: requires a class instance!'''
class MyEmbeddedClass:
@classmethod
def my_classmethod(cls):
'''Not directly reachable from module scope!'''
:param tuple args:
Function arguments, if any. See :ref:`serialization-rules` for
permitted types.
:param dict kwargs:
Function keyword arguments, if any. See :ref:`serialization-rules`
for permitted types.
:returns:
:class:`mitogen.core.Receiver` configured to receive the result of
the invocation:
.. code-block:: python
recv = context.call_async(os.check_output, 'ls /tmp/')
try:
# Prints output once it is received.
msg = recv.get()
print(msg.unpickle())
except mitogen.core.CallError, e:
print('Call failed:', str(e))
Asynchronous calls may be dispatched in parallel to multiple
contexts and consumed as they complete using
:class:`mitogen.select.Select`.
"""
LOG.debug('%r.call_async(): %r', self, CallSpec(fn, args, kwargs))
return self.context.send_async(self.make_msg(fn, *args, **kwargs))
def call(self, fn, *args, **kwargs):
"""
Equivalent to :meth:`call_async(fn, \*args, \**kwargs).get().unpickle()
<call_async>`.
:returns:
The function's return value.
:raises mitogen.core.CallError:
An exception was raised in the remote context during execution.
"""
receiver = self.call_async(fn, *args, **kwargs)
return receiver.get().unpickle(throw_dead=False)
class Context(mitogen.core.Context): class Context(mitogen.core.Context):
call_chain_class = CallChain
via = None via = None
def __init__(self, *args, **kwargs):
super(Context, self).__init__(*args, **kwargs)
self.default_call_chain = self.call_chain_class(self)
def __eq__(self, other): def __eq__(self, other):
return (isinstance(other, mitogen.core.Context) and return (isinstance(other, mitogen.core.Context) and
(other.context_id == self.context_id) and (other.context_id == self.context_id) and
@ -1149,20 +1340,13 @@ class Context(mitogen.core.Context):
return hash((self.router, self.context_id)) return hash((self.router, self.context_id))
def call_async(self, fn, *args, **kwargs): def call_async(self, fn, *args, **kwargs):
LOG.debug('%r.call_async(): %r', self, CallSpec(fn, args, kwargs)) return self.default_call_chain.call_async(fn, *args, **kwargs)
return self.send_async(make_call_msg(fn, *args, **kwargs))
def call(self, fn, *args, **kwargs): def call(self, fn, *args, **kwargs):
receiver = self.call_async(fn, *args, **kwargs) return self.default_call_chain.call(fn, *args, **kwargs)
return receiver.get().unpickle(throw_dead=False)
def call_no_reply(self, fn, *args, **kwargs): def call_no_reply(self, fn, *args, **kwargs):
LOG.debug('%r.call_no_reply(%r, *%r, **%r)', self.default_call_chain.call_no_reply(fn, *args, **kwargs)
self, fn, args, kwargs)
self.send(make_call_msg(fn, *args, **kwargs))
def forget_chain(self, chain_id):
self.call_no_reply(mitogen.core.Dispatcher.forget_chain, chain_id)
def shutdown(self, wait=False): def shutdown(self, wait=False):
LOG.debug('%r.shutdown() sending SHUTDOWN', self) LOG.debug('%r.shutdown() sending SHUTDOWN', self)

@ -4,6 +4,7 @@ import time
import unittest2 import unittest2
import mitogen.core import mitogen.core
import mitogen.parent
import mitogen.master import mitogen.master
import testlib import testlib
@ -120,36 +121,37 @@ class CallFunctionTest(testlib.RouterMixin, testlib.TestCase):
class ChainTest(testlib.RouterMixin, testlib.TestCase): class ChainTest(testlib.RouterMixin, testlib.TestCase):
# Verify mitogen_chain functionality. # Verify mitogen_chain functionality.
klass = mitogen.parent.CallChain
def setUp(self): def setUp(self):
super(ChainTest, self).setUp() super(ChainTest, self).setUp()
self.local = self.router.fork() self.local = self.router.fork()
def test_subsequent_calls_produce_same_error(self): def test_subsequent_calls_produce_same_error(self):
self.assertEquals('xx', chain = self.klass(self.local, pipelined=True)
self.local.call(func_returns_arg, 'xx', mitogen_chain='c1')) self.assertEquals('xx', chain.call(func_returns_arg, 'xx'))
self.local.call_no_reply(function_that_fails, 'x1', mitogen_chain='c1') chain.call_no_reply(function_that_fails, 'x1')
e1 = self.assertRaises(mitogen.core.CallError, e1 = self.assertRaises(mitogen.core.CallError,
lambda: self.local.call(function_that_fails, 'x2', mitogen_chain='c1')) lambda: chain.call(function_that_fails, 'x2'))
e2 = self.assertRaises(mitogen.core.CallError, e2 = self.assertRaises(mitogen.core.CallError,
lambda: self.local.call(func_returns_arg, 'x3', mitogen_chain='c1')) lambda: chain.call(func_returns_arg, 'x3'))
self.assertEquals(str(e1), str(e2)) self.assertEquals(str(e1), str(e2))
def test_unrelated_overlapping_failed_chains(self): def test_unrelated_overlapping_failed_chains(self):
self.local.call_no_reply(function_that_fails, 'c1', mitogen_chain='c1') c1 = self.klass(self.local, pipelined=True)
self.assertEquals('yes', c2 = self.klass(self.local, pipelined=True)
self.local.call(func_returns_arg, 'yes', mitogen_chain='c2')) c1.call_no_reply(function_that_fails, 'c1')
self.assertEquals('yes', c2.call(func_returns_arg, 'yes'))
self.assertRaises(mitogen.core.CallError, self.assertRaises(mitogen.core.CallError,
lambda: self.local.call(func_returns_arg, 'yes', mitogen_chain='c1')) lambda: c1.call(func_returns_arg, 'yes'))
self.local.call_no_reply(function_that_fails, 'c2', mitogen_chain='c2')
def test_forget(self): def test_reset(self):
self.local.call_no_reply(function_that_fails, 'x1', mitogen_chain='c1') c1 = self.klass(self.local, pipelined=True)
c1.call_no_reply(function_that_fails, 'x1')
e1 = self.assertRaises(mitogen.core.CallError, e1 = self.assertRaises(mitogen.core.CallError,
lambda: self.local.call(function_that_fails, 'x2', mitogen_chain='c1')) lambda: c1.call(function_that_fails, 'x2'))
self.local.forget_chain('c1') c1.reset()
self.assertEquals('x3', self.assertEquals('x3', c1.call(func_returns_arg, 'x3'))
self.local.call(func_returns_arg, 'x3', mitogen_chain='c1'))
if __name__ == '__main__': if __name__ == '__main__':

Loading…
Cancel
Save