issue #388: move a ton of documentation back into the source

issue260
David Wilson 6 years ago
parent fd326f5ad7
commit a7ee23719a

@ -84,237 +84,16 @@ Message Class
=============
.. currentmodule:: mitogen.core
.. class:: Message
Messages are the fundamental unit of communication, comprising fields from
the :ref:`stream-protocol` header, an optional reference to the receiving
:class:`mitogen.core.Router` for ingress messages, and helper methods for
deserialization and generating replies.
.. attribute:: router
The :class:`mitogen.core.Router` responsible for routing the
message. This is :data:`None` for locally originated messages.
.. attribute:: receiver
The :class:`mitogen.core.Receiver` over which the message was last
received. Part of the :class:`mitogen.select.Select` interface.
Defaults to :data:`None`.
.. attribute:: dst_id
Integer target context ID. :class:`mitogen.core.Router` delivers
messages locally when their :attr:`dst_id` matches
:data:`mitogen.context_id`, otherwise they are routed up or downstream.
.. attribute:: src_id
Integer source context ID. Used as the target of replies if any are
generated.
.. attribute:: auth_id
The context ID under whose authority the message is acting. See
:ref:`source-verification`.
.. attribute:: handle
Integer target handle in the destination context. This is one of the
:ref:`standard-handles`, or a dynamically generated handle used to
receive a one-time reply, such as the return value of a function call.
.. attribute:: reply_to
Integer target handle to direct any reply to this message. Used to
receive a one-time reply, such as the return value of a function call.
:data:`IS_DEAD` has a special meaning when it appears in this field.
.. attribute:: data
Message data, which may be raw or pickled.
.. attribute:: is_dead
:data:`True` if :attr:`reply_to` is set to the magic value
:data:`mitogen.core.IS_DEAD`, indicating the sender considers the
channel dead.
.. py:method:: __init__ (\**kwargs)
Construct a message from from the supplied `kwargs`. :attr:`src_id`
and :attr:`auth_id` are always set to :data:`mitogen.context_id`.
.. py:classmethod:: pickled (obj, \**kwargs)
Construct a pickled message, setting :attr:`data` to the
serialization of `obj`, and setting remaining fields using `kwargs`.
:returns:
The new message.
.. method:: unpickle (throw=True)
Unpickle :attr:`data`, optionally raising any exceptions present.
:param bool throw:
If :data:`True`, raise exceptions, otherwise it is the caller's
responsibility.
:raises mitogen.core.CallError:
The serialized data contained CallError exception.
:raises mitogen.core.ChannelError:
The `is_dead` field was set.
.. method:: reply (obj, router=None, \**kwargs)
Compose a reply to this message and send it using :attr:`router`, or
`router` is :attr:`router` is :data:`None`.
:param obj:
Either a :class:`Message`, or an object to be serialized in order
to construct a new message.
:param router:
Optional router to use if :attr:`router` is :data:`None`.
:param kwargs:
Optional keyword parameters overriding message fields in the reply.
.. autoclass:: Message
:members:
Router Class
============
.. currentmodule:: mitogen.core
.. class:: Router
Route messages between parent and child contexts, and invoke handlers
defined on our parent context. :meth:`Router.route() <route>` straddles
the :class:`Broker <mitogen.core.Broker>` and user threads, it is safe
to call anywhere.
**Note:** This is the somewhat limited core version of the Router class
used by child contexts. The master subclass is documented below this one.
.. attribute:: unidirectional
When :data:`True`, permit children to only communicate with the current
context or a parent of the current context. Routing between siblings or
children of parents is prohibited, ensuring no communication is
possible between intentionally partitioned networks, such as when a
program simultaneously manipulates hosts spread across a corporate and
a production network, or production networks that are otherwise
air-gapped.
Sending a prohibited message causes an error to be logged and a dead
message to be sent in reply to the errant message, if that message has
``reply_to`` set.
The value of :data:`unidirectional` becomes the default for the
:meth:`local() <mitogen.master.Router.local>` `unidirectional`
parameter.
.. method:: stream_by_id (dst_id)
Return the :class:`mitogen.core.Stream` that should be used to
communicate with `dst_id`. If a specific route for `dst_id` is not
known, a reference to the parent context's stream is returned.
.. method:: add_route (target_id, via_id)
Arrange for messages whose `dst_id` is `target_id` to be forwarded on
the directly connected stream for `via_id`. This method is called
automatically in response to ``ADD_ROUTE`` messages, but remains public
for now while the design has not yet settled, and situations may arise
where routing is not fully automatic.
.. method:: register (context, stream)
Register a new context and its associated stream, and add the stream's
receive side to the I/O multiplexer. This This method remains public
for now while hte design has not yet settled.
.. method:: add_handler (fn, handle=None, persist=True, respondent=None, policy=None)
Invoke `fn(msg)` for each Message sent to `handle` from this context.
Unregister after one invocation if `persist` is :data:`False`. If
`handle` is :data:`None`, a new handle is allocated and returned.
:param int handle:
If not :data:`None`, an explicit handle to register, usually one of
the ``mitogen.core.*`` constants. If unspecified, a new unused
handle will be allocated.
:param bool persist:
If :data:`False`, the handler will be unregistered after a single
message has been received.
:param mitogen.core.Context respondent:
Context that messages to this handle are expected to be sent from.
If specified, arranges for a dead message to be delivered to `fn`
when disconnection of the context is detected.
In future `respondent` will likely also be used to prevent other
contexts from sending messages to the handle.
:param function policy:
Function invoked as `policy(msg, stream)` where `msg` is a
:class:`mitogen.core.Message` about to be delivered, and
`stream` is the :class:`mitogen.core.Stream` on which it was
received. The function must return :data:`True`, otherwise an
error is logged and delivery is refused.
Two built-in policy functions exist:
* :func:`mitogen.core.has_parent_authority`: requires the
message arrived from a parent context, or a context acting with a
parent context's authority (``auth_id``).
* :func:`mitogen.parent.is_immediate_child`: requires the
message arrived from an immediately connected child, for use in
messaging patterns where either something becomes buggy or
insecure by permitting indirect upstream communication.
In case of refusal, and the message's ``reply_to`` field is
nonzero, a :class:`mitogen.core.CallError` is delivered to the
sender indicating refusal occurred.
:return:
`handle`, or if `handle` was :data:`None`, the newly allocated
handle.
.. method:: del_handler (handle)
Remove the handle registered for `handle`
:raises KeyError:
The handle wasn't registered.
.. method:: _async_route(msg, stream=None)
Arrange for `msg` to be forwarded towards its destination. If its
destination is the local context, then arrange for it to be dispatched
using the local handlers.
This is a lower overhead version of :meth:`route` that may only be
called from the I/O multiplexer thread.
:param mitogen.core.Stream stream:
If not :data:`None`, a reference to the stream the message arrived
on. Used for performing source route verification, to ensure
sensitive messages such as ``CALL_FUNCTION`` arrive only from
trusted contexts.
.. method:: route(msg)
Arrange for the :class:`Message` `msg` to be delivered to its
destination using any relevant downstream context, or if none is found,
by forwarding the message upstream towards the master context. If `msg`
is destined for the local context, it is dispatched using the handles
registered with :meth:`add_handler`.
This may be called from any thread.
.. autoclass:: Router
:members:
.. currentmodule:: mitogen.master
@ -362,11 +141,6 @@ Router Class
``ALLOCATE_ID`` message that causes the master to emit matching
``ADD_ROUTE`` messages prior to replying.
.. method:: context_by_id (context_id, via_id=None)
Messy factory/lookup function to find a context by its ID, or construct
it. In future this will be replaced by a much more sensible interface.
.. _context-factories:
**Context Factories**
@ -803,53 +577,8 @@ Context Class
=============
.. currentmodule:: mitogen.core
.. class:: Context
Represent a remote context regardless of connection method.
**Note:** This is the somewhat limited core version of the Context class
used by child contexts. The master subclass is documented below this one.
.. method:: send (msg)
Arrange for `msg` to be delivered to this context.
:attr:`dst_id <Message.dst_id>` is set to the target context ID.
:param mitogen.core.Message msg:
The message.
.. method:: send_async (msg, persist=False)
Arrange for `msg` to be delivered to this context, with replies
directed to a newly constructed receiver. :attr:`dst_id
<Message.dst_id>` is set to the target context ID, and :attr:`reply_to
<Message.reply_to>` is set to the newly constructed receiver's handle.
:param bool persist:
If :data:`False`, the handler will be unregistered after a single
message has been received.
:param mitogen.core.Message msg:
The message.
:returns:
:class:`mitogen.core.Receiver` configured to receive any replies
sent to the message's `reply_to` handle.
.. method:: send_await (msg, deadline=None)
Like :meth:`send_async`, but expect a single reply (`persist=False`)
delivered within `deadline` seconds.
:param mitogen.core.Message msg:
The message.
:param float deadline:
If not :data:`None`, seconds before timing out waiting for a reply.
:returns:
The deserialized reply.
:raises mitogen.core.TimeoutError:
No message was received and `deadline` passed.
.. autoclass:: Context
:members:
.. currentmodule:: mitogen.parent
@ -857,340 +586,33 @@ Context Class
.. autoclass:: CallChain
:members:
.. class:: Context
Extend :class:`mitogen.core.Context` with functionality useful to masters,
and child contexts who later become parents. Currently when this class is
required, the target context's router is upgraded at runtime.
.. attribute:: default_call_chain
A :class:`CallChain` instance constructed by default, with pipelining
disabled. :meth:`call`, :meth:`call_async` and :meth:`call_no_reply`
use this instance.
.. method:: shutdown (wait=False)
Arrange for the context to receive a ``SHUTDOWN`` message, triggering
graceful shutdown.
Due to a lack of support for timers, no attempt is made yet to force
terminate a hung context using this method. This will be fixed shortly.
:param bool wait:
If :data:`True`, block the calling thread until the context has
completely terminated.
:returns:
If `wait` is :data:`False`, returns a :class:`mitogen.core.Latch`
whose :meth:`get() <mitogen.core.Latch.get>` method returns
:data:`None` when shutdown completes. The `timeout` parameter may
be used to implement graceful timeouts.
.. method:: call_async (fn, \*args, \*\*kwargs)
See :meth:`CallChain.call_async`.
.. method:: call (fn, \*args, \*\*kwargs)
See :meth:`CallChain.call`.
.. method:: call_no_reply (fn, \*args, \*\*kwargs)
See :meth:`CallChain.call_no_reply`.
.. autoclass:: Context
:members:
Receiver Class
==============
.. currentmodule:: mitogen.core
.. class:: Receiver (router, handle=None, persist=True, respondent=None)
Receivers are used to wait for pickled responses from another context to be
sent to a handle registered in this context. A receiver may be single-use
(as in the case of :meth:`mitogen.parent.Context.call_async`) or
multiple use.
:param mitogen.core.Router router:
Router to register the handler on.
:param int handle:
If not :data:`None`, an explicit handle to register, otherwise an
unused handle is chosen.
:param bool persist:
If :data:`True`, do not unregister the receiver's handler after the
first message.
:param mitogen.core.Context respondent:
Reference to the context this receiver is receiving from. If not
:data:`None`, arranges for the receiver to receive a dead message if
messages can no longer be routed to the context, due to disconnection
or exit.
.. attribute:: notify = None
If not :data:`None`, a reference to a function invoked as
`notify(receiver)` when a new message is delivered to this receiver.
Used by :class:`mitogen.select.Select` to implement waiting on
multiple receivers.
.. py:method:: to_sender ()
Return a :class:`mitogen.core.Sender` configured to deliver messages
to this receiver. Since a Sender can be serialized, this makes it
convenient to pass `(context_id, handle)` pairs around::
def deliver_monthly_report(sender):
for line in open('monthly_report.txt'):
sender.send(line)
sender.close()
remote = router.ssh(hostname='mainframe')
recv = mitogen.core.Receiver(router)
remote.call(deliver_monthly_report, recv.to_sender())
for msg in recv:
print(msg)
.. py:method:: empty ()
Return :data:`True` if calling :meth:`get` would block.
As with :class:`Queue.Queue`, :data:`True` may be returned even
though a subsequent call to :meth:`get` will succeed, since a
message may be posted at any moment between :meth:`empty` and
:meth:`get`.
:meth:`empty` is only useful to avoid a race while installing
:attr:`notify`:
.. code-block:: python
recv.notify = _my_notify_function
if not recv.empty():
_my_notify_function(recv)
# It is guaranteed the receiver was empty after the notification
# function was installed, or that it was non-empty and the
# notification function was invoked at least once.
.. py:method:: close ()
Cause :class:`mitogen.core.ChannelError` to be raised in any thread
waiting in :meth:`get` on this receiver.
.. py:method:: get (timeout=None)
Sleep waiting for a message to arrive on this receiver.
:param float timeout:
If not :data:`None`, specifies a timeout in seconds.
:raises mitogen.core.ChannelError:
The remote end indicated the channel should be closed, or
communication with its parent context was lost.
:raises mitogen.core.TimeoutError:
Timeout was reached.
:returns:
`(msg, data)` tuple, where `msg` is the
:class:`mitogen.core.Message` that was received, and `data` is
its unpickled data part.
.. py:method:: get_data (timeout=None)
Like :meth:`get`, except only return the data part.
.. py:method:: __iter__ ()
Block and yield `(msg, data)` pairs delivered to this receiver until
:class:`mitogen.core.ChannelError` is raised.
.. autoclass:: Receiver
:members:
Sender Class
============
.. currentmodule:: mitogen.core
.. class:: Sender (context, dst_handle)
Senders are used to send pickled messages to a handle in another context,
it is the inverse of :class:`mitogen.core.Sender`.
Senders may be serialized, making them convenient to wire up data flows.
See :meth:`mitogen.core.Receiver.to_sender` for more information.
:param mitogen.core.Context context:
Context to send messages to.
:param int dst_handle:
Destination handle to send messages to.
.. py:method:: close ()
Send a dead message to the remote end, causing :meth:`ChannelError`
to be raised in any waiting thread.
.. py:method:: send (data)
Send `data` to the remote end.
.. autoclass:: Sender
:members:
Select Class
============
.. module:: mitogen.select
.. currentmodule:: mitogen.select
.. class:: Select (receivers=(), oneshot=True)
Support scatter/gather asynchronous calls and waiting on multiple
receivers, channels, and sub-Selects. Accepts a sequence of
:class:`mitogen.core.Receiver` or :class:`mitogen.select.Select`
instances and returns the first value posted to any receiver or select.
If `oneshot` is :data:`True`, then remove each receiver as it yields a
result; since :meth:`__iter__` terminates once the final receiver is
removed, this makes it convenient to respond to calls made in parallel:
.. code-block:: python
total = 0
recvs = [c.call_async(long_running_operation) for c in contexts]
for msg in mitogen.select.Select(recvs):
print('Got %s from %s' % (msg, msg.receiver))
total += msg.unpickle()
# Iteration ends when last Receiver yields a result.
print('Received total %s from %s receivers' % (total, len(recvs)))
:class:`Select` may drive a long-running scheduler:
.. code-block:: python
with mitogen.select.Select(oneshot=False) as select:
while running():
for msg in select:
process_result(msg.receiver.context, msg.unpickle())
for context, workfunc in get_new_work():
select.add(context.call_async(workfunc))
:class:`Select` may be nested:
.. code-block:: python
subselects = [
mitogen.select.Select(get_some_work()),
mitogen.select.Select(get_some_work()),
mitogen.select.Select([
mitogen.select.Select(get_some_work()),
mitogen.select.Select(get_some_work())
])
]
for msg in mitogen.select.Select(selects):
print(msg.unpickle())
.. py:classmethod:: all (it)
Take an iterable of receivers and retrieve a :class:`Message` from
each, returning the result of calling `msg.unpickle()` on each in turn.
Results are returned in the order they arrived.
This is sugar for handling batch
:meth:`Context.call_async <mitogen.parent.Context.call_async>`
invocations:
.. code-block:: python
print('Total disk usage: %.02fMiB' % (sum(
mitogen.select.Select.all(
context.call_async(get_disk_usage)
for context in contexts
) / 1048576.0
),))
However, unlike in a naive comprehension such as:
.. code-block:: python
recvs = [c.call_async(get_disk_usage) for c in contexts]
sum(recv.get().unpickle() for recv in recvs)
Result processing happens in the order results arrive, rather than the
order requests were issued, so :meth:`all` should always be faster.
.. py:method:: get (timeout=None, block=True)
Fetch the next available value from any receiver, or raise
:class:`mitogen.core.TimeoutError` if no value is available within
`timeout` seconds.
On success, the message's :attr:`receiver
<mitogen.core.Message.receiver>` attribute is set to the receiver.
:param float timeout:
Timeout in seconds.
:param bool block:
If :data:`False`, immediately raise
:class:`mitogen.core.TimeoutError` if the select is empty.
:return:
:class:`mitogen.core.Message`
:raises mitogen.core.TimeoutError:
Timeout was reached.
:raises mitogen.core.LatchError:
:meth:`close` has been called, and the underlying latch is no
longer valid.
.. py:method:: __bool__ ()
Return :data:`True` if any receivers are registered with this select.
.. py:method:: close ()
Remove the select's notifier function from each registered receiver,
mark the associated latch as closed, and cause any thread currently
sleeping in :meth:`get` to be woken with
:class:`mitogen.core.LatchError`.
This is necessary to prevent memory leaks in long-running receivers. It
is called automatically when the Python :keyword:`with` statement is
used.
.. py:method:: empty ()
Return :data:`True` if calling :meth:`get` would block.
As with :class:`Queue.Queue`, :data:`True` may be returned even
though a subsequent call to :meth:`get` will succeed, since a
message may be posted at any moment between :meth:`empty` and
:meth:`get`.
:meth:`empty` may return :data:`False` even when :meth:`get`
would block if another thread has drained a receiver added to this
select. This can be avoided by only consuming each receiver from a
single thread.
.. py:method:: __iter__ (self)
Yield the result of :meth:`get` until no receivers remain in the
select, either because `oneshot` is :data:`True`, or each receiver was
explicitly removed via :meth:`remove`.
.. py:method:: add (recv)
Add the :class:`mitogen.core.Receiver` or
:class:`mitogen.core.Channel` `recv` to the select.
.. py:method:: remove (recv)
Remove the :class:`mitogen.core.Receiver` or
:class:`mitogen.core.Channel` `recv` from the select. Note that if
the receiver has notified prior to :meth:`remove`, then it will
still be returned by a subsequent :meth:`get`. This may change in a
future version.
.. autoclass:: Select
:members:
Channel Class
@ -1327,64 +749,13 @@ Utility Functions
A random assortment of utility functions useful on masters and children.
.. currentmodule:: mitogen.utils
.. function:: cast (obj)
Many tools love to subclass built-in types in order to implement useful
functionality, such as annotating the safety of a Unicode string, or adding
additional methods to a dict. However, cPickle loves to preserve those
subtypes during serialization, resulting in CallError during :meth:`call
<mitogen.parent.Context.call>` in the target when it tries to deserialize
the data.
This function walks the object graph `obj`, producing a copy with any
custom sub-types removed. The functionality is not default since the
resulting walk may be computationally expensive given a large enough graph.
See :ref:`serialization-rules` for a list of supported types.
:param obj:
Object to undecorate.
:returns:
Undecorated object.
.. currentmodule:: mitogen.utils
.. function:: disable_site_packages
.. autofunction:: cast
Remove all entries mentioning ``site-packages`` or ``Extras`` from the
system path. Used primarily for testing on OS X within a virtualenv, where
OS X bundles some ancient version of the :mod:`six` module.
.. currentmodule:: mitogen.utils
.. function:: log_to_file (path=None, io=False, level='INFO')
Install a new :class:`logging.Handler` writing applications logs to the
filesystem. Useful when debugging slave IO problems.
Parameters to this function may be overridden at runtime using environment
variables. See :ref:`logging-env-vars`.
:param str path:
If not :data:`None`, a filesystem path to write logs to. Otherwise,
logs are written to :data:`sys.stderr`.
:param bool io:
If :data:`True`, include extremely verbose IO logs in the output.
Useful for debugging hangs, less useful for debugging application code.
:param str level:
Name of the :mod:`logging` package constant that is the minimum
level to log at. Useful levels are ``DEBUG``, ``INFO``, ``WARNING``,
and ``ERROR``.
.. currentmodule:: mitogen.utils
.. function:: run_with_router(func, \*args, \**kwargs)
Arrange for `func(router, \*args, \**kwargs)` to run with a temporary
:class:`mitogen.master.Router`, ensuring the Router and Broker are
correctly shut down during normal or exceptional return.
:returns:
`func`'s return value.
.. autofunction:: disable_site_packages
.. autofunction:: log_to_file
.. autofunction:: run_with_router(func, \*args, \**kwargs)
.. currentmodule:: mitogen.utils
.. decorator:: with_router

@ -500,18 +500,53 @@ else:
class Message(object):
"""
Messages are the fundamental unit of communication, comprising fields from
the :ref:`stream-protocol` header, an optional reference to the receiving
:class:`mitogen.core.Router` for ingress messages, and helper methods for
deserialization and generating replies.
"""
#: Integer target context ID. :class:`Router` delivers messages locally
#: when their :attr:`dst_id` matches :data:`mitogen.context_id`, otherwise
#: they are routed up or downstream.
dst_id = None
#: Integer source context ID. Used as the target of replies if any are
#: generated.
src_id = None
#: Context ID under whose authority the message is acting. See
#: :ref:`source-verification`.
auth_id = None
#: Integer target handle in the destination context. This is one of the
#: :ref:`standard-handles`, or a dynamically generated handle used to
#: receive a one-time reply, such as the return value of a function call.
handle = None
#: Integer target handle to direct any reply to this message. Used to
#: receive a one-time reply, such as the return value of a function call.
#: :data:`IS_DEAD` has a special meaning when it appears in this field.
reply_to = None
#: Raw message data bytes.
data = b('')
_unpickled = object()
#: The :class:`Router` responsible for routing the message. This is
#: :data:`None` for locally originated messages.
router = None
#: The :class:`Receiver` over which the message was last received. Part of
#: the :class:`mitogen.select.Select` interface. Defaults to :data:`None`.
receiver = None
def __init__(self, **kwargs):
"""
Construct a message from from the supplied `kwargs`. :attr:`src_id` and
:attr:`auth_id` are always set to :data:`mitogen.context_id`.
"""
self.src_id = mitogen.context_id
self.auth_id = mitogen.context_id
vars(self).update(kwargs)
@ -551,14 +586,28 @@ class Message(object):
@property
def is_dead(self):
"""
:data:`True` if :attr:`reply_to` is set to the magic value
:data:`IS_DEAD`, indicating the sender considers the channel dead.
"""
return self.reply_to == IS_DEAD
@classmethod
def dead(cls, **kwargs):
"""
Syntax helper to construct a dead message.
"""
return cls(reply_to=IS_DEAD, **kwargs)
@classmethod
def pickled(cls, obj, **kwargs):
"""
Construct a pickled message, setting :attr:`data` to the serialization
of `obj`, and setting remaining fields using `kwargs`.
:returns:
The new message.
"""
self = cls(**kwargs)
try:
self.data = pickle.dumps(obj, protocol=2)
@ -568,6 +617,18 @@ class Message(object):
return self
def reply(self, msg, router=None, **kwargs):
"""
Compose a reply to this message and send it using :attr:`router`, or
`router` is :attr:`router` is :data:`None`.
:param obj:
Either a :class:`Message`, or an object to be serialized in order
to construct a new message.
:param router:
Optional router to use if :attr:`router` is :data:`None`.
:param kwargs:
Optional keyword parameters overriding message fields in the reply.
"""
if not isinstance(msg, Message):
msg = Message.pickled(msg)
msg.dst_id = self.src_id
@ -584,7 +645,18 @@ class Message(object):
UNPICKLER_KWARGS = {}
def unpickle(self, throw=True, throw_dead=True):
"""Deserialize `data` into an object."""
"""
Unpickle :attr:`data`, optionally raising any exceptions present.
:param bool throw_dead:
If :data:`True`, raise exceptions, otherwise it is the caller's
responsibility.
:raises CallError:
The serialized data contained CallError exception.
:raises ChannelError:
The `is_dead` field was set.
"""
_vv and IOLOG.debug('%r.unpickle()', self)
if throw_dead and self.is_dead:
raise ChannelError(ChannelError.remote_msg)
@ -616,25 +688,42 @@ class Message(object):
class Sender(object):
"""
Senders are used to send pickled messages to a handle in another context,
it is the inverse of :class:`mitogen.core.Sender`.
Senders may be serialized, making them convenient to wire up data flows.
See :meth:`mitogen.core.Receiver.to_sender` for more information.
:param Context context:
Context to send messages to.
:param int dst_handle:
Destination handle to send messages to.
"""
def __init__(self, context, dst_handle):
self.context = context
self.dst_handle = dst_handle
def __repr__(self):
return 'Sender(%r, %r)' % (self.context, self.dst_handle)
def __reduce__(self):
return _unpickle_sender, (self.context.context_id, self.dst_handle)
def send(self, data):
"""
Send `data` to the remote end.
"""
_vv and IOLOG.debug('%r.send(%r..)', self, repr(data)[:100])
self.context.send(Message.pickled(data, handle=self.dst_handle))
def close(self):
"""Indicate this channel is closed to the remote side."""
"""
Send a dead message to the remote, causing :meth:`ChannelError` to be
raised in any waiting thread.
"""
_vv and IOLOG.debug('%r.close()', self)
self.context.send(Message.dead(handle=self.dst_handle))
def send(self, data):
"""Send `data` to the remote."""
_vv and IOLOG.debug('%r.send(%r..)', self, repr(data)[:100])
self.context.send(Message.pickled(data, handle=self.dst_handle))
def __repr__(self):
return 'Sender(%r, %r)' % (self.context, self.dst_handle)
def __reduce__(self):
return _unpickle_sender, (self.context.context_id, self.dst_handle)
def _unpickle_sender(router, context_id, dst_handle):
@ -646,12 +735,39 @@ def _unpickle_sender(router, context_id, dst_handle):
class Receiver(object):
"""
Receivers maintain a thread-safe queue of messages sent to a handle of this
context from another context.
:param mitogen.core.Router router:
Router to register the handler on.
:param int handle:
If not :data:`None`, an explicit handle to register, otherwise an
unused handle is chosen.
:param bool persist:
If :data:`False`, unregister the handler after one message is received.
Single-message receivers are intended for RPC-like transactions, such
as in the case of :meth:`mitogen.parent.Context.call_async`.
:param mitogen.core.Context respondent:
Context this receiver is receiving from. If not :data:`None`, arranges
for the receiver to receive a dead message if messages can no longer be
routed to the context, due to disconnection or exit.
"""
#: If not :data:`None`, a reference to a function invoked as
#: `notify(receiver)` when a new message is delivered to this receiver.
#: Used by :class:`mitogen.select.Select` to implement waiting on multiple
#: receivers.
notify = None
raise_channelerror = True
def __init__(self, router, handle=None, persist=True,
respondent=None, policy=None):
self.router = router
#: The handle.
self.handle = handle # Avoid __repr__ crash in add_handler()
self._latch = Latch() # Must exist prior to .add_handler()
self.handle = router.add_handler(
@ -666,26 +782,73 @@ class Receiver(object):
return 'Receiver(%r, %r)' % (self.router, self.handle)
def to_sender(self):
"""
Return a :class:`Sender` configured to deliver messages to this
receiver. As senders are serializable, this makes it convenient to pass
`(context_id, handle)` pairs around::
def deliver_monthly_report(sender):
for line in open('monthly_report.txt'):
sender.send(line)
sender.close()
remote = router.ssh(hostname='mainframe')
recv = mitogen.core.Receiver(router)
remote.call(deliver_monthly_report, recv.to_sender())
for msg in recv:
print(msg)
"""
context = Context(self.router, mitogen.context_id)
return Sender(context, self.handle)
def _on_receive(self, msg):
"""Callback from the Stream; appends data to the internal queue."""
"""
Callback from the Stream; appends data to the internal queue.
"""
_vv and IOLOG.debug('%r._on_receive(%r)', self, msg)
self._latch.put(msg)
if self.notify:
self.notify(self)
def close(self):
"""
Unregister the receiver's handle from its associated router, and cause
:class:`ChannelError` to be raised in any thread waiting in :meth:`get`
on this receiver.
"""
if self.handle:
self.router.del_handler(self.handle)
self.handle = None
self._latch.put(Message.dead())
def empty(self):
"""
Return :data:`True` if calling :meth:`get` would block.
As with :class:`Queue.Queue`, :data:`True` may be returned even though
a subsequent call to :meth:`get` will succeed, since a message may be
posted at any moment between :meth:`empty` and :meth:`get`.
"""
return self._latch.empty()
def get(self, timeout=None, block=True, throw_dead=True):
"""
Sleep waiting for a message to arrive on this receiver.
:param float timeout:
If not :data:`None`, specifies a timeout in seconds.
:raises mitogen.core.ChannelError:
The remote end indicated the channel should be closed, or
communication with its parent context was lost.
:raises mitogen.core.TimeoutError:
Timeout was reached.
:returns:
`(msg, data)` tuple, where `msg` is the :class:`Message` that was
received, and `data` is its unpickled data part.
"""
_vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block)
msg = self._latch.get(timeout=timeout, block=block)
if msg.is_dead and throw_dead:
@ -696,6 +859,10 @@ class Receiver(object):
return msg
def __iter__(self):
"""
Yield consecutive :class:`Message` instances delivered to this receiver
until :class:`ChannelError` is raised.
"""
while True:
msg = self.get(throw_dead=False)
if msg.is_dead:
@ -1213,6 +1380,28 @@ class Stream(BasicStream):
class Context(object):
"""
Represent a remote context regardless of the underlying connection method.
Context objects are simple facades that emit messages through an
associated router, and have :ref:`signals` raised against them in response
to various events relating to the context.
**Note:** This is the somewhat limited core version, used by child
contexts. The master subclass is documented below this one.
Contexts maintain no internal state and are thread-safe.
Prefer :meth:`Router.context_by_id` over constructing context objects
explicitly, as that method is deduplicating, and returns the only context
instance :ref:`signals` will be raised on.
:param Router router:
Router to emit messages through.
:param int context_id:
Context ID.
:param str name:
Context name.
"""
remote_name = None
def __init__(self, router, context_id, name=None):
@ -1231,6 +1420,23 @@ class Context(object):
fire(self, 'disconnect')
def send_async(self, msg, persist=False):
"""
Arrange for `msg` to be delivered to this context, with replies
directed to a newly constructed receiver. :attr:`dst_id
<Message.dst_id>` is set to the target context ID, and :attr:`reply_to
<Message.reply_to>` is set to the newly constructed receiver's handle.
:param bool persist:
If :data:`False`, the handler will be unregistered after a single
message has been received.
:param mitogen.core.Message msg:
The message.
:returns:
:class:`Receiver` configured to receive any replies sent to the
message's `reply_to` handle.
"""
if self.router.broker._thread == threading.currentThread(): # TODO
raise SystemError('Cannot making blocking call on broker thread')
@ -1254,8 +1460,13 @@ class Context(object):
return self.send_async(msg)
def send(self, msg):
"""send `obj` to `handle`, and tell the broker we have output. May
be called from any thread."""
"""
Arrange for `msg` to be delivered to this context. :attr:`dst_id
<Message.dst_id>` is set to the target context ID.
:param Message msg:
Message.
"""
msg.dst_id = self.context_id
self.router.route(msg)
@ -1264,7 +1475,19 @@ class Context(object):
return recv.get().unpickle()
def send_await(self, msg, deadline=None):
"""Send `msg` and wait for a response with an optional timeout."""
"""
Like :meth:`send_async`, but expect a single reply (`persist=False`)
delivered within `deadline` seconds.
:param mitogen.core.Message msg:
The message.
:param float deadline:
If not :data:`None`, seconds before timing out waiting for a reply.
:returns:
Deserialized reply.
:raises TimeoutError:
No message was received and `deadline` passed.
"""
receiver = self.send_async(msg)
response = receiver.get(deadline)
data = response.unpickle()
@ -1710,8 +1933,33 @@ class IoLogger(BasicStream):
class Router(object):
"""
Route messages between contexts, and invoke local handlers for messages
addressed to this context. :meth:`Router.route() <route>` straddles the
:class:`Broker <mitogen.core.Broker>` and user threads, it is safe to call
anywhere.
**Note:** This is the somewhat limited core version of the Router class
used by child contexts. The master subclass is documented below this one.
"""
context_class = Context
max_message_size = 128 * 1048576
#: When :data:`True`, permit children to only communicate with the current
#: context or a parent of the current context. Routing between siblings or
#: children of parents is prohibited, ensuring no communication is possible
#: between intentionally partitioned networks, such as when a program
#: simultaneously manipulates hosts spread across a corporate and a
#: production network, or production networks that are otherwise
#: air-gapped.
#:
#: Sending a prohibited message causes an error to be logged and a dead
#: message to be sent in reply to the errant message, if that message has
#: ``reply_to`` set.
#:
#: The value of :data:`unidirectional` becomes the default for the
#: :meth:`local() <mitogen.master.Router.local>` `unidirectional`
#: parameter.
unidirectional = False
def __init__(self, broker):
@ -1751,7 +1999,7 @@ class Router(object):
fire(self._context_by_id[target_id], 'disconnect')
def on_stream_disconnect(self, stream):
def _on_stream_disconnect(self, stream):
for context in self._context_by_id.values():
stream_ = self._stream_by_id.get(context.context_id)
if stream_ is stream:
@ -1764,6 +2012,10 @@ class Router(object):
func(Message.dead())
def context_by_id(self, context_id, via_id=None, create=True, name=None):
"""
Messy factory/lookup function to find a context by its ID, or construct
it. In future this will be replaced by a much more sensible interface.
"""
context = self._context_by_id.get(context_id)
if create and not context:
context = self.context_class(self, context_id, name=name)
@ -1773,21 +2025,85 @@ class Router(object):
return context
def register(self, context, stream):
"""
Register a newly constructed context and its associated stream, and add
the stream's receive side to the I/O multiplexer. This method remains
public while the design has not yet settled.
"""
_v and LOG.debug('register(%r, %r)', context, stream)
self._stream_by_id[context.context_id] = stream
self._context_by_id[context.context_id] = context
self.broker.start_receive(stream)
listen(stream, 'disconnect', lambda: self.on_stream_disconnect(stream))
listen(stream, 'disconnect', lambda: self._on_stream_disconnect(stream))
def stream_by_id(self, dst_id):
"""
Return the :class:`Stream` that should be used to communicate with
`dst_id`. If a specific route for `dst_id` is not known, a reference to
the parent context's stream is returned.
"""
return self._stream_by_id.get(dst_id,
self._stream_by_id.get(mitogen.parent_id))
def del_handler(self, handle):
"""
Remove the handle registered for `handle`
:raises KeyError:
The handle wasn't registered.
"""
del self._handle_map[handle]
def add_handler(self, fn, handle=None, persist=True,
policy=None, respondent=None):
"""
Invoke `fn(msg)` for each Message sent to `handle` from this context.
Unregister after one invocation if `persist` is :data:`False`. If
`handle` is :data:`None`, a new handle is allocated and returned.
:param int handle:
If not :data:`None`, an explicit handle to register, usually one of
the ``mitogen.core.*`` constants. If unspecified, a new unused
handle will be allocated.
:param bool persist:
If :data:`False`, the handler will be unregistered after a single
message has been received.
:param Context respondent:
Context that messages to this handle are expected to be sent from.
If specified, arranges for a dead message to be delivered to `fn`
when disconnection of the context is detected.
In future `respondent` will likely also be used to prevent other
contexts from sending messages to the handle.
:param function policy:
Function invoked as `policy(msg, stream)` where `msg` is a
:class:`mitogen.core.Message` about to be delivered, and `stream`
is the :class:`mitogen.core.Stream` on which it was received. The
function must return :data:`True`, otherwise an error is logged and
delivery is refused.
Two built-in policy functions exist:
* :func:`has_parent_authority`: requires the message arrived from a
parent context, or a context acting with a parent context's
authority (``auth_id``).
* :func:`mitogen.parent.is_immediate_child`: requires the
message arrived from an immediately connected child, for use in
messaging patterns where either something becomes buggy or
insecure by permitting indirect upstream communication.
In case of refusal, and the message's ``reply_to`` field is
nonzero, a :class:`mitogen.core.CallError` is delivered to the
sender indicating refusal occurred.
:return:
`handle`, or if `handle` was :data:`None`, the newly allocated
handle.
"""
handle = handle or next(self._last_handle)
_vv and IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist)
@ -1848,6 +2164,19 @@ class Router(object):
msg.reply(Message.dead(), router=self)
def _async_route(self, msg, in_stream=None):
"""
Arrange for `msg` to be forwarded towards its destination. If its
destination is the local context, then arrange for it to be dispatched
using the local handlers.
This is a lower overhead version of :meth:`route` that may only be
called from the I/O multiplexer thread.
:param Stream in_stream:
If not :data:`None`, the stream the message arrived on. Used for
performing source route verification, to ensure sensitive messages
such as ``CALL_FUNCTION`` arrive only from trusted contexts.
"""
_vv and IOLOG.debug('%r._async_route(%r, %r)', self, msg, in_stream)
if len(msg.data) > self.max_message_size:
@ -1902,6 +2231,15 @@ class Router(object):
out_stream._send(msg)
def route(self, msg):
"""
Arrange for the :class:`Message` `msg` to be delivered to its
destination using any relevant downstream context, or if none is found,
by forwarding the message upstream towards the master context. If `msg`
is destined for the local context, it is dispatched using the handles
registered with :meth:`add_handler`.
This may be called from any thread.
"""
self.broker.defer(self._async_route, msg)

@ -1390,7 +1390,16 @@ class CallChain(object):
class Context(mitogen.core.Context):
"""
Extend :class:`mitogen.core.Context` with functionality useful to masters,
and child contexts who later become parents. Currently when this class is
required, the target context's router is upgraded at runtime.
"""
#: A :class:`CallChain` instance constructed by default, with pipelining
#: disabled. :meth:`call`, :meth:`call_async` and :meth:`call_no_reply` use
#: this instance.
call_chain_class = CallChain
via = None
def __init__(self, *args, **kwargs):
@ -1406,15 +1415,41 @@ class Context(mitogen.core.Context):
return hash((self.router, self.context_id))
def call_async(self, fn, *args, **kwargs):
"""
See :meth:`CallChain.call_async`.
"""
return self.default_call_chain.call_async(fn, *args, **kwargs)
def call(self, fn, *args, **kwargs):
"""
See :meth:`CallChain.call`.
"""
return self.default_call_chain.call(fn, *args, **kwargs)
def call_no_reply(self, fn, *args, **kwargs):
"""
See :meth:`CallChain.call_no_reply`.
"""
self.default_call_chain.call_no_reply(fn, *args, **kwargs)
def shutdown(self, wait=False):
"""
Arrange for the context to receive a ``SHUTDOWN`` message, triggering
graceful shutdown.
Due to a lack of support for timers, no attempt is made yet to force
terminate a hung context using this method. This will be fixed shortly.
:param bool wait:
If :data:`True`, block the calling thread until the context has
completely terminated.
:returns:
If `wait` is :data:`False`, returns a :class:`mitogen.core.Latch`
whose :meth:`get() <mitogen.core.Latch.get>` method returns
:data:`None` when shutdown completes. The `timeout` parameter may
be used to implement graceful timeouts.
"""
LOG.debug('%r.shutdown() sending SHUTDOWN', self)
latch = mitogen.core.Latch()
mitogen.core.listen(self, 'disconnect', lambda: latch.put(None))
@ -1666,6 +1701,13 @@ class Router(mitogen.core.Router):
msg.reply(None)
def add_route(self, target_id, stream):
"""
Arrange for messages whose `dst_id` is `target_id` to be forwarded on
the directly connected stream for `via_id`. This method is called
automatically in response to :data:`mitogen.core.ADD_ROUTE` messages,
but remains public while the design has not yet settled, and situations
may arise where routing is not fully automatic.
"""
LOG.debug('%r.add_route(%r, %r)', self, target_id, stream)
assert isinstance(target_id, int)
assert isinstance(stream, Stream)

@ -34,11 +34,57 @@ class Error(mitogen.core.Error):
class Select(object):
notify = None
"""
Support scatter/gather asynchronous calls and waiting on multiple
receivers, channels, and sub-Selects. Accepts a sequence of
:class:`mitogen.core.Receiver` or :class:`mitogen.select.Select` instances
and returns the first value posted to any receiver or select.
@classmethod
def all(cls, receivers):
return list(msg.unpickle() for msg in cls(receivers))
If `oneshot` is :data:`True`, then remove each receiver as it yields a
result; since :meth:`__iter__` terminates once the final receiver is
removed, this makes it convenient to respond to calls made in parallel:
.. code-block:: python
total = 0
recvs = [c.call_async(long_running_operation) for c in contexts]
for msg in mitogen.select.Select(recvs):
print('Got %s from %s' % (msg, msg.receiver))
total += msg.unpickle()
# Iteration ends when last Receiver yields a result.
print('Received total %s from %s receivers' % (total, len(recvs)))
:class:`Select` may drive a long-running scheduler:
.. code-block:: python
with mitogen.select.Select(oneshot=False) as select:
while running():
for msg in select:
process_result(msg.receiver.context, msg.unpickle())
for context, workfunc in get_new_work():
select.add(context.call_async(workfunc))
:class:`Select` may be nested:
.. code-block:: python
subselects = [
mitogen.select.Select(get_some_work()),
mitogen.select.Select(get_some_work()),
mitogen.select.Select([
mitogen.select.Select(get_some_work()),
mitogen.select.Select(get_some_work())
])
]
for msg in mitogen.select.Select(selects):
print(msg.unpickle())
"""
notify = None
def __init__(self, receivers=(), oneshot=True):
self._receivers = []
@ -47,12 +93,46 @@ class Select(object):
for recv in receivers:
self.add(recv)
@classmethod
def all(cls, receivers):
"""
Take an iterable of receivers and retrieve a :class:`Message` from
each, returning the result of calling `msg.unpickle()` on each in turn.
Results are returned in the order they arrived.
This is sugar for handling batch :meth:`Context.call_async
<mitogen.parent.Context.call_async>` invocations:
.. code-block:: python
print('Total disk usage: %.02fMiB' % (sum(
mitogen.select.Select.all(
context.call_async(get_disk_usage)
for context in contexts
) / 1048576.0
),))
However, unlike in a naive comprehension such as:
.. code-block:: python
recvs = [c.call_async(get_disk_usage) for c in contexts]
sum(recv.get().unpickle() for recv in recvs)
Result processing happens in the order results arrive, rather than the
order requests were issued, so :meth:`all` should always be faster.
"""
return list(msg.unpickle() for msg in cls(receivers))
def _put(self, value):
self._latch.put(value)
if self.notify:
self.notify(self)
def __bool__(self):
"""
Return :data:`True` if any receivers are registered with this select.
"""
return bool(self._receivers)
def __enter__(self):
@ -62,6 +142,11 @@ class Select(object):
self.close()
def __iter__(self):
"""
Yield the result of :meth:`get` until no receivers remain in the
select, either because `oneshot` is :data:`True`, or each receiver was
explicitly removed via :meth:`remove`.
"""
while self._receivers:
yield self.get()
@ -80,6 +165,14 @@ class Select(object):
owned_msg = 'Cannot add: Receiver is already owned by another Select'
def add(self, recv):
"""
Add the :class:`mitogen.core.Receiver` or :class:`Select` `recv` to the
select.
:raises mitogen.select.Error:
An attempt was made to add a :class:`Select` to which this select
is indirectly a member of.
"""
if isinstance(recv, Select):
recv._check_no_loop(self)
@ -95,6 +188,12 @@ class Select(object):
not_present_msg = 'Instance is not a member of this Select'
def remove(self, recv):
"""
Remove the :class:`mitogen.core.Receiver` or :class:`Select` `recv`
from the select. Note that if the receiver has notified prior to
:meth:`remove`, it will still be returned by a subsequent :meth:`get`.
This may change in a future version.
"""
try:
if recv.notify != self._put:
raise ValueError
@ -104,16 +203,59 @@ class Select(object):
raise Error(self.not_present_msg)
def close(self):
"""
Remove the select's notifier function from each registered receiver,
mark the associated latch as closed, and cause any thread currently
sleeping in :meth:`get` to be woken with
:class:`mitogen.core.LatchError`.
This is necessary to prevent memory leaks in long-running receivers. It
is called automatically when the Python :keyword:`with` statement is
used.
"""
for recv in self._receivers[:]:
self.remove(recv)
self._latch.close()
def empty(self):
"""
Return :data:`True` if calling :meth:`get` would block.
As with :class:`Queue.Queue`, :data:`True` may be returned even though
a subsequent call to :meth:`get` will succeed, since a message may be
posted at any moment between :meth:`empty` and :meth:`get`.
:meth:`empty` may return :data:`False` even when :meth:`get` would
block if another thread has drained a receiver added to this select.
This can be avoided by only consuming each receiver from a single
thread.
"""
return self._latch.empty()
empty_msg = 'Cannot get(), Select instance is empty'
def get(self, timeout=None, block=True):
"""
Fetch the next available value from any receiver, or raise
:class:`mitogen.core.TimeoutError` if no value is available within
`timeout` seconds.
On success, the message's :attr:`receiver
<mitogen.core.Message.receiver>` attribute is set to the receiver.
:param float timeout:
Timeout in seconds.
:param bool block:
If :data:`False`, immediately raise
:class:`mitogen.core.TimeoutError` if the select is empty.
:return:
:class:`mitogen.core.Message`
:raises mitogen.core.TimeoutError:
Timeout was reached.
:raises mitogen.core.LatchError:
:meth:`close` has been called, and the underlying latch is no
longer valid.
"""
if not self._receivers:
raise Error(self.empty_msg)

@ -46,6 +46,11 @@ else:
def disable_site_packages():
"""
Remove all entries mentioning ``site-packages`` or ``Extras`` from
:attr:sys.path. Used primarily for testing on OS X within a virtualenv,
where OS X bundles some ancient version of the :mod:`six` module.
"""
for entry in sys.path[:]:
if 'site-packages' in entry or 'Extras' in entry:
sys.path.remove(entry)
@ -65,6 +70,26 @@ def log_get_formatter():
def log_to_file(path=None, io=False, level='INFO'):
"""
Install a new :class:`logging.Handler` writing applications logs to the
filesystem. Useful when debugging slave IO problems.
Parameters to this function may be overridden at runtime using environment
variables. See :ref:`logging-env-vars`.
:param str path:
If not :data:`None`, a filesystem path to write logs to. Otherwise,
logs are written to :data:`sys.stderr`.
:param bool io:
If :data:`True`, include extremely verbose IO logs in the output.
Useful for debugging hangs, less useful for debugging application code.
:param str level:
Name of the :mod:`logging` package constant that is the minimum level
to log at. Useful levels are ``DEBUG``, ``INFO``, ``WARNING``, and
``ERROR``.
"""
log = logging.getLogger('')
if path:
fp = open(path, 'w', 1)
@ -94,6 +119,14 @@ def log_to_file(path=None, io=False, level='INFO'):
def run_with_router(func, *args, **kwargs):
"""
Arrange for `func(router, \*args, \**kwargs)` to run with a temporary
:class:`mitogen.master.Router`, ensuring the Router and Broker are
correctly shut down during normal or exceptional return.
:returns:
`func`'s return value.
"""
broker = mitogen.master.Broker()
router = mitogen.master.Router(broker)
try:
@ -104,6 +137,17 @@ def run_with_router(func, *args, **kwargs):
def with_router(func):
"""
Decorator version of :func:`run_with_router`. Example:
.. code-block:: python
@with_router
def do_stuff(router, arg):
pass
do_stuff(blah, 123)
"""
def wrapper(*args, **kwargs):
return run_with_router(func, *args, **kwargs)
if mitogen.core.PY3:
@ -122,7 +166,27 @@ PASSTHROUGH = (
mitogen.core.Secret,
)
def cast(obj):
"""
Many tools love to subclass built-in types in order to implement useful
functionality, such as annotating the safety of a Unicode string, or adding
additional methods to a dict. However, cPickle loves to preserve those
subtypes during serialization, resulting in CallError during :meth:`call
<mitogen.parent.Context.call>` in the target when it tries to deserialize
the data.
This function walks the object graph `obj`, producing a copy with any
custom sub-types removed. The functionality is not default since the
resulting walk may be computationally expensive given a large enough graph.
See :ref:`serialization-rules` for a list of supported types.
:param obj:
Object to undecorate.
:returns:
Undecorated object.
"""
if isinstance(obj, dict):
return dict((cast(k), cast(v)) for k, v in iteritems(obj))
if isinstance(obj, (list, tuple)):

Loading…
Cancel
Save