diff --git a/docs/api.rst b/docs/api.rst index ea20ada7..844bb900 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -84,237 +84,16 @@ Message Class ============= .. currentmodule:: mitogen.core - -.. class:: Message - - Messages are the fundamental unit of communication, comprising fields from - the :ref:`stream-protocol` header, an optional reference to the receiving - :class:`mitogen.core.Router` for ingress messages, and helper methods for - deserialization and generating replies. - - .. attribute:: router - - The :class:`mitogen.core.Router` responsible for routing the - message. This is :data:`None` for locally originated messages. - - .. attribute:: receiver - - The :class:`mitogen.core.Receiver` over which the message was last - received. Part of the :class:`mitogen.select.Select` interface. - Defaults to :data:`None`. - - .. attribute:: dst_id - - Integer target context ID. :class:`mitogen.core.Router` delivers - messages locally when their :attr:`dst_id` matches - :data:`mitogen.context_id`, otherwise they are routed up or downstream. - - .. attribute:: src_id - - Integer source context ID. Used as the target of replies if any are - generated. - - .. attribute:: auth_id - - The context ID under whose authority the message is acting. See - :ref:`source-verification`. - - .. attribute:: handle - - Integer target handle in the destination context. This is one of the - :ref:`standard-handles`, or a dynamically generated handle used to - receive a one-time reply, such as the return value of a function call. - - .. attribute:: reply_to - - Integer target handle to direct any reply to this message. Used to - receive a one-time reply, such as the return value of a function call. - :data:`IS_DEAD` has a special meaning when it appears in this field. - - .. attribute:: data - - Message data, which may be raw or pickled. - - .. attribute:: is_dead - - :data:`True` if :attr:`reply_to` is set to the magic value - :data:`mitogen.core.IS_DEAD`, indicating the sender considers the - channel dead. - - .. py:method:: __init__ (\**kwargs) - - Construct a message from from the supplied `kwargs`. :attr:`src_id` - and :attr:`auth_id` are always set to :data:`mitogen.context_id`. - - .. py:classmethod:: pickled (obj, \**kwargs) - - Construct a pickled message, setting :attr:`data` to the - serialization of `obj`, and setting remaining fields using `kwargs`. - - :returns: - The new message. - - .. method:: unpickle (throw=True) - - Unpickle :attr:`data`, optionally raising any exceptions present. - - :param bool throw: - If :data:`True`, raise exceptions, otherwise it is the caller's - responsibility. - - :raises mitogen.core.CallError: - The serialized data contained CallError exception. - :raises mitogen.core.ChannelError: - The `is_dead` field was set. - - .. method:: reply (obj, router=None, \**kwargs) - - Compose a reply to this message and send it using :attr:`router`, or - `router` is :attr:`router` is :data:`None`. - - :param obj: - Either a :class:`Message`, or an object to be serialized in order - to construct a new message. - :param router: - Optional router to use if :attr:`router` is :data:`None`. - :param kwargs: - Optional keyword parameters overriding message fields in the reply. - +.. autoclass:: Message + :members: Router Class ============ .. currentmodule:: mitogen.core - -.. class:: Router - - Route messages between parent and child contexts, and invoke handlers - defined on our parent context. :meth:`Router.route() ` straddles - the :class:`Broker ` and user threads, it is safe - to call anywhere. - - **Note:** This is the somewhat limited core version of the Router class - used by child contexts. The master subclass is documented below this one. - - .. attribute:: unidirectional - - When :data:`True`, permit children to only communicate with the current - context or a parent of the current context. Routing between siblings or - children of parents is prohibited, ensuring no communication is - possible between intentionally partitioned networks, such as when a - program simultaneously manipulates hosts spread across a corporate and - a production network, or production networks that are otherwise - air-gapped. - - Sending a prohibited message causes an error to be logged and a dead - message to be sent in reply to the errant message, if that message has - ``reply_to`` set. - - The value of :data:`unidirectional` becomes the default for the - :meth:`local() ` `unidirectional` - parameter. - - .. method:: stream_by_id (dst_id) - - Return the :class:`mitogen.core.Stream` that should be used to - communicate with `dst_id`. If a specific route for `dst_id` is not - known, a reference to the parent context's stream is returned. - - .. method:: add_route (target_id, via_id) - - Arrange for messages whose `dst_id` is `target_id` to be forwarded on - the directly connected stream for `via_id`. This method is called - automatically in response to ``ADD_ROUTE`` messages, but remains public - for now while the design has not yet settled, and situations may arise - where routing is not fully automatic. - - .. method:: register (context, stream) - - Register a new context and its associated stream, and add the stream's - receive side to the I/O multiplexer. This This method remains public - for now while hte design has not yet settled. - - .. method:: add_handler (fn, handle=None, persist=True, respondent=None, policy=None) - - Invoke `fn(msg)` for each Message sent to `handle` from this context. - Unregister after one invocation if `persist` is :data:`False`. If - `handle` is :data:`None`, a new handle is allocated and returned. - - :param int handle: - If not :data:`None`, an explicit handle to register, usually one of - the ``mitogen.core.*`` constants. If unspecified, a new unused - handle will be allocated. - - :param bool persist: - If :data:`False`, the handler will be unregistered after a single - message has been received. - - :param mitogen.core.Context respondent: - Context that messages to this handle are expected to be sent from. - If specified, arranges for a dead message to be delivered to `fn` - when disconnection of the context is detected. - - In future `respondent` will likely also be used to prevent other - contexts from sending messages to the handle. - - :param function policy: - Function invoked as `policy(msg, stream)` where `msg` is a - :class:`mitogen.core.Message` about to be delivered, and - `stream` is the :class:`mitogen.core.Stream` on which it was - received. The function must return :data:`True`, otherwise an - error is logged and delivery is refused. - - Two built-in policy functions exist: - - * :func:`mitogen.core.has_parent_authority`: requires the - message arrived from a parent context, or a context acting with a - parent context's authority (``auth_id``). - - * :func:`mitogen.parent.is_immediate_child`: requires the - message arrived from an immediately connected child, for use in - messaging patterns where either something becomes buggy or - insecure by permitting indirect upstream communication. - - In case of refusal, and the message's ``reply_to`` field is - nonzero, a :class:`mitogen.core.CallError` is delivered to the - sender indicating refusal occurred. - - :return: - `handle`, or if `handle` was :data:`None`, the newly allocated - handle. - - .. method:: del_handler (handle) - - Remove the handle registered for `handle` - - :raises KeyError: - The handle wasn't registered. - - .. method:: _async_route(msg, stream=None) - - Arrange for `msg` to be forwarded towards its destination. If its - destination is the local context, then arrange for it to be dispatched - using the local handlers. - - This is a lower overhead version of :meth:`route` that may only be - called from the I/O multiplexer thread. - - :param mitogen.core.Stream stream: - If not :data:`None`, a reference to the stream the message arrived - on. Used for performing source route verification, to ensure - sensitive messages such as ``CALL_FUNCTION`` arrive only from - trusted contexts. - - .. method:: route(msg) - - Arrange for the :class:`Message` `msg` to be delivered to its - destination using any relevant downstream context, or if none is found, - by forwarding the message upstream towards the master context. If `msg` - is destined for the local context, it is dispatched using the handles - registered with :meth:`add_handler`. - - This may be called from any thread. +.. autoclass:: Router + :members: .. currentmodule:: mitogen.master @@ -362,11 +141,6 @@ Router Class ``ALLOCATE_ID`` message that causes the master to emit matching ``ADD_ROUTE`` messages prior to replying. - .. method:: context_by_id (context_id, via_id=None) - - Messy factory/lookup function to find a context by its ID, or construct - it. In future this will be replaced by a much more sensible interface. - .. _context-factories: **Context Factories** @@ -803,53 +577,8 @@ Context Class ============= .. currentmodule:: mitogen.core - -.. class:: Context - - Represent a remote context regardless of connection method. - - **Note:** This is the somewhat limited core version of the Context class - used by child contexts. The master subclass is documented below this one. - - .. method:: send (msg) - - Arrange for `msg` to be delivered to this context. - :attr:`dst_id ` is set to the target context ID. - - :param mitogen.core.Message msg: - The message. - - .. method:: send_async (msg, persist=False) - - Arrange for `msg` to be delivered to this context, with replies - directed to a newly constructed receiver. :attr:`dst_id - ` is set to the target context ID, and :attr:`reply_to - ` is set to the newly constructed receiver's handle. - - :param bool persist: - If :data:`False`, the handler will be unregistered after a single - message has been received. - - :param mitogen.core.Message msg: - The message. - - :returns: - :class:`mitogen.core.Receiver` configured to receive any replies - sent to the message's `reply_to` handle. - - .. method:: send_await (msg, deadline=None) - - Like :meth:`send_async`, but expect a single reply (`persist=False`) - delivered within `deadline` seconds. - - :param mitogen.core.Message msg: - The message. - :param float deadline: - If not :data:`None`, seconds before timing out waiting for a reply. - :returns: - The deserialized reply. - :raises mitogen.core.TimeoutError: - No message was received and `deadline` passed. +.. autoclass:: Context + :members: .. currentmodule:: mitogen.parent @@ -857,340 +586,33 @@ Context Class .. autoclass:: CallChain :members: -.. class:: Context - - Extend :class:`mitogen.core.Context` with functionality useful to masters, - and child contexts who later become parents. Currently when this class is - required, the target context's router is upgraded at runtime. - - .. attribute:: default_call_chain - - A :class:`CallChain` instance constructed by default, with pipelining - disabled. :meth:`call`, :meth:`call_async` and :meth:`call_no_reply` - use this instance. - - .. method:: shutdown (wait=False) - - Arrange for the context to receive a ``SHUTDOWN`` message, triggering - graceful shutdown. - - Due to a lack of support for timers, no attempt is made yet to force - terminate a hung context using this method. This will be fixed shortly. - - :param bool wait: - If :data:`True`, block the calling thread until the context has - completely terminated. - :returns: - If `wait` is :data:`False`, returns a :class:`mitogen.core.Latch` - whose :meth:`get() ` method returns - :data:`None` when shutdown completes. The `timeout` parameter may - be used to implement graceful timeouts. - - .. method:: call_async (fn, \*args, \*\*kwargs) - - See :meth:`CallChain.call_async`. - - .. method:: call (fn, \*args, \*\*kwargs) - - See :meth:`CallChain.call`. - - .. method:: call_no_reply (fn, \*args, \*\*kwargs) - - See :meth:`CallChain.call_no_reply`. +.. autoclass:: Context + :members: Receiver Class ============== .. currentmodule:: mitogen.core - -.. class:: Receiver (router, handle=None, persist=True, respondent=None) - - Receivers are used to wait for pickled responses from another context to be - sent to a handle registered in this context. A receiver may be single-use - (as in the case of :meth:`mitogen.parent.Context.call_async`) or - multiple use. - - :param mitogen.core.Router router: - Router to register the handler on. - - :param int handle: - If not :data:`None`, an explicit handle to register, otherwise an - unused handle is chosen. - - :param bool persist: - If :data:`True`, do not unregister the receiver's handler after the - first message. - - :param mitogen.core.Context respondent: - Reference to the context this receiver is receiving from. If not - :data:`None`, arranges for the receiver to receive a dead message if - messages can no longer be routed to the context, due to disconnection - or exit. - - .. attribute:: notify = None - - If not :data:`None`, a reference to a function invoked as - `notify(receiver)` when a new message is delivered to this receiver. - Used by :class:`mitogen.select.Select` to implement waiting on - multiple receivers. - - .. py:method:: to_sender () - - Return a :class:`mitogen.core.Sender` configured to deliver messages - to this receiver. Since a Sender can be serialized, this makes it - convenient to pass `(context_id, handle)` pairs around:: - - def deliver_monthly_report(sender): - for line in open('monthly_report.txt'): - sender.send(line) - sender.close() - - remote = router.ssh(hostname='mainframe') - recv = mitogen.core.Receiver(router) - remote.call(deliver_monthly_report, recv.to_sender()) - for msg in recv: - print(msg) - - .. py:method:: empty () - - Return :data:`True` if calling :meth:`get` would block. - - As with :class:`Queue.Queue`, :data:`True` may be returned even - though a subsequent call to :meth:`get` will succeed, since a - message may be posted at any moment between :meth:`empty` and - :meth:`get`. - - :meth:`empty` is only useful to avoid a race while installing - :attr:`notify`: - - .. code-block:: python - - recv.notify = _my_notify_function - if not recv.empty(): - _my_notify_function(recv) - - # It is guaranteed the receiver was empty after the notification - # function was installed, or that it was non-empty and the - # notification function was invoked at least once. - - .. py:method:: close () - - Cause :class:`mitogen.core.ChannelError` to be raised in any thread - waiting in :meth:`get` on this receiver. - - .. py:method:: get (timeout=None) - - Sleep waiting for a message to arrive on this receiver. - - :param float timeout: - If not :data:`None`, specifies a timeout in seconds. - - :raises mitogen.core.ChannelError: - The remote end indicated the channel should be closed, or - communication with its parent context was lost. - - :raises mitogen.core.TimeoutError: - Timeout was reached. - - :returns: - `(msg, data)` tuple, where `msg` is the - :class:`mitogen.core.Message` that was received, and `data` is - its unpickled data part. - - .. py:method:: get_data (timeout=None) - - Like :meth:`get`, except only return the data part. - - .. py:method:: __iter__ () - - Block and yield `(msg, data)` pairs delivered to this receiver until - :class:`mitogen.core.ChannelError` is raised. +.. autoclass:: Receiver + :members: Sender Class ============ .. currentmodule:: mitogen.core - -.. class:: Sender (context, dst_handle) - - Senders are used to send pickled messages to a handle in another context, - it is the inverse of :class:`mitogen.core.Sender`. - - Senders may be serialized, making them convenient to wire up data flows. - See :meth:`mitogen.core.Receiver.to_sender` for more information. - - :param mitogen.core.Context context: - Context to send messages to. - :param int dst_handle: - Destination handle to send messages to. - - .. py:method:: close () - - Send a dead message to the remote end, causing :meth:`ChannelError` - to be raised in any waiting thread. - - .. py:method:: send (data) - - Send `data` to the remote end. +.. autoclass:: Sender + :members: Select Class ============ .. module:: mitogen.select - .. currentmodule:: mitogen.select - -.. class:: Select (receivers=(), oneshot=True) - - Support scatter/gather asynchronous calls and waiting on multiple - receivers, channels, and sub-Selects. Accepts a sequence of - :class:`mitogen.core.Receiver` or :class:`mitogen.select.Select` - instances and returns the first value posted to any receiver or select. - - If `oneshot` is :data:`True`, then remove each receiver as it yields a - result; since :meth:`__iter__` terminates once the final receiver is - removed, this makes it convenient to respond to calls made in parallel: - - .. code-block:: python - - total = 0 - recvs = [c.call_async(long_running_operation) for c in contexts] - - for msg in mitogen.select.Select(recvs): - print('Got %s from %s' % (msg, msg.receiver)) - total += msg.unpickle() - - # Iteration ends when last Receiver yields a result. - print('Received total %s from %s receivers' % (total, len(recvs))) - - :class:`Select` may drive a long-running scheduler: - - .. code-block:: python - - with mitogen.select.Select(oneshot=False) as select: - while running(): - for msg in select: - process_result(msg.receiver.context, msg.unpickle()) - for context, workfunc in get_new_work(): - select.add(context.call_async(workfunc)) - - :class:`Select` may be nested: - - .. code-block:: python - - subselects = [ - mitogen.select.Select(get_some_work()), - mitogen.select.Select(get_some_work()), - mitogen.select.Select([ - mitogen.select.Select(get_some_work()), - mitogen.select.Select(get_some_work()) - ]) - ] - - for msg in mitogen.select.Select(selects): - print(msg.unpickle()) - - .. py:classmethod:: all (it) - - Take an iterable of receivers and retrieve a :class:`Message` from - each, returning the result of calling `msg.unpickle()` on each in turn. - Results are returned in the order they arrived. - - This is sugar for handling batch - :meth:`Context.call_async ` - invocations: - - .. code-block:: python - - print('Total disk usage: %.02fMiB' % (sum( - mitogen.select.Select.all( - context.call_async(get_disk_usage) - for context in contexts - ) / 1048576.0 - ),)) - - However, unlike in a naive comprehension such as: - - .. code-block:: python - - recvs = [c.call_async(get_disk_usage) for c in contexts] - sum(recv.get().unpickle() for recv in recvs) - - Result processing happens in the order results arrive, rather than the - order requests were issued, so :meth:`all` should always be faster. - - .. py:method:: get (timeout=None, block=True) - - Fetch the next available value from any receiver, or raise - :class:`mitogen.core.TimeoutError` if no value is available within - `timeout` seconds. - - On success, the message's :attr:`receiver - ` attribute is set to the receiver. - - :param float timeout: - Timeout in seconds. - :param bool block: - If :data:`False`, immediately raise - :class:`mitogen.core.TimeoutError` if the select is empty. - :return: - :class:`mitogen.core.Message` - :raises mitogen.core.TimeoutError: - Timeout was reached. - :raises mitogen.core.LatchError: - :meth:`close` has been called, and the underlying latch is no - longer valid. - - .. py:method:: __bool__ () - - Return :data:`True` if any receivers are registered with this select. - - .. py:method:: close () - - Remove the select's notifier function from each registered receiver, - mark the associated latch as closed, and cause any thread currently - sleeping in :meth:`get` to be woken with - :class:`mitogen.core.LatchError`. - - This is necessary to prevent memory leaks in long-running receivers. It - is called automatically when the Python :keyword:`with` statement is - used. - - .. py:method:: empty () - - Return :data:`True` if calling :meth:`get` would block. - - As with :class:`Queue.Queue`, :data:`True` may be returned even - though a subsequent call to :meth:`get` will succeed, since a - message may be posted at any moment between :meth:`empty` and - :meth:`get`. - - :meth:`empty` may return :data:`False` even when :meth:`get` - would block if another thread has drained a receiver added to this - select. This can be avoided by only consuming each receiver from a - single thread. - - .. py:method:: __iter__ (self) - - Yield the result of :meth:`get` until no receivers remain in the - select, either because `oneshot` is :data:`True`, or each receiver was - explicitly removed via :meth:`remove`. - - .. py:method:: add (recv) - - Add the :class:`mitogen.core.Receiver` or - :class:`mitogen.core.Channel` `recv` to the select. - - .. py:method:: remove (recv) - - Remove the :class:`mitogen.core.Receiver` or - :class:`mitogen.core.Channel` `recv` from the select. Note that if - the receiver has notified prior to :meth:`remove`, then it will - still be returned by a subsequent :meth:`get`. This may change in a - future version. +.. autoclass:: Select + :members: Channel Class @@ -1327,64 +749,13 @@ Utility Functions A random assortment of utility functions useful on masters and children. .. currentmodule:: mitogen.utils -.. function:: cast (obj) - - Many tools love to subclass built-in types in order to implement useful - functionality, such as annotating the safety of a Unicode string, or adding - additional methods to a dict. However, cPickle loves to preserve those - subtypes during serialization, resulting in CallError during :meth:`call - ` in the target when it tries to deserialize - the data. - - This function walks the object graph `obj`, producing a copy with any - custom sub-types removed. The functionality is not default since the - resulting walk may be computationally expensive given a large enough graph. - - See :ref:`serialization-rules` for a list of supported types. - - :param obj: - Object to undecorate. - :returns: - Undecorated object. - -.. currentmodule:: mitogen.utils -.. function:: disable_site_packages +.. autofunction:: cast - Remove all entries mentioning ``site-packages`` or ``Extras`` from the - system path. Used primarily for testing on OS X within a virtualenv, where - OS X bundles some ancient version of the :mod:`six` module. .. currentmodule:: mitogen.utils -.. function:: log_to_file (path=None, io=False, level='INFO') - - Install a new :class:`logging.Handler` writing applications logs to the - filesystem. Useful when debugging slave IO problems. - - Parameters to this function may be overridden at runtime using environment - variables. See :ref:`logging-env-vars`. - - :param str path: - If not :data:`None`, a filesystem path to write logs to. Otherwise, - logs are written to :data:`sys.stderr`. - - :param bool io: - If :data:`True`, include extremely verbose IO logs in the output. - Useful for debugging hangs, less useful for debugging application code. - - :param str level: - Name of the :mod:`logging` package constant that is the minimum - level to log at. Useful levels are ``DEBUG``, ``INFO``, ``WARNING``, - and ``ERROR``. - -.. currentmodule:: mitogen.utils -.. function:: run_with_router(func, \*args, \**kwargs) - - Arrange for `func(router, \*args, \**kwargs)` to run with a temporary - :class:`mitogen.master.Router`, ensuring the Router and Broker are - correctly shut down during normal or exceptional return. - - :returns: - `func`'s return value. +.. autofunction:: disable_site_packages +.. autofunction:: log_to_file +.. autofunction:: run_with_router(func, \*args, \**kwargs) .. currentmodule:: mitogen.utils .. decorator:: with_router diff --git a/mitogen/core.py b/mitogen/core.py index d3909773..f2dece48 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -500,18 +500,53 @@ else: class Message(object): + """ + Messages are the fundamental unit of communication, comprising fields from + the :ref:`stream-protocol` header, an optional reference to the receiving + :class:`mitogen.core.Router` for ingress messages, and helper methods for + deserialization and generating replies. + """ + #: Integer target context ID. :class:`Router` delivers messages locally + #: when their :attr:`dst_id` matches :data:`mitogen.context_id`, otherwise + #: they are routed up or downstream. dst_id = None + + #: Integer source context ID. Used as the target of replies if any are + #: generated. src_id = None + + #: Context ID under whose authority the message is acting. See + #: :ref:`source-verification`. auth_id = None + + #: Integer target handle in the destination context. This is one of the + #: :ref:`standard-handles`, or a dynamically generated handle used to + #: receive a one-time reply, such as the return value of a function call. handle = None + + #: Integer target handle to direct any reply to this message. Used to + #: receive a one-time reply, such as the return value of a function call. + #: :data:`IS_DEAD` has a special meaning when it appears in this field. reply_to = None + + #: Raw message data bytes. data = b('') + _unpickled = object() + #: The :class:`Router` responsible for routing the message. This is + #: :data:`None` for locally originated messages. router = None + + #: The :class:`Receiver` over which the message was last received. Part of + #: the :class:`mitogen.select.Select` interface. Defaults to :data:`None`. receiver = None def __init__(self, **kwargs): + """ + Construct a message from from the supplied `kwargs`. :attr:`src_id` and + :attr:`auth_id` are always set to :data:`mitogen.context_id`. + """ self.src_id = mitogen.context_id self.auth_id = mitogen.context_id vars(self).update(kwargs) @@ -551,14 +586,28 @@ class Message(object): @property def is_dead(self): + """ + :data:`True` if :attr:`reply_to` is set to the magic value + :data:`IS_DEAD`, indicating the sender considers the channel dead. + """ return self.reply_to == IS_DEAD @classmethod def dead(cls, **kwargs): + """ + Syntax helper to construct a dead message. + """ return cls(reply_to=IS_DEAD, **kwargs) @classmethod def pickled(cls, obj, **kwargs): + """ + Construct a pickled message, setting :attr:`data` to the serialization + of `obj`, and setting remaining fields using `kwargs`. + + :returns: + The new message. + """ self = cls(**kwargs) try: self.data = pickle.dumps(obj, protocol=2) @@ -568,6 +617,18 @@ class Message(object): return self def reply(self, msg, router=None, **kwargs): + """ + Compose a reply to this message and send it using :attr:`router`, or + `router` is :attr:`router` is :data:`None`. + + :param obj: + Either a :class:`Message`, or an object to be serialized in order + to construct a new message. + :param router: + Optional router to use if :attr:`router` is :data:`None`. + :param kwargs: + Optional keyword parameters overriding message fields in the reply. + """ if not isinstance(msg, Message): msg = Message.pickled(msg) msg.dst_id = self.src_id @@ -584,7 +645,18 @@ class Message(object): UNPICKLER_KWARGS = {} def unpickle(self, throw=True, throw_dead=True): - """Deserialize `data` into an object.""" + """ + Unpickle :attr:`data`, optionally raising any exceptions present. + + :param bool throw_dead: + If :data:`True`, raise exceptions, otherwise it is the caller's + responsibility. + + :raises CallError: + The serialized data contained CallError exception. + :raises ChannelError: + The `is_dead` field was set. + """ _vv and IOLOG.debug('%r.unpickle()', self) if throw_dead and self.is_dead: raise ChannelError(ChannelError.remote_msg) @@ -616,25 +688,42 @@ class Message(object): class Sender(object): + """ + Senders are used to send pickled messages to a handle in another context, + it is the inverse of :class:`mitogen.core.Sender`. + + Senders may be serialized, making them convenient to wire up data flows. + See :meth:`mitogen.core.Receiver.to_sender` for more information. + + :param Context context: + Context to send messages to. + :param int dst_handle: + Destination handle to send messages to. + """ def __init__(self, context, dst_handle): self.context = context self.dst_handle = dst_handle - def __repr__(self): - return 'Sender(%r, %r)' % (self.context, self.dst_handle) - - def __reduce__(self): - return _unpickle_sender, (self.context.context_id, self.dst_handle) + def send(self, data): + """ + Send `data` to the remote end. + """ + _vv and IOLOG.debug('%r.send(%r..)', self, repr(data)[:100]) + self.context.send(Message.pickled(data, handle=self.dst_handle)) def close(self): - """Indicate this channel is closed to the remote side.""" + """ + Send a dead message to the remote, causing :meth:`ChannelError` to be + raised in any waiting thread. + """ _vv and IOLOG.debug('%r.close()', self) self.context.send(Message.dead(handle=self.dst_handle)) - def send(self, data): - """Send `data` to the remote.""" - _vv and IOLOG.debug('%r.send(%r..)', self, repr(data)[:100]) - self.context.send(Message.pickled(data, handle=self.dst_handle)) + def __repr__(self): + return 'Sender(%r, %r)' % (self.context, self.dst_handle) + + def __reduce__(self): + return _unpickle_sender, (self.context.context_id, self.dst_handle) def _unpickle_sender(router, context_id, dst_handle): @@ -646,12 +735,39 @@ def _unpickle_sender(router, context_id, dst_handle): class Receiver(object): + """ + Receivers maintain a thread-safe queue of messages sent to a handle of this + context from another context. + + :param mitogen.core.Router router: + Router to register the handler on. + + :param int handle: + If not :data:`None`, an explicit handle to register, otherwise an + unused handle is chosen. + + :param bool persist: + If :data:`False`, unregister the handler after one message is received. + Single-message receivers are intended for RPC-like transactions, such + as in the case of :meth:`mitogen.parent.Context.call_async`. + + :param mitogen.core.Context respondent: + Context this receiver is receiving from. If not :data:`None`, arranges + for the receiver to receive a dead message if messages can no longer be + routed to the context, due to disconnection or exit. + """ + #: If not :data:`None`, a reference to a function invoked as + #: `notify(receiver)` when a new message is delivered to this receiver. + #: Used by :class:`mitogen.select.Select` to implement waiting on multiple + #: receivers. notify = None + raise_channelerror = True def __init__(self, router, handle=None, persist=True, respondent=None, policy=None): self.router = router + #: The handle. self.handle = handle # Avoid __repr__ crash in add_handler() self._latch = Latch() # Must exist prior to .add_handler() self.handle = router.add_handler( @@ -666,26 +782,73 @@ class Receiver(object): return 'Receiver(%r, %r)' % (self.router, self.handle) def to_sender(self): + """ + Return a :class:`Sender` configured to deliver messages to this + receiver. As senders are serializable, this makes it convenient to pass + `(context_id, handle)` pairs around:: + + def deliver_monthly_report(sender): + for line in open('monthly_report.txt'): + sender.send(line) + sender.close() + + remote = router.ssh(hostname='mainframe') + recv = mitogen.core.Receiver(router) + remote.call(deliver_monthly_report, recv.to_sender()) + for msg in recv: + print(msg) + """ context = Context(self.router, mitogen.context_id) return Sender(context, self.handle) def _on_receive(self, msg): - """Callback from the Stream; appends data to the internal queue.""" + """ + Callback from the Stream; appends data to the internal queue. + """ _vv and IOLOG.debug('%r._on_receive(%r)', self, msg) self._latch.put(msg) if self.notify: self.notify(self) def close(self): + """ + Unregister the receiver's handle from its associated router, and cause + :class:`ChannelError` to be raised in any thread waiting in :meth:`get` + on this receiver. + """ if self.handle: self.router.del_handler(self.handle) self.handle = None self._latch.put(Message.dead()) def empty(self): + """ + Return :data:`True` if calling :meth:`get` would block. + + As with :class:`Queue.Queue`, :data:`True` may be returned even though + a subsequent call to :meth:`get` will succeed, since a message may be + posted at any moment between :meth:`empty` and :meth:`get`. + """ return self._latch.empty() def get(self, timeout=None, block=True, throw_dead=True): + """ + Sleep waiting for a message to arrive on this receiver. + + :param float timeout: + If not :data:`None`, specifies a timeout in seconds. + + :raises mitogen.core.ChannelError: + The remote end indicated the channel should be closed, or + communication with its parent context was lost. + + :raises mitogen.core.TimeoutError: + Timeout was reached. + + :returns: + `(msg, data)` tuple, where `msg` is the :class:`Message` that was + received, and `data` is its unpickled data part. + """ _vv and IOLOG.debug('%r.get(timeout=%r, block=%r)', self, timeout, block) msg = self._latch.get(timeout=timeout, block=block) if msg.is_dead and throw_dead: @@ -696,6 +859,10 @@ class Receiver(object): return msg def __iter__(self): + """ + Yield consecutive :class:`Message` instances delivered to this receiver + until :class:`ChannelError` is raised. + """ while True: msg = self.get(throw_dead=False) if msg.is_dead: @@ -1213,6 +1380,28 @@ class Stream(BasicStream): class Context(object): + """ + Represent a remote context regardless of the underlying connection method. + Context objects are simple facades that emit messages through an + associated router, and have :ref:`signals` raised against them in response + to various events relating to the context. + + **Note:** This is the somewhat limited core version, used by child + contexts. The master subclass is documented below this one. + + Contexts maintain no internal state and are thread-safe. + + Prefer :meth:`Router.context_by_id` over constructing context objects + explicitly, as that method is deduplicating, and returns the only context + instance :ref:`signals` will be raised on. + + :param Router router: + Router to emit messages through. + :param int context_id: + Context ID. + :param str name: + Context name. + """ remote_name = None def __init__(self, router, context_id, name=None): @@ -1231,6 +1420,23 @@ class Context(object): fire(self, 'disconnect') def send_async(self, msg, persist=False): + """ + Arrange for `msg` to be delivered to this context, with replies + directed to a newly constructed receiver. :attr:`dst_id + ` is set to the target context ID, and :attr:`reply_to + ` is set to the newly constructed receiver's handle. + + :param bool persist: + If :data:`False`, the handler will be unregistered after a single + message has been received. + + :param mitogen.core.Message msg: + The message. + + :returns: + :class:`Receiver` configured to receive any replies sent to the + message's `reply_to` handle. + """ if self.router.broker._thread == threading.currentThread(): # TODO raise SystemError('Cannot making blocking call on broker thread') @@ -1254,8 +1460,13 @@ class Context(object): return self.send_async(msg) def send(self, msg): - """send `obj` to `handle`, and tell the broker we have output. May - be called from any thread.""" + """ + Arrange for `msg` to be delivered to this context. :attr:`dst_id + ` is set to the target context ID. + + :param Message msg: + Message. + """ msg.dst_id = self.context_id self.router.route(msg) @@ -1264,7 +1475,19 @@ class Context(object): return recv.get().unpickle() def send_await(self, msg, deadline=None): - """Send `msg` and wait for a response with an optional timeout.""" + """ + Like :meth:`send_async`, but expect a single reply (`persist=False`) + delivered within `deadline` seconds. + + :param mitogen.core.Message msg: + The message. + :param float deadline: + If not :data:`None`, seconds before timing out waiting for a reply. + :returns: + Deserialized reply. + :raises TimeoutError: + No message was received and `deadline` passed. + """ receiver = self.send_async(msg) response = receiver.get(deadline) data = response.unpickle() @@ -1710,8 +1933,33 @@ class IoLogger(BasicStream): class Router(object): + """ + Route messages between contexts, and invoke local handlers for messages + addressed to this context. :meth:`Router.route() ` straddles the + :class:`Broker ` and user threads, it is safe to call + anywhere. + + **Note:** This is the somewhat limited core version of the Router class + used by child contexts. The master subclass is documented below this one. + """ context_class = Context max_message_size = 128 * 1048576 + + #: When :data:`True`, permit children to only communicate with the current + #: context or a parent of the current context. Routing between siblings or + #: children of parents is prohibited, ensuring no communication is possible + #: between intentionally partitioned networks, such as when a program + #: simultaneously manipulates hosts spread across a corporate and a + #: production network, or production networks that are otherwise + #: air-gapped. + #: + #: Sending a prohibited message causes an error to be logged and a dead + #: message to be sent in reply to the errant message, if that message has + #: ``reply_to`` set. + #: + #: The value of :data:`unidirectional` becomes the default for the + #: :meth:`local() ` `unidirectional` + #: parameter. unidirectional = False def __init__(self, broker): @@ -1751,7 +1999,7 @@ class Router(object): fire(self._context_by_id[target_id], 'disconnect') - def on_stream_disconnect(self, stream): + def _on_stream_disconnect(self, stream): for context in self._context_by_id.values(): stream_ = self._stream_by_id.get(context.context_id) if stream_ is stream: @@ -1764,6 +2012,10 @@ class Router(object): func(Message.dead()) def context_by_id(self, context_id, via_id=None, create=True, name=None): + """ + Messy factory/lookup function to find a context by its ID, or construct + it. In future this will be replaced by a much more sensible interface. + """ context = self._context_by_id.get(context_id) if create and not context: context = self.context_class(self, context_id, name=name) @@ -1773,21 +2025,85 @@ class Router(object): return context def register(self, context, stream): + """ + Register a newly constructed context and its associated stream, and add + the stream's receive side to the I/O multiplexer. This method remains + public while the design has not yet settled. + """ _v and LOG.debug('register(%r, %r)', context, stream) self._stream_by_id[context.context_id] = stream self._context_by_id[context.context_id] = context self.broker.start_receive(stream) - listen(stream, 'disconnect', lambda: self.on_stream_disconnect(stream)) + listen(stream, 'disconnect', lambda: self._on_stream_disconnect(stream)) def stream_by_id(self, dst_id): + """ + Return the :class:`Stream` that should be used to communicate with + `dst_id`. If a specific route for `dst_id` is not known, a reference to + the parent context's stream is returned. + """ return self._stream_by_id.get(dst_id, self._stream_by_id.get(mitogen.parent_id)) def del_handler(self, handle): + """ + Remove the handle registered for `handle` + + :raises KeyError: + The handle wasn't registered. + """ del self._handle_map[handle] def add_handler(self, fn, handle=None, persist=True, policy=None, respondent=None): + """ + Invoke `fn(msg)` for each Message sent to `handle` from this context. + Unregister after one invocation if `persist` is :data:`False`. If + `handle` is :data:`None`, a new handle is allocated and returned. + + :param int handle: + If not :data:`None`, an explicit handle to register, usually one of + the ``mitogen.core.*`` constants. If unspecified, a new unused + handle will be allocated. + + :param bool persist: + If :data:`False`, the handler will be unregistered after a single + message has been received. + + :param Context respondent: + Context that messages to this handle are expected to be sent from. + If specified, arranges for a dead message to be delivered to `fn` + when disconnection of the context is detected. + + In future `respondent` will likely also be used to prevent other + contexts from sending messages to the handle. + + :param function policy: + Function invoked as `policy(msg, stream)` where `msg` is a + :class:`mitogen.core.Message` about to be delivered, and `stream` + is the :class:`mitogen.core.Stream` on which it was received. The + function must return :data:`True`, otherwise an error is logged and + delivery is refused. + + Two built-in policy functions exist: + + * :func:`has_parent_authority`: requires the message arrived from a + parent context, or a context acting with a parent context's + authority (``auth_id``). + + * :func:`mitogen.parent.is_immediate_child`: requires the + message arrived from an immediately connected child, for use in + messaging patterns where either something becomes buggy or + insecure by permitting indirect upstream communication. + + In case of refusal, and the message's ``reply_to`` field is + nonzero, a :class:`mitogen.core.CallError` is delivered to the + sender indicating refusal occurred. + + :return: + `handle`, or if `handle` was :data:`None`, the newly allocated + handle. + """ handle = handle or next(self._last_handle) _vv and IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist) @@ -1848,6 +2164,19 @@ class Router(object): msg.reply(Message.dead(), router=self) def _async_route(self, msg, in_stream=None): + """ + Arrange for `msg` to be forwarded towards its destination. If its + destination is the local context, then arrange for it to be dispatched + using the local handlers. + + This is a lower overhead version of :meth:`route` that may only be + called from the I/O multiplexer thread. + + :param Stream in_stream: + If not :data:`None`, the stream the message arrived on. Used for + performing source route verification, to ensure sensitive messages + such as ``CALL_FUNCTION`` arrive only from trusted contexts. + """ _vv and IOLOG.debug('%r._async_route(%r, %r)', self, msg, in_stream) if len(msg.data) > self.max_message_size: @@ -1902,6 +2231,15 @@ class Router(object): out_stream._send(msg) def route(self, msg): + """ + Arrange for the :class:`Message` `msg` to be delivered to its + destination using any relevant downstream context, or if none is found, + by forwarding the message upstream towards the master context. If `msg` + is destined for the local context, it is dispatched using the handles + registered with :meth:`add_handler`. + + This may be called from any thread. + """ self.broker.defer(self._async_route, msg) diff --git a/mitogen/parent.py b/mitogen/parent.py index 78a62380..2f9b2079 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -1390,7 +1390,16 @@ class CallChain(object): class Context(mitogen.core.Context): + """ + Extend :class:`mitogen.core.Context` with functionality useful to masters, + and child contexts who later become parents. Currently when this class is + required, the target context's router is upgraded at runtime. + """ + #: A :class:`CallChain` instance constructed by default, with pipelining + #: disabled. :meth:`call`, :meth:`call_async` and :meth:`call_no_reply` use + #: this instance. call_chain_class = CallChain + via = None def __init__(self, *args, **kwargs): @@ -1406,15 +1415,41 @@ class Context(mitogen.core.Context): return hash((self.router, self.context_id)) def call_async(self, fn, *args, **kwargs): + """ + See :meth:`CallChain.call_async`. + """ return self.default_call_chain.call_async(fn, *args, **kwargs) def call(self, fn, *args, **kwargs): + """ + See :meth:`CallChain.call`. + """ return self.default_call_chain.call(fn, *args, **kwargs) def call_no_reply(self, fn, *args, **kwargs): + """ + See :meth:`CallChain.call_no_reply`. + """ self.default_call_chain.call_no_reply(fn, *args, **kwargs) def shutdown(self, wait=False): + """ + Arrange for the context to receive a ``SHUTDOWN`` message, triggering + graceful shutdown. + + Due to a lack of support for timers, no attempt is made yet to force + terminate a hung context using this method. This will be fixed shortly. + + :param bool wait: + If :data:`True`, block the calling thread until the context has + completely terminated. + + :returns: + If `wait` is :data:`False`, returns a :class:`mitogen.core.Latch` + whose :meth:`get() ` method returns + :data:`None` when shutdown completes. The `timeout` parameter may + be used to implement graceful timeouts. + """ LOG.debug('%r.shutdown() sending SHUTDOWN', self) latch = mitogen.core.Latch() mitogen.core.listen(self, 'disconnect', lambda: latch.put(None)) @@ -1666,6 +1701,13 @@ class Router(mitogen.core.Router): msg.reply(None) def add_route(self, target_id, stream): + """ + Arrange for messages whose `dst_id` is `target_id` to be forwarded on + the directly connected stream for `via_id`. This method is called + automatically in response to :data:`mitogen.core.ADD_ROUTE` messages, + but remains public while the design has not yet settled, and situations + may arise where routing is not fully automatic. + """ LOG.debug('%r.add_route(%r, %r)', self, target_id, stream) assert isinstance(target_id, int) assert isinstance(stream, Stream) diff --git a/mitogen/select.py b/mitogen/select.py index ce4023a9..6d46c336 100644 --- a/mitogen/select.py +++ b/mitogen/select.py @@ -34,11 +34,57 @@ class Error(mitogen.core.Error): class Select(object): - notify = None + """ + Support scatter/gather asynchronous calls and waiting on multiple + receivers, channels, and sub-Selects. Accepts a sequence of + :class:`mitogen.core.Receiver` or :class:`mitogen.select.Select` instances + and returns the first value posted to any receiver or select. - @classmethod - def all(cls, receivers): - return list(msg.unpickle() for msg in cls(receivers)) + If `oneshot` is :data:`True`, then remove each receiver as it yields a + result; since :meth:`__iter__` terminates once the final receiver is + removed, this makes it convenient to respond to calls made in parallel: + + .. code-block:: python + + total = 0 + recvs = [c.call_async(long_running_operation) for c in contexts] + + for msg in mitogen.select.Select(recvs): + print('Got %s from %s' % (msg, msg.receiver)) + total += msg.unpickle() + + # Iteration ends when last Receiver yields a result. + print('Received total %s from %s receivers' % (total, len(recvs))) + + :class:`Select` may drive a long-running scheduler: + + .. code-block:: python + + with mitogen.select.Select(oneshot=False) as select: + while running(): + for msg in select: + process_result(msg.receiver.context, msg.unpickle()) + for context, workfunc in get_new_work(): + select.add(context.call_async(workfunc)) + + :class:`Select` may be nested: + + .. code-block:: python + + subselects = [ + mitogen.select.Select(get_some_work()), + mitogen.select.Select(get_some_work()), + mitogen.select.Select([ + mitogen.select.Select(get_some_work()), + mitogen.select.Select(get_some_work()) + ]) + ] + + for msg in mitogen.select.Select(selects): + print(msg.unpickle()) + """ + + notify = None def __init__(self, receivers=(), oneshot=True): self._receivers = [] @@ -47,12 +93,46 @@ class Select(object): for recv in receivers: self.add(recv) + @classmethod + def all(cls, receivers): + """ + Take an iterable of receivers and retrieve a :class:`Message` from + each, returning the result of calling `msg.unpickle()` on each in turn. + Results are returned in the order they arrived. + + This is sugar for handling batch :meth:`Context.call_async + ` invocations: + + .. code-block:: python + + print('Total disk usage: %.02fMiB' % (sum( + mitogen.select.Select.all( + context.call_async(get_disk_usage) + for context in contexts + ) / 1048576.0 + ),)) + + However, unlike in a naive comprehension such as: + + .. code-block:: python + + recvs = [c.call_async(get_disk_usage) for c in contexts] + sum(recv.get().unpickle() for recv in recvs) + + Result processing happens in the order results arrive, rather than the + order requests were issued, so :meth:`all` should always be faster. + """ + return list(msg.unpickle() for msg in cls(receivers)) + def _put(self, value): self._latch.put(value) if self.notify: self.notify(self) def __bool__(self): + """ + Return :data:`True` if any receivers are registered with this select. + """ return bool(self._receivers) def __enter__(self): @@ -62,6 +142,11 @@ class Select(object): self.close() def __iter__(self): + """ + Yield the result of :meth:`get` until no receivers remain in the + select, either because `oneshot` is :data:`True`, or each receiver was + explicitly removed via :meth:`remove`. + """ while self._receivers: yield self.get() @@ -80,6 +165,14 @@ class Select(object): owned_msg = 'Cannot add: Receiver is already owned by another Select' def add(self, recv): + """ + Add the :class:`mitogen.core.Receiver` or :class:`Select` `recv` to the + select. + + :raises mitogen.select.Error: + An attempt was made to add a :class:`Select` to which this select + is indirectly a member of. + """ if isinstance(recv, Select): recv._check_no_loop(self) @@ -95,6 +188,12 @@ class Select(object): not_present_msg = 'Instance is not a member of this Select' def remove(self, recv): + """ + Remove the :class:`mitogen.core.Receiver` or :class:`Select` `recv` + from the select. Note that if the receiver has notified prior to + :meth:`remove`, it will still be returned by a subsequent :meth:`get`. + This may change in a future version. + """ try: if recv.notify != self._put: raise ValueError @@ -104,16 +203,59 @@ class Select(object): raise Error(self.not_present_msg) def close(self): + """ + Remove the select's notifier function from each registered receiver, + mark the associated latch as closed, and cause any thread currently + sleeping in :meth:`get` to be woken with + :class:`mitogen.core.LatchError`. + + This is necessary to prevent memory leaks in long-running receivers. It + is called automatically when the Python :keyword:`with` statement is + used. + """ for recv in self._receivers[:]: self.remove(recv) self._latch.close() def empty(self): + """ + Return :data:`True` if calling :meth:`get` would block. + + As with :class:`Queue.Queue`, :data:`True` may be returned even though + a subsequent call to :meth:`get` will succeed, since a message may be + posted at any moment between :meth:`empty` and :meth:`get`. + + :meth:`empty` may return :data:`False` even when :meth:`get` would + block if another thread has drained a receiver added to this select. + This can be avoided by only consuming each receiver from a single + thread. + """ return self._latch.empty() empty_msg = 'Cannot get(), Select instance is empty' def get(self, timeout=None, block=True): + """ + Fetch the next available value from any receiver, or raise + :class:`mitogen.core.TimeoutError` if no value is available within + `timeout` seconds. + + On success, the message's :attr:`receiver + ` attribute is set to the receiver. + + :param float timeout: + Timeout in seconds. + :param bool block: + If :data:`False`, immediately raise + :class:`mitogen.core.TimeoutError` if the select is empty. + :return: + :class:`mitogen.core.Message` + :raises mitogen.core.TimeoutError: + Timeout was reached. + :raises mitogen.core.LatchError: + :meth:`close` has been called, and the underlying latch is no + longer valid. + """ if not self._receivers: raise Error(self.empty_msg) diff --git a/mitogen/utils.py b/mitogen/utils.py index 4fd80aa1..31669ea9 100644 --- a/mitogen/utils.py +++ b/mitogen/utils.py @@ -46,6 +46,11 @@ else: def disable_site_packages(): + """ + Remove all entries mentioning ``site-packages`` or ``Extras`` from + :attr:sys.path. Used primarily for testing on OS X within a virtualenv, + where OS X bundles some ancient version of the :mod:`six` module. + """ for entry in sys.path[:]: if 'site-packages' in entry or 'Extras' in entry: sys.path.remove(entry) @@ -65,6 +70,26 @@ def log_get_formatter(): def log_to_file(path=None, io=False, level='INFO'): + """ + Install a new :class:`logging.Handler` writing applications logs to the + filesystem. Useful when debugging slave IO problems. + + Parameters to this function may be overridden at runtime using environment + variables. See :ref:`logging-env-vars`. + + :param str path: + If not :data:`None`, a filesystem path to write logs to. Otherwise, + logs are written to :data:`sys.stderr`. + + :param bool io: + If :data:`True`, include extremely verbose IO logs in the output. + Useful for debugging hangs, less useful for debugging application code. + + :param str level: + Name of the :mod:`logging` package constant that is the minimum level + to log at. Useful levels are ``DEBUG``, ``INFO``, ``WARNING``, and + ``ERROR``. + """ log = logging.getLogger('') if path: fp = open(path, 'w', 1) @@ -94,6 +119,14 @@ def log_to_file(path=None, io=False, level='INFO'): def run_with_router(func, *args, **kwargs): + """ + Arrange for `func(router, \*args, \**kwargs)` to run with a temporary + :class:`mitogen.master.Router`, ensuring the Router and Broker are + correctly shut down during normal or exceptional return. + + :returns: + `func`'s return value. + """ broker = mitogen.master.Broker() router = mitogen.master.Router(broker) try: @@ -104,6 +137,17 @@ def run_with_router(func, *args, **kwargs): def with_router(func): + """ + Decorator version of :func:`run_with_router`. Example: + + .. code-block:: python + + @with_router + def do_stuff(router, arg): + pass + + do_stuff(blah, 123) + """ def wrapper(*args, **kwargs): return run_with_router(func, *args, **kwargs) if mitogen.core.PY3: @@ -122,7 +166,27 @@ PASSTHROUGH = ( mitogen.core.Secret, ) + def cast(obj): + """ + Many tools love to subclass built-in types in order to implement useful + functionality, such as annotating the safety of a Unicode string, or adding + additional methods to a dict. However, cPickle loves to preserve those + subtypes during serialization, resulting in CallError during :meth:`call + ` in the target when it tries to deserialize + the data. + + This function walks the object graph `obj`, producing a copy with any + custom sub-types removed. The functionality is not default since the + resulting walk may be computationally expensive given a large enough graph. + + See :ref:`serialization-rules` for a list of supported types. + + :param obj: + Object to undecorate. + :returns: + Undecorated object. + """ if isinstance(obj, dict): return dict((cast(k), cast(v)) for k, v in iteritems(obj)) if isinstance(obj, (list, tuple)):