|
|
@ -1123,36 +1123,36 @@ class ChildIdAllocator(object):
|
|
|
|
|
|
|
|
|
|
|
|
class CallChain(object):
|
|
|
|
class CallChain(object):
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
Construct :data:`mitogen.core.CALL_FUNCTION` messages and deliver them to a
|
|
|
|
Deliver :data:`mitogen.core.CALL_FUNCTION` messages to a target context,
|
|
|
|
target context, optionally threading related calls such that an exception
|
|
|
|
optionally threading related calls so an exception in an earlier call
|
|
|
|
in an earlier call cancels subsequent calls.
|
|
|
|
cancels subsequent calls.
|
|
|
|
|
|
|
|
|
|
|
|
:param mitogen.core.Context context:
|
|
|
|
:param mitogen.core.Context context:
|
|
|
|
Target context.
|
|
|
|
Target context.
|
|
|
|
:param bool pipelined:
|
|
|
|
:param bool pipelined:
|
|
|
|
Enable pipelined mode.
|
|
|
|
Enable pipelining.
|
|
|
|
|
|
|
|
|
|
|
|
By default, :meth:`call`, :meth:`call_no_reply` and :meth:`call_async`
|
|
|
|
:meth:`call`, :meth:`call_no_reply` and :meth:`call_async`
|
|
|
|
issue calls and produce responses, with no memory of prior exceptions. If a
|
|
|
|
normally issue calls and produce responses with no memory of prior
|
|
|
|
call made with :meth:`call_no_reply` fails, the traceback is logged to the
|
|
|
|
exceptions. If a call made with :meth:`call_no_reply` fails, the exception
|
|
|
|
target context's logging framework.
|
|
|
|
is logged to the target context's logging framework.
|
|
|
|
|
|
|
|
|
|
|
|
**Pipelined Mode**
|
|
|
|
**Pipelining**
|
|
|
|
|
|
|
|
|
|
|
|
When `pipelined=True`, if an exception occurs in a call,
|
|
|
|
When pipelining is enabled, if an exception occurs during a call,
|
|
|
|
all subsequent calls made by the same :class:`CallChain` instance will fail
|
|
|
|
subsequent calls made by the same :class:`CallChain` fail with the same
|
|
|
|
with the same exception, including those already in-flight on the network,
|
|
|
|
exception, including those already in-flight on the network, and no further
|
|
|
|
and no further calls will execute until :meth:`reset` is invoked.
|
|
|
|
calls execute until :meth:`reset` is invoked.
|
|
|
|
|
|
|
|
|
|
|
|
No traceback is logged for calls made with :meth:`call_no_reply`, instead
|
|
|
|
No exception is logged for calls made with :meth:`call_no_reply`, instead
|
|
|
|
the exception is saved and reported as the result of subsequent
|
|
|
|
it is saved and reported as the result of subsequent :meth:`call` or
|
|
|
|
:meth:`call` or :meth:`call_async` calls.
|
|
|
|
:meth:`call_async` calls.
|
|
|
|
|
|
|
|
|
|
|
|
A sequence of pipelined asynchronous calls can be made without wasting
|
|
|
|
Sequences of asynchronous calls can be made without wasting network
|
|
|
|
network round-trips to discover if prior calls succeeded, while allowing
|
|
|
|
round-trips to discover if prior calls succeed, and chains originating from
|
|
|
|
such chains to overlap concurrently at a target context from multiple
|
|
|
|
multiple unrelated source contexts may overlap concurrently at a target
|
|
|
|
unrelated source contexts. This enables many calls to safely progress in
|
|
|
|
context without interference. In this example, 4 calls complete in one
|
|
|
|
one network round-trip, like::
|
|
|
|
round-trip::
|
|
|
|
|
|
|
|
|
|
|
|
chain = mitogen.parent.CallChain(context, pipelined=True)
|
|
|
|
chain = mitogen.parent.CallChain(context, pipelined=True)
|
|
|
|
chain.call_no_reply(os.mkdir, '/tmp/foo')
|
|
|
|
chain.call_no_reply(os.mkdir, '/tmp/foo')
|
|
|
@ -1160,22 +1160,21 @@ class CallChain(object):
|
|
|
|
# If previous mkdir() failed, this never runs:
|
|
|
|
# If previous mkdir() failed, this never runs:
|
|
|
|
chain.call_no_reply(os.mkdir, '/tmp/foo/bar')
|
|
|
|
chain.call_no_reply(os.mkdir, '/tmp/foo/bar')
|
|
|
|
|
|
|
|
|
|
|
|
# If either mkdir() failed, this never runs, and returns the exception.
|
|
|
|
# If either mkdir() failed, this never runs, and the exception is
|
|
|
|
|
|
|
|
# asynchronously delivered to the receiver.
|
|
|
|
recv = chain.call_async(subprocess.check_output, '/tmp/foo')
|
|
|
|
recv = chain.call_async(subprocess.check_output, '/tmp/foo')
|
|
|
|
|
|
|
|
|
|
|
|
# If mkdir() or check_call() failed, this never runs, and returns the
|
|
|
|
# If anything so far failed, this never runs, and raises the exception.
|
|
|
|
# exception.
|
|
|
|
|
|
|
|
chain.call(do_something)
|
|
|
|
chain.call(do_something)
|
|
|
|
|
|
|
|
|
|
|
|
# The receiver also got a copy of the exception, so if this
|
|
|
|
# If this code was executed, the exception would also be raised.
|
|
|
|
# code was executed, the exception would also be raised.
|
|
|
|
|
|
|
|
if recv.get().unpickle() == 'baz':
|
|
|
|
if recv.get().unpickle() == 'baz':
|
|
|
|
pass
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
When pipelining is enabled, :meth:`reset` must be called to ensure the last
|
|
|
|
When pipelining is enabled, :meth:`reset` must be invoked to ensure any
|
|
|
|
exception is discarded, otherwise unbounded memory usage is possible in
|
|
|
|
exception is discarded, otherwise unbounded memory usage is possible in
|
|
|
|
long-running programs. :class:`CallChain` supports the context manager
|
|
|
|
long-running programs. The context manager protocol is supported to ensure
|
|
|
|
protocol to ensure :meth:`reset` is always invoked::
|
|
|
|
:meth:`reset` is always invoked::
|
|
|
|
|
|
|
|
|
|
|
|
with mitogen.parent.CallChain(context, pipelined=True) as chain:
|
|
|
|
with mitogen.parent.CallChain(context, pipelined=True) as chain:
|
|
|
|
chain.call_no_reply(...)
|
|
|
|
chain.call_no_reply(...)
|
|
|
@ -1243,20 +1242,16 @@ class CallChain(object):
|
|
|
|
def call_no_reply(self, fn, *args, **kwargs):
|
|
|
|
def call_no_reply(self, fn, *args, **kwargs):
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
Like :meth:`call_async`, but do not wait for a return value, and inform
|
|
|
|
Like :meth:`call_async`, but do not wait for a return value, and inform
|
|
|
|
the target context no such reply is expected. If the call fails and
|
|
|
|
the target context no reply is expected. If the call fails and
|
|
|
|
pipelining is disabled, the full exception will be logged to the target
|
|
|
|
pipelining is disabled, the exception will be logged to the target
|
|
|
|
context's logging framework.
|
|
|
|
context's logging framework.
|
|
|
|
|
|
|
|
|
|
|
|
:raises mitogen.core.CallError:
|
|
|
|
|
|
|
|
An exception was raised in the remote context during execution.
|
|
|
|
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
LOG.debug('%r.call_no_reply(): %r', self, CallSpec(fn, args, kwargs))
|
|
|
|
LOG.debug('%r.call_no_reply(): %r', self, CallSpec(fn, args, kwargs))
|
|
|
|
self.context.send(self.make_msg(fn, *args, **kwargs))
|
|
|
|
self.context.send(self.make_msg(fn, *args, **kwargs))
|
|
|
|
|
|
|
|
|
|
|
|
def call_async(self, fn, *args, **kwargs):
|
|
|
|
def call_async(self, fn, *args, **kwargs):
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
Arrange for the context's ``CALL_FUNCTION`` handle to receive a message
|
|
|
|
Arrange for `fn(\*args, \**kwargs)` to be invoked on the context's main
|
|
|
|
that causes `fn(\*args, \**kwargs)` to be invoked on the context's main
|
|
|
|
|
|
|
|
thread.
|
|
|
|
thread.
|
|
|
|
|
|
|
|
|
|
|
|
:param fn:
|
|
|
|
:param fn:
|
|
|
@ -1312,12 +1307,13 @@ class CallChain(object):
|
|
|
|
|
|
|
|
|
|
|
|
def call(self, fn, *args, **kwargs):
|
|
|
|
def call(self, fn, *args, **kwargs):
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
Equivalent to :meth:`call_async(fn, \*args, \**kwargs).get().unpickle()
|
|
|
|
Like :meth:`call_async`, but block until the return value is available.
|
|
|
|
<call_async>`.
|
|
|
|
Equivalent to::
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
call_async(fn, *args, **kwargs).get().unpickle()
|
|
|
|
|
|
|
|
|
|
|
|
:returns:
|
|
|
|
:returns:
|
|
|
|
The function's return value.
|
|
|
|
The function's return value.
|
|
|
|
|
|
|
|
|
|
|
|
:raises mitogen.core.CallError:
|
|
|
|
:raises mitogen.core.CallError:
|
|
|
|
An exception was raised in the remote context during execution.
|
|
|
|
An exception was raised in the remote context during execution.
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|