Merge remote-tracking branch 'origin/dmw'

- extra minify tests
- more inline docs migration
- issue #400
issue260
David Wilson 6 years ago
commit 6bf4d79a54

@ -28,12 +28,41 @@
from __future__ import absolute_import from __future__ import absolute_import
import os import os
import threading
import ansible_mitogen.loaders import ansible_mitogen.loaders
import ansible_mitogen.mixins import ansible_mitogen.mixins
import ansible_mitogen.process import ansible_mitogen.process
def _patch_awx_callback():
"""
issue #400: AWX loads a display callback that suffers from thread-safety
issues. Detect the presence of older AWX versions and patch the bug.
"""
# AWX uses sitecustomize.py to force-load this package. If it exists, we're
# running under AWX.
try:
from awx_display_callback.events import EventContext
from awx_display_callback.events import event_context
except ImportError:
return
if hasattr(EventContext(), '_local'):
# Patched version.
return
def patch_add_local(self, **kwargs):
tls = vars(self._local)
ctx = tls.setdefault('_ctx', {})
ctx.update(kwargs)
EventContext._local = threading.local()
EventContext.add_local = patch_add_local
_patch_awx_callback()
def wrap_action_loader__get(name, *args, **kwargs): def wrap_action_loader__get(name, *args, **kwargs):
""" """
While the mitogen strategy is active, trap action_loader.get() calls, While the mitogen strategy is active, trap action_loader.get() calls,

@ -84,237 +84,16 @@ Message Class
============= =============
.. currentmodule:: mitogen.core .. currentmodule:: mitogen.core
.. autoclass:: Message
.. class:: Message :members:
Messages are the fundamental unit of communication, comprising fields from
the :ref:`stream-protocol` header, an optional reference to the receiving
:class:`mitogen.core.Router` for ingress messages, and helper methods for
deserialization and generating replies.
.. attribute:: router
The :class:`mitogen.core.Router` responsible for routing the
message. This is :data:`None` for locally originated messages.
.. attribute:: receiver
The :class:`mitogen.core.Receiver` over which the message was last
received. Part of the :class:`mitogen.select.Select` interface.
Defaults to :data:`None`.
.. attribute:: dst_id
Integer target context ID. :class:`mitogen.core.Router` delivers
messages locally when their :attr:`dst_id` matches
:data:`mitogen.context_id`, otherwise they are routed up or downstream.
.. attribute:: src_id
Integer source context ID. Used as the target of replies if any are
generated.
.. attribute:: auth_id
The context ID under whose authority the message is acting. See
:ref:`source-verification`.
.. attribute:: handle
Integer target handle in the destination context. This is one of the
:ref:`standard-handles`, or a dynamically generated handle used to
receive a one-time reply, such as the return value of a function call.
.. attribute:: reply_to
Integer target handle to direct any reply to this message. Used to
receive a one-time reply, such as the return value of a function call.
:data:`IS_DEAD` has a special meaning when it appears in this field.
.. attribute:: data
Message data, which may be raw or pickled.
.. attribute:: is_dead
:data:`True` if :attr:`reply_to` is set to the magic value
:data:`mitogen.core.IS_DEAD`, indicating the sender considers the
channel dead.
.. py:method:: __init__ (\**kwargs)
Construct a message from from the supplied `kwargs`. :attr:`src_id`
and :attr:`auth_id` are always set to :data:`mitogen.context_id`.
.. py:classmethod:: pickled (obj, \**kwargs)
Construct a pickled message, setting :attr:`data` to the
serialization of `obj`, and setting remaining fields using `kwargs`.
:returns:
The new message.
.. method:: unpickle (throw=True)
Unpickle :attr:`data`, optionally raising any exceptions present.
:param bool throw:
If :data:`True`, raise exceptions, otherwise it is the caller's
responsibility.
:raises mitogen.core.CallError:
The serialized data contained CallError exception.
:raises mitogen.core.ChannelError:
The `is_dead` field was set.
.. method:: reply (obj, router=None, \**kwargs)
Compose a reply to this message and send it using :attr:`router`, or
`router` is :attr:`router` is :data:`None`.
:param obj:
Either a :class:`Message`, or an object to be serialized in order
to construct a new message.
:param router:
Optional router to use if :attr:`router` is :data:`None`.
:param kwargs:
Optional keyword parameters overriding message fields in the reply.
Router Class Router Class
============ ============
.. currentmodule:: mitogen.core .. currentmodule:: mitogen.core
.. autoclass:: Router
.. class:: Router :members:
Route messages between parent and child contexts, and invoke handlers
defined on our parent context. :meth:`Router.route() <route>` straddles
the :class:`Broker <mitogen.core.Broker>` and user threads, it is safe
to call anywhere.
**Note:** This is the somewhat limited core version of the Router class
used by child contexts. The master subclass is documented below this one.
.. attribute:: unidirectional
When :data:`True`, permit children to only communicate with the current
context or a parent of the current context. Routing between siblings or
children of parents is prohibited, ensuring no communication is
possible between intentionally partitioned networks, such as when a
program simultaneously manipulates hosts spread across a corporate and
a production network, or production networks that are otherwise
air-gapped.
Sending a prohibited message causes an error to be logged and a dead
message to be sent in reply to the errant message, if that message has
``reply_to`` set.
The value of :data:`unidirectional` becomes the default for the
:meth:`local() <mitogen.master.Router.local>` `unidirectional`
parameter.
.. method:: stream_by_id (dst_id)
Return the :class:`mitogen.core.Stream` that should be used to
communicate with `dst_id`. If a specific route for `dst_id` is not
known, a reference to the parent context's stream is returned.
.. method:: add_route (target_id, via_id)
Arrange for messages whose `dst_id` is `target_id` to be forwarded on
the directly connected stream for `via_id`. This method is called
automatically in response to ``ADD_ROUTE`` messages, but remains public
for now while the design has not yet settled, and situations may arise
where routing is not fully automatic.
.. method:: register (context, stream)
Register a new context and its associated stream, and add the stream's
receive side to the I/O multiplexer. This This method remains public
for now while hte design has not yet settled.
.. method:: add_handler (fn, handle=None, persist=True, respondent=None, policy=None)
Invoke `fn(msg)` for each Message sent to `handle` from this context.
Unregister after one invocation if `persist` is :data:`False`. If
`handle` is :data:`None`, a new handle is allocated and returned.
:param int handle:
If not :data:`None`, an explicit handle to register, usually one of
the ``mitogen.core.*`` constants. If unspecified, a new unused
handle will be allocated.
:param bool persist:
If :data:`False`, the handler will be unregistered after a single
message has been received.
:param mitogen.core.Context respondent:
Context that messages to this handle are expected to be sent from.
If specified, arranges for a dead message to be delivered to `fn`
when disconnection of the context is detected.
In future `respondent` will likely also be used to prevent other
contexts from sending messages to the handle.
:param function policy:
Function invoked as `policy(msg, stream)` where `msg` is a
:class:`mitogen.core.Message` about to be delivered, and
`stream` is the :class:`mitogen.core.Stream` on which it was
received. The function must return :data:`True`, otherwise an
error is logged and delivery is refused.
Two built-in policy functions exist:
* :func:`mitogen.core.has_parent_authority`: requires the
message arrived from a parent context, or a context acting with a
parent context's authority (``auth_id``).
* :func:`mitogen.parent.is_immediate_child`: requires the
message arrived from an immediately connected child, for use in
messaging patterns where either something becomes buggy or
insecure by permitting indirect upstream communication.
In case of refusal, and the message's ``reply_to`` field is
nonzero, a :class:`mitogen.core.CallError` is delivered to the
sender indicating refusal occurred.
:return:
`handle`, or if `handle` was :data:`None`, the newly allocated
handle.
.. method:: del_handler (handle)
Remove the handle registered for `handle`
:raises KeyError:
The handle wasn't registered.
.. method:: _async_route(msg, stream=None)
Arrange for `msg` to be forwarded towards its destination. If its
destination is the local context, then arrange for it to be dispatched
using the local handlers.
This is a lower overhead version of :meth:`route` that may only be
called from the I/O multiplexer thread.
:param mitogen.core.Stream stream:
If not :data:`None`, a reference to the stream the message arrived
on. Used for performing source route verification, to ensure
sensitive messages such as ``CALL_FUNCTION`` arrive only from
trusted contexts.
.. method:: route(msg)
Arrange for the :class:`Message` `msg` to be delivered to its
destination using any relevant downstream context, or if none is found,
by forwarding the message upstream towards the master context. If `msg`
is destined for the local context, it is dispatched using the handles
registered with :meth:`add_handler`.
This may be called from any thread.
.. currentmodule:: mitogen.master .. currentmodule:: mitogen.master
@ -362,11 +141,6 @@ Router Class
``ALLOCATE_ID`` message that causes the master to emit matching ``ALLOCATE_ID`` message that causes the master to emit matching
``ADD_ROUTE`` messages prior to replying. ``ADD_ROUTE`` messages prior to replying.
.. method:: context_by_id (context_id, via_id=None)
Messy factory/lookup function to find a context by its ID, or construct
it. In future this will be replaced by a much more sensible interface.
.. _context-factories: .. _context-factories:
**Context Factories** **Context Factories**
@ -803,53 +577,8 @@ Context Class
============= =============
.. currentmodule:: mitogen.core .. currentmodule:: mitogen.core
.. autoclass:: Context
.. class:: Context :members:
Represent a remote context regardless of connection method.
**Note:** This is the somewhat limited core version of the Context class
used by child contexts. The master subclass is documented below this one.
.. method:: send (msg)
Arrange for `msg` to be delivered to this context.
:attr:`dst_id <Message.dst_id>` is set to the target context ID.
:param mitogen.core.Message msg:
The message.
.. method:: send_async (msg, persist=False)
Arrange for `msg` to be delivered to this context, with replies
directed to a newly constructed receiver. :attr:`dst_id
<Message.dst_id>` is set to the target context ID, and :attr:`reply_to
<Message.reply_to>` is set to the newly constructed receiver's handle.
:param bool persist:
If :data:`False`, the handler will be unregistered after a single
message has been received.
:param mitogen.core.Message msg:
The message.
:returns:
:class:`mitogen.core.Receiver` configured to receive any replies
sent to the message's `reply_to` handle.
.. method:: send_await (msg, deadline=None)
Like :meth:`send_async`, but expect a single reply (`persist=False`)
delivered within `deadline` seconds.
:param mitogen.core.Message msg:
The message.
:param float deadline:
If not :data:`None`, seconds before timing out waiting for a reply.
:returns:
The deserialized reply.
:raises mitogen.core.TimeoutError:
No message was received and `deadline` passed.
.. currentmodule:: mitogen.parent .. currentmodule:: mitogen.parent
@ -857,340 +586,33 @@ Context Class
.. autoclass:: CallChain .. autoclass:: CallChain
:members: :members:
.. class:: Context .. autoclass:: Context
:members:
Extend :class:`mitogen.core.Context` with functionality useful to masters,
and child contexts who later become parents. Currently when this class is
required, the target context's router is upgraded at runtime.
.. attribute:: default_call_chain
A :class:`CallChain` instance constructed by default, with pipelining
disabled. :meth:`call`, :meth:`call_async` and :meth:`call_no_reply`
use this instance.
.. method:: shutdown (wait=False)
Arrange for the context to receive a ``SHUTDOWN`` message, triggering
graceful shutdown.
Due to a lack of support for timers, no attempt is made yet to force
terminate a hung context using this method. This will be fixed shortly.
:param bool wait:
If :data:`True`, block the calling thread until the context has
completely terminated.
:returns:
If `wait` is :data:`False`, returns a :class:`mitogen.core.Latch`
whose :meth:`get() <mitogen.core.Latch.get>` method returns
:data:`None` when shutdown completes. The `timeout` parameter may
be used to implement graceful timeouts.
.. method:: call_async (fn, \*args, \*\*kwargs)
See :meth:`CallChain.call_async`.
.. method:: call (fn, \*args, \*\*kwargs)
See :meth:`CallChain.call`.
.. method:: call_no_reply (fn, \*args, \*\*kwargs)
See :meth:`CallChain.call_no_reply`.
Receiver Class Receiver Class
============== ==============
.. currentmodule:: mitogen.core .. currentmodule:: mitogen.core
.. autoclass:: Receiver
.. class:: Receiver (router, handle=None, persist=True, respondent=None) :members:
Receivers are used to wait for pickled responses from another context to be
sent to a handle registered in this context. A receiver may be single-use
(as in the case of :meth:`mitogen.parent.Context.call_async`) or
multiple use.
:param mitogen.core.Router router:
Router to register the handler on.
:param int handle:
If not :data:`None`, an explicit handle to register, otherwise an
unused handle is chosen.
:param bool persist:
If :data:`True`, do not unregister the receiver's handler after the
first message.
:param mitogen.core.Context respondent:
Reference to the context this receiver is receiving from. If not
:data:`None`, arranges for the receiver to receive a dead message if
messages can no longer be routed to the context, due to disconnection
or exit.
.. attribute:: notify = None
If not :data:`None`, a reference to a function invoked as
`notify(receiver)` when a new message is delivered to this receiver.
Used by :class:`mitogen.select.Select` to implement waiting on
multiple receivers.
.. py:method:: to_sender ()
Return a :class:`mitogen.core.Sender` configured to deliver messages
to this receiver. Since a Sender can be serialized, this makes it
convenient to pass `(context_id, handle)` pairs around::
def deliver_monthly_report(sender):
for line in open('monthly_report.txt'):
sender.send(line)
sender.close()
remote = router.ssh(hostname='mainframe')
recv = mitogen.core.Receiver(router)
remote.call(deliver_monthly_report, recv.to_sender())
for msg in recv:
print(msg)
.. py:method:: empty ()
Return :data:`True` if calling :meth:`get` would block.
As with :class:`Queue.Queue`, :data:`True` may be returned even
though a subsequent call to :meth:`get` will succeed, since a
message may be posted at any moment between :meth:`empty` and
:meth:`get`.
:meth:`empty` is only useful to avoid a race while installing
:attr:`notify`:
.. code-block:: python
recv.notify = _my_notify_function
if not recv.empty():
_my_notify_function(recv)
# It is guaranteed the receiver was empty after the notification
# function was installed, or that it was non-empty and the
# notification function was invoked at least once.
.. py:method:: close ()
Cause :class:`mitogen.core.ChannelError` to be raised in any thread
waiting in :meth:`get` on this receiver.
.. py:method:: get (timeout=None)
Sleep waiting for a message to arrive on this receiver.
:param float timeout:
If not :data:`None`, specifies a timeout in seconds.
:raises mitogen.core.ChannelError:
The remote end indicated the channel should be closed, or
communication with its parent context was lost.
:raises mitogen.core.TimeoutError:
Timeout was reached.
:returns:
`(msg, data)` tuple, where `msg` is the
:class:`mitogen.core.Message` that was received, and `data` is
its unpickled data part.
.. py:method:: get_data (timeout=None)
Like :meth:`get`, except only return the data part.
.. py:method:: __iter__ ()
Block and yield `(msg, data)` pairs delivered to this receiver until
:class:`mitogen.core.ChannelError` is raised.
Sender Class Sender Class
============ ============
.. currentmodule:: mitogen.core .. currentmodule:: mitogen.core
.. autoclass:: Sender
.. class:: Sender (context, dst_handle) :members:
Senders are used to send pickled messages to a handle in another context,
it is the inverse of :class:`mitogen.core.Sender`.
Senders may be serialized, making them convenient to wire up data flows.
See :meth:`mitogen.core.Receiver.to_sender` for more information.
:param mitogen.core.Context context:
Context to send messages to.
:param int dst_handle:
Destination handle to send messages to.
.. py:method:: close ()
Send a dead message to the remote end, causing :meth:`ChannelError`
to be raised in any waiting thread.
.. py:method:: send (data)
Send `data` to the remote end.
Select Class Select Class
============ ============
.. module:: mitogen.select .. module:: mitogen.select
.. currentmodule:: mitogen.select .. currentmodule:: mitogen.select
.. autoclass:: Select
.. class:: Select (receivers=(), oneshot=True) :members:
Support scatter/gather asynchronous calls and waiting on multiple
receivers, channels, and sub-Selects. Accepts a sequence of
:class:`mitogen.core.Receiver` or :class:`mitogen.select.Select`
instances and returns the first value posted to any receiver or select.
If `oneshot` is :data:`True`, then remove each receiver as it yields a
result; since :meth:`__iter__` terminates once the final receiver is
removed, this makes it convenient to respond to calls made in parallel:
.. code-block:: python
total = 0
recvs = [c.call_async(long_running_operation) for c in contexts]
for msg in mitogen.select.Select(recvs):
print('Got %s from %s' % (msg, msg.receiver))
total += msg.unpickle()
# Iteration ends when last Receiver yields a result.
print('Received total %s from %s receivers' % (total, len(recvs)))
:class:`Select` may drive a long-running scheduler:
.. code-block:: python
with mitogen.select.Select(oneshot=False) as select:
while running():
for msg in select:
process_result(msg.receiver.context, msg.unpickle())
for context, workfunc in get_new_work():
select.add(context.call_async(workfunc))
:class:`Select` may be nested:
.. code-block:: python
subselects = [
mitogen.select.Select(get_some_work()),
mitogen.select.Select(get_some_work()),
mitogen.select.Select([
mitogen.select.Select(get_some_work()),
mitogen.select.Select(get_some_work())
])
]
for msg in mitogen.select.Select(selects):
print(msg.unpickle())
.. py:classmethod:: all (it)
Take an iterable of receivers and retrieve a :class:`Message` from
each, returning the result of calling `msg.unpickle()` on each in turn.
Results are returned in the order they arrived.
This is sugar for handling batch
:meth:`Context.call_async <mitogen.parent.Context.call_async>`
invocations:
.. code-block:: python
print('Total disk usage: %.02fMiB' % (sum(
mitogen.select.Select.all(
context.call_async(get_disk_usage)
for context in contexts
) / 1048576.0
),))
However, unlike in a naive comprehension such as:
.. code-block:: python
recvs = [c.call_async(get_disk_usage) for c in contexts]
sum(recv.get().unpickle() for recv in recvs)
Result processing happens in the order results arrive, rather than the
order requests were issued, so :meth:`all` should always be faster.
.. py:method:: get (timeout=None, block=True)
Fetch the next available value from any receiver, or raise
:class:`mitogen.core.TimeoutError` if no value is available within
`timeout` seconds.
On success, the message's :attr:`receiver
<mitogen.core.Message.receiver>` attribute is set to the receiver.
:param float timeout:
Timeout in seconds.
:param bool block:
If :data:`False`, immediately raise
:class:`mitogen.core.TimeoutError` if the select is empty.
:return:
:class:`mitogen.core.Message`
:raises mitogen.core.TimeoutError:
Timeout was reached.
:raises mitogen.core.LatchError:
:meth:`close` has been called, and the underlying latch is no
longer valid.
.. py:method:: __bool__ ()
Return :data:`True` if any receivers are registered with this select.
.. py:method:: close ()
Remove the select's notifier function from each registered receiver,
mark the associated latch as closed, and cause any thread currently
sleeping in :meth:`get` to be woken with
:class:`mitogen.core.LatchError`.
This is necessary to prevent memory leaks in long-running receivers. It
is called automatically when the Python :keyword:`with` statement is
used.
.. py:method:: empty ()
Return :data:`True` if calling :meth:`get` would block.
As with :class:`Queue.Queue`, :data:`True` may be returned even
though a subsequent call to :meth:`get` will succeed, since a
message may be posted at any moment between :meth:`empty` and
:meth:`get`.
:meth:`empty` may return :data:`False` even when :meth:`get`
would block if another thread has drained a receiver added to this
select. This can be avoided by only consuming each receiver from a
single thread.
.. py:method:: __iter__ (self)
Yield the result of :meth:`get` until no receivers remain in the
select, either because `oneshot` is :data:`True`, or each receiver was
explicitly removed via :meth:`remove`.
.. py:method:: add (recv)
Add the :class:`mitogen.core.Receiver` or
:class:`mitogen.core.Channel` `recv` to the select.
.. py:method:: remove (recv)
Remove the :class:`mitogen.core.Receiver` or
:class:`mitogen.core.Channel` `recv` from the select. Note that if
the receiver has notified prior to :meth:`remove`, then it will
still be returned by a subsequent :meth:`get`. This may change in a
future version.
Channel Class Channel Class
@ -1327,64 +749,13 @@ Utility Functions
A random assortment of utility functions useful on masters and children. A random assortment of utility functions useful on masters and children.
.. currentmodule:: mitogen.utils .. currentmodule:: mitogen.utils
.. function:: cast (obj) .. autofunction:: cast
Many tools love to subclass built-in types in order to implement useful
functionality, such as annotating the safety of a Unicode string, or adding
additional methods to a dict. However, cPickle loves to preserve those
subtypes during serialization, resulting in CallError during :meth:`call
<mitogen.parent.Context.call>` in the target when it tries to deserialize
the data.
This function walks the object graph `obj`, producing a copy with any
custom sub-types removed. The functionality is not default since the
resulting walk may be computationally expensive given a large enough graph.
See :ref:`serialization-rules` for a list of supported types.
:param obj:
Object to undecorate.
:returns:
Undecorated object.
.. currentmodule:: mitogen.utils
.. function:: disable_site_packages
Remove all entries mentioning ``site-packages`` or ``Extras`` from the
system path. Used primarily for testing on OS X within a virtualenv, where
OS X bundles some ancient version of the :mod:`six` module.
.. currentmodule:: mitogen.utils .. currentmodule:: mitogen.utils
.. function:: log_to_file (path=None, io=False, level='INFO') .. autofunction:: disable_site_packages
.. autofunction:: log_to_file
Install a new :class:`logging.Handler` writing applications logs to the .. autofunction:: run_with_router(func, \*args, \**kwargs)
filesystem. Useful when debugging slave IO problems.
Parameters to this function may be overridden at runtime using environment
variables. See :ref:`logging-env-vars`.
:param str path:
If not :data:`None`, a filesystem path to write logs to. Otherwise,
logs are written to :data:`sys.stderr`.
:param bool io:
If :data:`True`, include extremely verbose IO logs in the output.
Useful for debugging hangs, less useful for debugging application code.
:param str level:
Name of the :mod:`logging` package constant that is the minimum
level to log at. Useful levels are ``DEBUG``, ``INFO``, ``WARNING``,
and ``ERROR``.
.. currentmodule:: mitogen.utils
.. function:: run_with_router(func, \*args, \**kwargs)
Arrange for `func(router, \*args, \**kwargs)` to run with a temporary
:class:`mitogen.master.Router`, ensuring the Router and Broker are
correctly shut down during normal or exceptional return.
:returns:
`func`'s return value.
.. currentmodule:: mitogen.utils .. currentmodule:: mitogen.utils
.. decorator:: with_router .. decorator:: with_router

@ -24,6 +24,12 @@ Mitogen for Ansible
Enhancements Enhancements
^^^^^^^^^^^^ ^^^^^^^^^^^^
* `#76 <https://github.com/dw/mitogen/issues/76>`_: disconnect propagation has
improved, allowing Ansible to cancel waits for responses from targets that
where abruptly disconnected. This increases the chance a task will fail
gracefully, rather than hanging due to the connection being severed, for
example because of network failure or EC2 instance maintenance.
* `#369 <https://github.com/dw/mitogen/issues/369>`_: :meth:`Connection.reset` * `#369 <https://github.com/dw/mitogen/issues/369>`_: :meth:`Connection.reset`
is implemented, allowing `meta: reset_connection is implemented, allowing `meta: reset_connection
<https://docs.ansible.com/ansible/latest/modules/meta_module.html>`_ to shut <https://docs.ansible.com/ansible/latest/modules/meta_module.html>`_ to shut
@ -50,6 +56,9 @@ Fixes
now print a useful hint when Python fails to start, as no useful error is now print a useful hint when Python fails to start, as no useful error is
normally logged to the console by these tools. normally logged to the console by these tools.
* `#400 <https://github.com/dw/mitogen/issues/400>`_: work around a threading
bug in the AWX display callback when running with high verbosity setting.
* `#409 <https://github.com/dw/mitogen/issues/409>`_: the setns method was * `#409 <https://github.com/dw/mitogen/issues/409>`_: the setns method was
silently broken due to missing tests. Basic coverage was added to prevent a silently broken due to missing tests. Basic coverage was added to prevent a
recurrence. recurrence.
@ -100,6 +109,7 @@ Mitogen would not be possible without the support of users. A huge thanks for
bug reports, features and fixes in this release contributed by bug reports, features and fixes in this release contributed by
`Brian Candler <https://github.com/candlerb>`_, `Brian Candler <https://github.com/candlerb>`_,
`Guy Knights <https://github.com/knightsg>`_, `Guy Knights <https://github.com/knightsg>`_,
`Jiří Vávra <https://github.com/Houbovo>`_,
`Jonathan Rosser <https://github.com/jrosser>`_, and `Jonathan Rosser <https://github.com/jrosser>`_, and
`Mehdi <https://github.com/mehdisat7>`_. `Mehdi <https://github.com/mehdisat7>`_.

@ -500,18 +500,53 @@ else:
class Message(object): class Message(object):
"""
Messages are the fundamental unit of communication, comprising fields from
the :ref:`stream-protocol` header, an optional reference to the receiving
:class:`mitogen.core.Router` for ingress messages, and helper methods for
deserialization and generating replies.
"""
#: Integer target context ID. :class:`Router` delivers messages locally
#: when their :attr:`dst_id` matches :data:`mitogen.context_id`, otherwise
#: they are routed up or downstream.
dst_id = None dst_id = None
#: Integer source context ID. Used as the target of replies if any are
#: generated.
src_id = None src_id = None
#: Context ID under whose authority the message is acting. See
#: :ref:`source-verification`.
auth_id = None auth_id = None
#: Integer target handle in the destination context. This is one of the
#: :ref:`standard-handles`, or a dynamically generated handle used to
#: receive a one-time reply, such as the return value of a function call.
handle = None handle = None
#: Integer target handle to direct any reply to this message. Used to
#: receive a one-time reply, such as the return value of a function call.
#: :data:`IS_DEAD` has a special meaning when it appears in this field.
reply_to = None reply_to = None
#: Raw message data bytes.
data = b('') data = b('')
_unpickled = object() _unpickled = object()
#: The :class:`Router` responsible for routing the message. This is
#: :data:`None` for locally originated messages.
router = None router = None
#: The :class:`Receiver` over which the message was last received. Part of
#: the :class:`mitogen.select.Select` interface. Defaults to :data:`None`.
receiver = None receiver = None
def __init__(self, **kwargs): def __init__(self, **kwargs):
"""
Construct a message from from the supplied `kwargs`. :attr:`src_id` and
:attr:`auth_id` are always set to :data:`mitogen.context_id`.
"""
self.src_id = mitogen.context_id self.src_id = mitogen.context_id
self.auth_id = mitogen.context_id self.auth_id = mitogen.context_id
vars(self).update(kwargs) vars(self).update(kwargs)
@ -551,14 +586,28 @@ class Message(object):
@property @property
def is_dead(self): def is_dead(self):
"""
:data:`True` if :attr:`reply_to` is set to the magic value
:data:`IS_DEAD`, indicating the sender considers the channel dead.
"""
return self.reply_to == IS_DEAD return self.reply_to == IS_DEAD
@classmethod @classmethod
def dead(cls, **kwargs): def dead(cls, **kwargs):
"""
Syntax helper to construct a dead message.
"""
return cls(reply_to=IS_DEAD, **kwargs) return cls(reply_to=IS_DEAD, **kwargs)
@classmethod @classmethod
def pickled(cls, obj, **kwargs): def pickled(cls, obj, **kwargs):
"""
Construct a pickled message, setting :attr:`data` to the serialization
of `obj`, and setting remaining fields using `kwargs`.
:returns:
The new message.
"""
self = cls(**kwargs) self = cls(**kwargs)
try: try:
self.data = pickle.dumps(obj, protocol=2) self.data = pickle.dumps(obj, protocol=2)
@ -568,6 +617,18 @@ class Message(object):
return self return self
def reply(self, msg, router=None, **kwargs): def reply(self, msg, router=None, **kwargs):
"""
Compose a reply to this message and send it using :attr:`router`, or
`router` is :attr:`router` is :data:`None`.
:param obj:
Either a :class:`Message`, or an object to be serialized in order
to construct a new message.
:param router:
Optional router to use if :attr:`router` is :data:`None`.
:param kwargs:
Optional keyword parameters overriding message fields in the reply.
"""
if not isinstance(msg, Message): if not isinstance(msg, Message):
msg = Message.pickled(msg) msg = Message.pickled(msg)
msg.dst_id = self.src_id msg.dst_id = self.src_id
@ -584,7 +645,18 @@ class Message(object):
UNPICKLER_KWARGS = {} UNPICKLER_KWARGS = {}
def unpickle(self, throw=True, throw_dead=True): def unpickle(self, throw=True, throw_dead=True):
"""Deserialize `data` into an object.""" """
Unpickle :attr:`data`, optionally raising any exceptions present.
:param bool throw_dead:
If :data:`True`, raise exceptions, otherwise it is the caller's
responsibility.
:raises CallError:
The serialized data contained CallError exception.
:raises ChannelError:
The `is_dead` field was set.
"""
_vv and IOLOG.debug('%r.unpickle()', self) _vv and IOLOG.debug('%r.unpickle()', self)
if throw_dead and self.is_dead: if throw_dead and self.is_dead:
raise ChannelError(ChannelError.remote_msg) raise ChannelError(ChannelError.remote_msg)
@ -616,25 +688,42 @@ class Message(object):
class Sender(object): class Sender(object):
"""
Senders are used to send pickled messages to a handle in another context,
it is the inverse of :class:`mitogen.core.Sender`.
Senders may be serialized, making them convenient to wire up data flows.
See :meth:`mitogen.core.Receiver.to_sender` for more information.
:param Context context:
Context to send messages to.
:param int dst_handle:
Destination handle to send messages to.
"""
def __init__(self, context, dst_handle): def __init__(self, context, dst_handle):
self.context = context self.context = context
self.dst_handle = dst_handle self.dst_handle = dst_handle
def __repr__(self): def send(self, data):
return 'Sender(%r, %r)' % (self.context, self.dst_handle) """
Send `data` to the remote end.
def __reduce__(self): """
return _unpickle_sender, (self.context.context_id, self.dst_handle) _vv and IOLOG.debug('%r.send(%r..)', self, repr(data)[:100])
self.context.send(Message.pickled(data, handle=self.dst_handle))
def close(self): def close(self):
"""Indicate this channel is closed to the remote side.""" """
Send a dead message to the remote, causing :meth:`ChannelError` to be
raised in any waiting thread.
"""
_vv and IOLOG.debug('%r.close()', self) _vv and IOLOG.debug('%r.close()', self)
self.context.send(Message.dead(handle=self.dst_handle)) self.context.send(Message.dead(handle=self.dst_handle))
def send(self, data): def __repr__(self):
"""Send `data` to the remote.""" return 'Sender(%r, %r)' % (self.context, self.dst_handle)
_vv and IOLOG.debug('%r.send(%r..)', self, repr(data)[:100])
self.context.send(Message.pickled(data, handle=self.dst_handle)) def __reduce__(self):
return _unpickle_sender, (self.context.context_id, self.dst_handle)
def _unpickle_sender(router, context_id, dst_handle): def _unpickle_sender(router, context_id, dst_handle):
@ -646,12 +735,39 @@ def _unpickle_sender(router, context_id, dst_handle):
class Receiver(object): class Receiver(object):
"""
Receivers maintain a thread-safe queue of messages sent to a handle of this
context from another context.
:param mitogen.core.Router router:
Router to register the handler on.
:param int handle:
If not :data:`None`, an explicit handle to register, otherwise an
unused handle is chosen.
:param bool persist:
If :data:`False`, unregister the handler after one message is received.
Single-message receivers are intended for RPC-like transactions, such
as in the case of :meth:`mitogen.parent.Context.call_async`.
:param mitogen.core.Context respondent:
Context this receiver is receiving from. If not :data:`None`, arranges
for the receiver to receive a dead message if messages can no longer be
routed to the context, due to disconnection or exit.
"""
#: If not :data:`None`, a reference to a function invoked as
#: `notify(receiver)` when a new message is delivered to this receiver.
#: Used by :class:`mitogen.select.Select` to implement waiting on multiple
#: receivers.
notify = None notify = None
raise_channelerror = True raise_channelerror = True
def __init__(self, router, handle=None, persist=True, def __init__(self, router, handle=None, persist=True,
respondent=None, policy=None): respondent=None, policy=None):
self.router = router self.router = router
#: The handle.
self.handle = handle # Avoid __repr__ crash in add_handler() self.handle = handle # Avoid __repr__ crash in add_handler()
self._latch = Latch() # Must exist prior to .add_handler() self._latch = Latch() # Must exist prior to .add_handler()
self.handle = router.add_handler( self.handle = router.add_handler(
@ -666,26 +782,73 @@ class Receiver(object):
return 'Receiver(%r, %r)' % (self.router, self.handle) return 'Receiver(%r, %r)' % (self.router, self.handle)
def to_sender(self): def to_sender(self):
"""
Return a :class:`Sender` configured to deliver messages to this
receiver. As senders are serializable, this makes it convenient to pass
`(context_id, handle)` pairs around::
def deliver_monthly_report(sender):
for line in open('monthly_report.txt'):
sender.send(line)
sender.close()
remote = router.ssh(hostname='mainframe')
recv = mitogen.core.Receiver(router)
remote.call(deliver_monthly_report, recv.to_sender())
for msg in recv:
print(msg)
"""
context = Context(self.router, mitogen.context_id) context = Context(self.router, mitogen.context_id)
return Sender(context, self.handle) return Sender(context, self.handle)
def _on_receive(self, msg): def _on_receive(self, msg):
"""Callback from the Stream; appends data to the internal queue.""" """
Callback from the Stream; appends data to the internal queue.
"""
_vv and IOLOG.debug('%r._on_receive(%r)', self, msg) _vv and IOLOG.debug('%r._on_receive(%r)', self, msg)
self._latch.put(msg) self._latch.put(msg)
if self.notify: if self.notify:
self.notify(self) self.notify(self)
def close(self): def close(self):
"""
Unregister the receiver's handle from its associated router, and cause
:class:`ChannelError` to be raised in any thread waiting in :meth:`get`
on this receiver.
"""
if self.handle: if self.handle:
self.router.del_handler(self.handle) self.router.del_handler(self.handle)
self.handle = None self.handle = None
self._latch.put(Message.dead()) self._latch.put(Message.dead())
def empty(self): def empty(self):
"""
Return :data:`True` if calling :meth:`get` would block.
As with :class:`Queue.Queue`, :data:`True` may be returned even though
a subsequent call to :meth:`get` will succeed, since a message may be
posted at any moment between :meth:`empty` and :meth:`get`.
"""
return self._latch.empty() return self._latch.empty()
def get(self, timeout=None, block=True, throw_dead=True): def get(self, timeout=None, block=True, throw_dead=True):
"""
Sleep waiting for a message to arrive on this receiver.
:param float timeout:
If not :data:`None`, specifies a timeout in seconds.
:raises mitogen.core.ChannelError:
The remote end indicated the channel should be closed, or
communication with its parent context was lost.
:raises mitogen.core.TimeoutError:
Timeout was reached.
:returns:
`(msg, data)` tuple, where `msg` is the :class:`Message` that was
received, and `data` is its unpickled data part.
"""
_vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block) _vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block)
msg = self._latch.get(timeout=timeout, block=block) msg = self._latch.get(timeout=timeout, block=block)
if msg.is_dead and throw_dead: if msg.is_dead and throw_dead:
@ -696,6 +859,10 @@ class Receiver(object):
return msg return msg
def __iter__(self): def __iter__(self):
"""
Yield consecutive :class:`Message` instances delivered to this receiver
until :class:`ChannelError` is raised.
"""
while True: while True:
msg = self.get(throw_dead=False) msg = self.get(throw_dead=False)
if msg.is_dead: if msg.is_dead:
@ -1213,6 +1380,28 @@ class Stream(BasicStream):
class Context(object): class Context(object):
"""
Represent a remote context regardless of the underlying connection method.
Context objects are simple facades that emit messages through an
associated router, and have :ref:`signals` raised against them in response
to various events relating to the context.
**Note:** This is the somewhat limited core version, used by child
contexts. The master subclass is documented below this one.
Contexts maintain no internal state and are thread-safe.
Prefer :meth:`Router.context_by_id` over constructing context objects
explicitly, as that method is deduplicating, and returns the only context
instance :ref:`signals` will be raised on.
:param Router router:
Router to emit messages through.
:param int context_id:
Context ID.
:param str name:
Context name.
"""
remote_name = None remote_name = None
def __init__(self, router, context_id, name=None): def __init__(self, router, context_id, name=None):
@ -1231,6 +1420,23 @@ class Context(object):
fire(self, 'disconnect') fire(self, 'disconnect')
def send_async(self, msg, persist=False): def send_async(self, msg, persist=False):
"""
Arrange for `msg` to be delivered to this context, with replies
directed to a newly constructed receiver. :attr:`dst_id
<Message.dst_id>` is set to the target context ID, and :attr:`reply_to
<Message.reply_to>` is set to the newly constructed receiver's handle.
:param bool persist:
If :data:`False`, the handler will be unregistered after a single
message has been received.
:param mitogen.core.Message msg:
The message.
:returns:
:class:`Receiver` configured to receive any replies sent to the
message's `reply_to` handle.
"""
if self.router.broker._thread == threading.currentThread(): # TODO if self.router.broker._thread == threading.currentThread(): # TODO
raise SystemError('Cannot making blocking call on broker thread') raise SystemError('Cannot making blocking call on broker thread')
@ -1254,8 +1460,13 @@ class Context(object):
return self.send_async(msg) return self.send_async(msg)
def send(self, msg): def send(self, msg):
"""send `obj` to `handle`, and tell the broker we have output. May """
be called from any thread.""" Arrange for `msg` to be delivered to this context. :attr:`dst_id
<Message.dst_id>` is set to the target context ID.
:param Message msg:
Message.
"""
msg.dst_id = self.context_id msg.dst_id = self.context_id
self.router.route(msg) self.router.route(msg)
@ -1264,7 +1475,19 @@ class Context(object):
return recv.get().unpickle() return recv.get().unpickle()
def send_await(self, msg, deadline=None): def send_await(self, msg, deadline=None):
"""Send `msg` and wait for a response with an optional timeout.""" """
Like :meth:`send_async`, but expect a single reply (`persist=False`)
delivered within `deadline` seconds.
:param mitogen.core.Message msg:
The message.
:param float deadline:
If not :data:`None`, seconds before timing out waiting for a reply.
:returns:
Deserialized reply.
:raises TimeoutError:
No message was received and `deadline` passed.
"""
receiver = self.send_async(msg) receiver = self.send_async(msg)
response = receiver.get(deadline) response = receiver.get(deadline)
data = response.unpickle() data = response.unpickle()
@ -1710,8 +1933,33 @@ class IoLogger(BasicStream):
class Router(object): class Router(object):
"""
Route messages between contexts, and invoke local handlers for messages
addressed to this context. :meth:`Router.route() <route>` straddles the
:class:`Broker <mitogen.core.Broker>` and user threads, it is safe to call
anywhere.
**Note:** This is the somewhat limited core version of the Router class
used by child contexts. The master subclass is documented below this one.
"""
context_class = Context context_class = Context
max_message_size = 128 * 1048576 max_message_size = 128 * 1048576
#: When :data:`True`, permit children to only communicate with the current
#: context or a parent of the current context. Routing between siblings or
#: children of parents is prohibited, ensuring no communication is possible
#: between intentionally partitioned networks, such as when a program
#: simultaneously manipulates hosts spread across a corporate and a
#: production network, or production networks that are otherwise
#: air-gapped.
#:
#: Sending a prohibited message causes an error to be logged and a dead
#: message to be sent in reply to the errant message, if that message has
#: ``reply_to`` set.
#:
#: The value of :data:`unidirectional` becomes the default for the
#: :meth:`local() <mitogen.master.Router.local>` `unidirectional`
#: parameter.
unidirectional = False unidirectional = False
def __init__(self, broker): def __init__(self, broker):
@ -1751,7 +1999,7 @@ class Router(object):
fire(self._context_by_id[target_id], 'disconnect') fire(self._context_by_id[target_id], 'disconnect')
def on_stream_disconnect(self, stream): def _on_stream_disconnect(self, stream):
for context in self._context_by_id.values(): for context in self._context_by_id.values():
stream_ = self._stream_by_id.get(context.context_id) stream_ = self._stream_by_id.get(context.context_id)
if stream_ is stream: if stream_ is stream:
@ -1764,6 +2012,10 @@ class Router(object):
func(Message.dead()) func(Message.dead())
def context_by_id(self, context_id, via_id=None, create=True, name=None): def context_by_id(self, context_id, via_id=None, create=True, name=None):
"""
Messy factory/lookup function to find a context by its ID, or construct
it. In future this will be replaced by a much more sensible interface.
"""
context = self._context_by_id.get(context_id) context = self._context_by_id.get(context_id)
if create and not context: if create and not context:
context = self.context_class(self, context_id, name=name) context = self.context_class(self, context_id, name=name)
@ -1773,21 +2025,85 @@ class Router(object):
return context return context
def register(self, context, stream): def register(self, context, stream):
"""
Register a newly constructed context and its associated stream, and add
the stream's receive side to the I/O multiplexer. This method remains
public while the design has not yet settled.
"""
_v and LOG.debug('register(%r, %r)', context, stream) _v and LOG.debug('register(%r, %r)', context, stream)
self._stream_by_id[context.context_id] = stream self._stream_by_id[context.context_id] = stream
self._context_by_id[context.context_id] = context self._context_by_id[context.context_id] = context
self.broker.start_receive(stream) self.broker.start_receive(stream)
listen(stream, 'disconnect', lambda: self.on_stream_disconnect(stream)) listen(stream, 'disconnect', lambda: self._on_stream_disconnect(stream))
def stream_by_id(self, dst_id): def stream_by_id(self, dst_id):
"""
Return the :class:`Stream` that should be used to communicate with
`dst_id`. If a specific route for `dst_id` is not known, a reference to
the parent context's stream is returned.
"""
return self._stream_by_id.get(dst_id, return self._stream_by_id.get(dst_id,
self._stream_by_id.get(mitogen.parent_id)) self._stream_by_id.get(mitogen.parent_id))
def del_handler(self, handle): def del_handler(self, handle):
"""
Remove the handle registered for `handle`
:raises KeyError:
The handle wasn't registered.
"""
del self._handle_map[handle] del self._handle_map[handle]
def add_handler(self, fn, handle=None, persist=True, def add_handler(self, fn, handle=None, persist=True,
policy=None, respondent=None): policy=None, respondent=None):
"""
Invoke `fn(msg)` for each Message sent to `handle` from this context.
Unregister after one invocation if `persist` is :data:`False`. If
`handle` is :data:`None`, a new handle is allocated and returned.
:param int handle:
If not :data:`None`, an explicit handle to register, usually one of
the ``mitogen.core.*`` constants. If unspecified, a new unused
handle will be allocated.
:param bool persist:
If :data:`False`, the handler will be unregistered after a single
message has been received.
:param Context respondent:
Context that messages to this handle are expected to be sent from.
If specified, arranges for a dead message to be delivered to `fn`
when disconnection of the context is detected.
In future `respondent` will likely also be used to prevent other
contexts from sending messages to the handle.
:param function policy:
Function invoked as `policy(msg, stream)` where `msg` is a
:class:`mitogen.core.Message` about to be delivered, and `stream`
is the :class:`mitogen.core.Stream` on which it was received. The
function must return :data:`True`, otherwise an error is logged and
delivery is refused.
Two built-in policy functions exist:
* :func:`has_parent_authority`: requires the message arrived from a
parent context, or a context acting with a parent context's
authority (``auth_id``).
* :func:`mitogen.parent.is_immediate_child`: requires the
message arrived from an immediately connected child, for use in
messaging patterns where either something becomes buggy or
insecure by permitting indirect upstream communication.
In case of refusal, and the message's ``reply_to`` field is
nonzero, a :class:`mitogen.core.CallError` is delivered to the
sender indicating refusal occurred.
:return:
`handle`, or if `handle` was :data:`None`, the newly allocated
handle.
"""
handle = handle or next(self._last_handle) handle = handle or next(self._last_handle)
_vv and IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist) _vv and IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist)
@ -1848,6 +2164,19 @@ class Router(object):
msg.reply(Message.dead(), router=self) msg.reply(Message.dead(), router=self)
def _async_route(self, msg, in_stream=None): def _async_route(self, msg, in_stream=None):
"""
Arrange for `msg` to be forwarded towards its destination. If its
destination is the local context, then arrange for it to be dispatched
using the local handlers.
This is a lower overhead version of :meth:`route` that may only be
called from the I/O multiplexer thread.
:param Stream in_stream:
If not :data:`None`, the stream the message arrived on. Used for
performing source route verification, to ensure sensitive messages
such as ``CALL_FUNCTION`` arrive only from trusted contexts.
"""
_vv and IOLOG.debug('%r._async_route(%r, %r)', self, msg, in_stream) _vv and IOLOG.debug('%r._async_route(%r, %r)', self, msg, in_stream)
if len(msg.data) > self.max_message_size: if len(msg.data) > self.max_message_size:
@ -1902,6 +2231,15 @@ class Router(object):
out_stream._send(msg) out_stream._send(msg)
def route(self, msg): def route(self, msg):
"""
Arrange for the :class:`Message` `msg` to be delivered to its
destination using any relevant downstream context, or if none is found,
by forwarding the message upstream towards the master context. If `msg`
is destined for the local context, it is dispatched using the handles
registered with :meth:`add_handler`.
This may be called from any thread.
"""
self.broker.defer(self._async_route, msg) self.broker.defer(self._async_route, msg)

@ -1,4 +1,3 @@
# coding: utf-8
# Copyright 2018, Yannig Perré # Copyright 2018, Yannig Perré
# #
# Redistribution and use in source and binary forms, with or without # Redistribution and use in source and binary forms, with or without

@ -618,6 +618,7 @@ class EofError(mitogen.core.StreamError):
the child process. the child process.
""" """
# inherits from StreamError to maintain compatibility. # inherits from StreamError to maintain compatibility.
pass
class Argv(object): class Argv(object):
@ -1390,7 +1391,16 @@ class CallChain(object):
class Context(mitogen.core.Context): class Context(mitogen.core.Context):
"""
Extend :class:`mitogen.core.Context` with functionality useful to masters,
and child contexts who later become parents. Currently when this class is
required, the target context's router is upgraded at runtime.
"""
#: A :class:`CallChain` instance constructed by default, with pipelining
#: disabled. :meth:`call`, :meth:`call_async` and :meth:`call_no_reply` use
#: this instance.
call_chain_class = CallChain call_chain_class = CallChain
via = None via = None
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
@ -1406,15 +1416,41 @@ class Context(mitogen.core.Context):
return hash((self.router, self.context_id)) return hash((self.router, self.context_id))
def call_async(self, fn, *args, **kwargs): def call_async(self, fn, *args, **kwargs):
"""
See :meth:`CallChain.call_async`.
"""
return self.default_call_chain.call_async(fn, *args, **kwargs) return self.default_call_chain.call_async(fn, *args, **kwargs)
def call(self, fn, *args, **kwargs): def call(self, fn, *args, **kwargs):
"""
See :meth:`CallChain.call`.
"""
return self.default_call_chain.call(fn, *args, **kwargs) return self.default_call_chain.call(fn, *args, **kwargs)
def call_no_reply(self, fn, *args, **kwargs): def call_no_reply(self, fn, *args, **kwargs):
"""
See :meth:`CallChain.call_no_reply`.
"""
self.default_call_chain.call_no_reply(fn, *args, **kwargs) self.default_call_chain.call_no_reply(fn, *args, **kwargs)
def shutdown(self, wait=False): def shutdown(self, wait=False):
"""
Arrange for the context to receive a ``SHUTDOWN`` message, triggering
graceful shutdown.
Due to a lack of support for timers, no attempt is made yet to force
terminate a hung context using this method. This will be fixed shortly.
:param bool wait:
If :data:`True`, block the calling thread until the context has
completely terminated.
:returns:
If `wait` is :data:`False`, returns a :class:`mitogen.core.Latch`
whose :meth:`get() <mitogen.core.Latch.get>` method returns
:data:`None` when shutdown completes. The `timeout` parameter may
be used to implement graceful timeouts.
"""
LOG.debug('%r.shutdown() sending SHUTDOWN', self) LOG.debug('%r.shutdown() sending SHUTDOWN', self)
latch = mitogen.core.Latch() latch = mitogen.core.Latch()
mitogen.core.listen(self, 'disconnect', lambda: latch.put(None)) mitogen.core.listen(self, 'disconnect', lambda: latch.put(None))
@ -1666,6 +1702,13 @@ class Router(mitogen.core.Router):
msg.reply(None) msg.reply(None)
def add_route(self, target_id, stream): def add_route(self, target_id, stream):
"""
Arrange for messages whose `dst_id` is `target_id` to be forwarded on
the directly connected stream for `via_id`. This method is called
automatically in response to :data:`mitogen.core.ADD_ROUTE` messages,
but remains public while the design has not yet settled, and situations
may arise where routing is not fully automatic.
"""
LOG.debug('%r.add_route(%r, %r)', self, target_id, stream) LOG.debug('%r.add_route(%r, %r)', self, target_id, stream)
assert isinstance(target_id, int) assert isinstance(target_id, int)
assert isinstance(stream, Stream) assert isinstance(stream, Stream)

@ -34,11 +34,57 @@ class Error(mitogen.core.Error):
class Select(object): class Select(object):
notify = None """
Support scatter/gather asynchronous calls and waiting on multiple
receivers, channels, and sub-Selects. Accepts a sequence of
:class:`mitogen.core.Receiver` or :class:`mitogen.select.Select` instances
and returns the first value posted to any receiver or select.
@classmethod If `oneshot` is :data:`True`, then remove each receiver as it yields a
def all(cls, receivers): result; since :meth:`__iter__` terminates once the final receiver is
return list(msg.unpickle() for msg in cls(receivers)) removed, this makes it convenient to respond to calls made in parallel:
.. code-block:: python
total = 0
recvs = [c.call_async(long_running_operation) for c in contexts]
for msg in mitogen.select.Select(recvs):
print('Got %s from %s' % (msg, msg.receiver))
total += msg.unpickle()
# Iteration ends when last Receiver yields a result.
print('Received total %s from %s receivers' % (total, len(recvs)))
:class:`Select` may drive a long-running scheduler:
.. code-block:: python
with mitogen.select.Select(oneshot=False) as select:
while running():
for msg in select:
process_result(msg.receiver.context, msg.unpickle())
for context, workfunc in get_new_work():
select.add(context.call_async(workfunc))
:class:`Select` may be nested:
.. code-block:: python
subselects = [
mitogen.select.Select(get_some_work()),
mitogen.select.Select(get_some_work()),
mitogen.select.Select([
mitogen.select.Select(get_some_work()),
mitogen.select.Select(get_some_work())
])
]
for msg in mitogen.select.Select(selects):
print(msg.unpickle())
"""
notify = None
def __init__(self, receivers=(), oneshot=True): def __init__(self, receivers=(), oneshot=True):
self._receivers = [] self._receivers = []
@ -47,12 +93,46 @@ class Select(object):
for recv in receivers: for recv in receivers:
self.add(recv) self.add(recv)
@classmethod
def all(cls, receivers):
"""
Take an iterable of receivers and retrieve a :class:`Message` from
each, returning the result of calling `msg.unpickle()` on each in turn.
Results are returned in the order they arrived.
This is sugar for handling batch :meth:`Context.call_async
<mitogen.parent.Context.call_async>` invocations:
.. code-block:: python
print('Total disk usage: %.02fMiB' % (sum(
mitogen.select.Select.all(
context.call_async(get_disk_usage)
for context in contexts
) / 1048576.0
),))
However, unlike in a naive comprehension such as:
.. code-block:: python
recvs = [c.call_async(get_disk_usage) for c in contexts]
sum(recv.get().unpickle() for recv in recvs)
Result processing happens in the order results arrive, rather than the
order requests were issued, so :meth:`all` should always be faster.
"""
return list(msg.unpickle() for msg in cls(receivers))
def _put(self, value): def _put(self, value):
self._latch.put(value) self._latch.put(value)
if self.notify: if self.notify:
self.notify(self) self.notify(self)
def __bool__(self): def __bool__(self):
"""
Return :data:`True` if any receivers are registered with this select.
"""
return bool(self._receivers) return bool(self._receivers)
def __enter__(self): def __enter__(self):
@ -62,6 +142,11 @@ class Select(object):
self.close() self.close()
def __iter__(self): def __iter__(self):
"""
Yield the result of :meth:`get` until no receivers remain in the
select, either because `oneshot` is :data:`True`, or each receiver was
explicitly removed via :meth:`remove`.
"""
while self._receivers: while self._receivers:
yield self.get() yield self.get()
@ -80,6 +165,14 @@ class Select(object):
owned_msg = 'Cannot add: Receiver is already owned by another Select' owned_msg = 'Cannot add: Receiver is already owned by another Select'
def add(self, recv): def add(self, recv):
"""
Add the :class:`mitogen.core.Receiver` or :class:`Select` `recv` to the
select.
:raises mitogen.select.Error:
An attempt was made to add a :class:`Select` to which this select
is indirectly a member of.
"""
if isinstance(recv, Select): if isinstance(recv, Select):
recv._check_no_loop(self) recv._check_no_loop(self)
@ -95,6 +188,12 @@ class Select(object):
not_present_msg = 'Instance is not a member of this Select' not_present_msg = 'Instance is not a member of this Select'
def remove(self, recv): def remove(self, recv):
"""
Remove the :class:`mitogen.core.Receiver` or :class:`Select` `recv`
from the select. Note that if the receiver has notified prior to
:meth:`remove`, it will still be returned by a subsequent :meth:`get`.
This may change in a future version.
"""
try: try:
if recv.notify != self._put: if recv.notify != self._put:
raise ValueError raise ValueError
@ -104,16 +203,59 @@ class Select(object):
raise Error(self.not_present_msg) raise Error(self.not_present_msg)
def close(self): def close(self):
"""
Remove the select's notifier function from each registered receiver,
mark the associated latch as closed, and cause any thread currently
sleeping in :meth:`get` to be woken with
:class:`mitogen.core.LatchError`.
This is necessary to prevent memory leaks in long-running receivers. It
is called automatically when the Python :keyword:`with` statement is
used.
"""
for recv in self._receivers[:]: for recv in self._receivers[:]:
self.remove(recv) self.remove(recv)
self._latch.close() self._latch.close()
def empty(self): def empty(self):
"""
Return :data:`True` if calling :meth:`get` would block.
As with :class:`Queue.Queue`, :data:`True` may be returned even though
a subsequent call to :meth:`get` will succeed, since a message may be
posted at any moment between :meth:`empty` and :meth:`get`.
:meth:`empty` may return :data:`False` even when :meth:`get` would
block if another thread has drained a receiver added to this select.
This can be avoided by only consuming each receiver from a single
thread.
"""
return self._latch.empty() return self._latch.empty()
empty_msg = 'Cannot get(), Select instance is empty' empty_msg = 'Cannot get(), Select instance is empty'
def get(self, timeout=None, block=True): def get(self, timeout=None, block=True):
"""
Fetch the next available value from any receiver, or raise
:class:`mitogen.core.TimeoutError` if no value is available within
`timeout` seconds.
On success, the message's :attr:`receiver
<mitogen.core.Message.receiver>` attribute is set to the receiver.
:param float timeout:
Timeout in seconds.
:param bool block:
If :data:`False`, immediately raise
:class:`mitogen.core.TimeoutError` if the select is empty.
:return:
:class:`mitogen.core.Message`
:raises mitogen.core.TimeoutError:
Timeout was reached.
:raises mitogen.core.LatchError:
:meth:`close` has been called, and the underlying latch is no
longer valid.
"""
if not self._receivers: if not self._receivers:
raise Error(self.empty_msg) raise Error(self.empty_msg)

@ -395,11 +395,13 @@ class Service(object):
Called when a message arrives on any of :attr:`select`'s registered Called when a message arrives on any of :attr:`select`'s registered
receivers. receivers.
""" """
pass
def on_shutdown(self): def on_shutdown(self):
""" """
Called by Pool.shutdown() once the last worker thread has exitted. Called by Pool.shutdown() once the last worker thread has exitted.
""" """
pass
class Pool(object): class Pool(object):

@ -46,6 +46,11 @@ else:
def disable_site_packages(): def disable_site_packages():
"""
Remove all entries mentioning ``site-packages`` or ``Extras`` from
:attr:sys.path. Used primarily for testing on OS X within a virtualenv,
where OS X bundles some ancient version of the :mod:`six` module.
"""
for entry in sys.path[:]: for entry in sys.path[:]:
if 'site-packages' in entry or 'Extras' in entry: if 'site-packages' in entry or 'Extras' in entry:
sys.path.remove(entry) sys.path.remove(entry)
@ -65,6 +70,26 @@ def log_get_formatter():
def log_to_file(path=None, io=False, level='INFO'): def log_to_file(path=None, io=False, level='INFO'):
"""
Install a new :class:`logging.Handler` writing applications logs to the
filesystem. Useful when debugging slave IO problems.
Parameters to this function may be overridden at runtime using environment
variables. See :ref:`logging-env-vars`.
:param str path:
If not :data:`None`, a filesystem path to write logs to. Otherwise,
logs are written to :data:`sys.stderr`.
:param bool io:
If :data:`True`, include extremely verbose IO logs in the output.
Useful for debugging hangs, less useful for debugging application code.
:param str level:
Name of the :mod:`logging` package constant that is the minimum level
to log at. Useful levels are ``DEBUG``, ``INFO``, ``WARNING``, and
``ERROR``.
"""
log = logging.getLogger('') log = logging.getLogger('')
if path: if path:
fp = open(path, 'w', 1) fp = open(path, 'w', 1)
@ -94,6 +119,14 @@ def log_to_file(path=None, io=False, level='INFO'):
def run_with_router(func, *args, **kwargs): def run_with_router(func, *args, **kwargs):
"""
Arrange for `func(router, \*args, \**kwargs)` to run with a temporary
:class:`mitogen.master.Router`, ensuring the Router and Broker are
correctly shut down during normal or exceptional return.
:returns:
`func`'s return value.
"""
broker = mitogen.master.Broker() broker = mitogen.master.Broker()
router = mitogen.master.Router(broker) router = mitogen.master.Router(broker)
try: try:
@ -104,6 +137,17 @@ def run_with_router(func, *args, **kwargs):
def with_router(func): def with_router(func):
"""
Decorator version of :func:`run_with_router`. Example:
.. code-block:: python
@with_router
def do_stuff(router, arg):
pass
do_stuff(blah, 123)
"""
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
return run_with_router(func, *args, **kwargs) return run_with_router(func, *args, **kwargs)
if mitogen.core.PY3: if mitogen.core.PY3:
@ -122,7 +166,27 @@ PASSTHROUGH = (
mitogen.core.Secret, mitogen.core.Secret,
) )
def cast(obj): def cast(obj):
"""
Many tools love to subclass built-in types in order to implement useful
functionality, such as annotating the safety of a Unicode string, or adding
additional methods to a dict. However, cPickle loves to preserve those
subtypes during serialization, resulting in CallError during :meth:`call
<mitogen.parent.Context.call>` in the target when it tries to deserialize
the data.
This function walks the object graph `obj`, producing a copy with any
custom sub-types removed. The functionality is not default since the
resulting walk may be computationally expensive given a large enough graph.
See :ref:`serialization-rules` for a list of supported types.
:param obj:
Object to undecorate.
:returns:
Undecorated object.
"""
if isinstance(obj, dict): if isinstance(obj, dict):
return dict((cast(k), cast(v)) for k, v in iteritems(obj)) return dict((cast(k), cast(v)) for k, v in iteritems(obj))
if isinstance(obj, (list, tuple)): if isinstance(obj, (list, tuple)):

@ -1,6 +1,7 @@
- import_playbook: kubectl.yml - import_playbook: kubectl.yml
- import_playbook: lxc.yml - import_playbook: lxc.yml
- import_playbook: lxd.yml - import_playbook: lxd.yml
- import_playbook: mitogen_doas.yml
- import_playbook: mitogen_sudo.yml
- import_playbook: setns_lxc.yml - import_playbook: setns_lxc.yml
- import_playbook: setns_lxd.yml - import_playbook: setns_lxd.yml
- import_playbook: sudo.yml

@ -0,0 +1,21 @@
- name: integration/stub_connections/mitogen_doas.yml
hosts: test-targets
gather_facts: false
any_errors_fatal: true
tasks:
- meta: end_play
when: not is_mitogen
- custom_python_detect_environment:
vars:
ansible_connection: mitogen_doas
ansible_become_exe: stub-doas.py
ansible_user: someuser
register: out
- debug: var=out.env.ORIGINAL_ARGV
- assert:
that:
- out.env.THIS_IS_STUB_DOAS == '1'
- (out.env.ORIGINAL_ARGV|from_json)[1:3] == ['-u', 'someuser']

@ -1,5 +1,5 @@
- name: integration/stub_connections/sudo.yml - name: integration/stub_connections/mitogen_sudo.yml
hosts: test-targets hosts: test-targets
gather_facts: false gather_facts: false
any_errors_fatal: true any_errors_fatal: true

@ -0,0 +1,9 @@
#!/usr/bin/env python
import json
import os
import sys
os.environ['ORIGINAL_ARGV'] = json.dumps(sys.argv)
os.environ['THIS_IS_STUB_DOAS'] = '1'
os.execv(sys.executable, sys.argv[sys.argv.index('--') + 1:])

@ -0,0 +1,31 @@
import os
import mitogen
import mitogen.parent
import unittest2
import testlib
class ConstructorTest(testlib.RouterMixin, testlib.TestCase):
doas_path = testlib.data_path('stubs/stub-doas.py')
def test_okay(self):
context = self.router.doas(
doas_path=self.doas_path,
username='someuser',
)
argv = eval(context.call(os.getenv, 'ORIGINAL_ARGV'))
self.assertEquals(argv[:4], [
self.doas_path,
'-u',
'someuser',
'--',
])
self.assertEquals('1', context.call(os.getenv, 'THIS_IS_STUB_DOAS'))
if __name__ == '__main__':
unittest2.main()

@ -0,0 +1,106 @@
import codecs
import glob
import pprint
import unittest2
import mitogen.minify
import testlib
def read_sample(fname):
sample_path = testlib.data_path('minimize_samples/' + fname)
sample_file = open(sample_path)
sample = sample_file.read()
sample_file.close()
return sample
class MinimizeSourceTest(unittest2.TestCase):
func = staticmethod(mitogen.minify.minimize_source)
def test_class(self):
original = read_sample('class.py')
expected = read_sample('class_min.py')
self.assertEqual(expected, self.func(original))
def test_comment(self):
original = read_sample('comment.py')
expected = read_sample('comment_min.py')
self.assertEqual(expected, self.func(original))
def test_def(self):
original = read_sample('def.py')
expected = read_sample('def_min.py')
self.assertEqual(expected, self.func(original))
def test_hashbang(self):
original = read_sample('hashbang.py')
expected = read_sample('hashbang_min.py')
self.assertEqual(expected, self.func(original))
def test_mod(self):
original = read_sample('mod.py')
expected = read_sample('mod_min.py')
self.assertEqual(expected, self.func(original))
def test_pass(self):
original = read_sample('pass.py')
expected = read_sample('pass_min.py')
self.assertEqual(expected, self.func(original))
def test_obstacle_course(self):
original = read_sample('obstacle_course.py')
expected = read_sample('obstacle_course_min.py')
self.assertEqual(expected, self.func(original))
class MitogenCoreTest(unittest2.TestCase):
# Verify minimize_source() succeeds for all built-in modules.
func = staticmethod(mitogen.minify.minimize_source)
def read_source(self, name):
fp = codecs.open(name, encoding='utf-8')
try:
return fp.read()
finally:
fp.close()
def _test_syntax_valid(self, minified, name):
compile(minified, name, 'exec')
def _test_line_counts_match(self, original, minified):
self.assertEquals(original.count('\n'),
minified.count('\n'))
def _test_non_blank_lines_match(self, name, original, minified):
# Verify first token matches. We just want to ensure line numbers make
# sense, this is good enough.
olines = original.splitlines()
mlines = minified.splitlines()
for i, (orig, mini) in enumerate(zip(olines, mlines)):
if i < 2:
assert orig == mini
continue
owords = orig.split()
mwords = mini.split()
assert len(mwords) == 0 or (mwords[0] == owords[0]), pprint.pformat({
'line': i+1,
'filename': name,
'owords': owords,
'mwords': mwords,
})
def test_minify_all(self):
for name in glob.glob('mitogen/*.py'):
original = self.read_source(name)
minified = self.func(original)
self._test_syntax_valid(minified, name)
self._test_line_counts_match(original, minified)
self._test_non_blank_lines_match(name, original, minified)
if __name__ == '__main__':
unittest2.main()

@ -1,55 +0,0 @@
import unittest2
import mitogen.minify
import testlib
def read_sample(fname):
sample_path = testlib.data_path('minimize_samples/' + fname)
sample_file = open(sample_path)
sample = sample_file.read()
sample_file.close()
return sample
class MinimizeSource(unittest2.TestCase):
func = staticmethod(mitogen.minify.minimize_source)
def test_class(self):
original = read_sample('class.py')
expected = read_sample('class_min.py')
self.assertEqual(expected, self.func(original))
def test_comment(self):
original = read_sample('comment.py')
expected = read_sample('comment_min.py')
self.assertEqual(expected, self.func(original))
def test_def(self):
original = read_sample('def.py')
expected = read_sample('def_min.py')
self.assertEqual(expected, self.func(original))
def test_hashbang(self):
original = read_sample('hashbang.py')
expected = read_sample('hashbang_min.py')
self.assertEqual(expected, self.func(original))
def test_mod(self):
original = read_sample('mod.py')
expected = read_sample('mod_min.py')
self.assertEqual(expected, self.func(original))
def test_pass(self):
original = read_sample('pass.py')
expected = read_sample('pass_min.py')
self.assertEqual(expected, self.func(original))
def test_obstacle_course(self):
original = read_sample('obstacle_course.py')
expected = read_sample('obstacle_course_min.py')
self.assertEqual(expected, self.func(original))
if __name__ == '__main__':
unittest2.main()
Loading…
Cancel
Save