docs/select: fix up more references, fix headings.

pull/255/head
David Wilson 6 years ago
parent 4bf3d01104
commit 61365236ad

@ -71,157 +71,6 @@ be sent to any context that will be used to establish additional child
contexts. contexts.
.. currentmodule:: mitogen.master
.. class:: Select (receivers=(), oneshot=True)
Support scatter/gather asynchronous calls and waiting on multiple
receivers, channels, and sub-Selects. Accepts a sequence of
:py:class:`mitogen.core.Receiver` or :py:class:`mitogen.master.Select`
instances and returns the first value posted to any receiver or select.
If `oneshot` is :data:`True`, then remove each receiver as it yields a
result; since :py:meth:`__iter__` terminates once the final receiver is
removed, this makes it convenient to respond to calls made in parallel:
.. code-block:: python
total = 0
recvs = [c.call_async(long_running_operation) for c in contexts]
for msg in mitogen.master.Select(recvs):
print 'Got %s from %s' % (msg, msg.receiver)
total += msg.unpickle()
# Iteration ends when last Receiver yields a result.
print 'Received total %s from %s receivers' % (total, len(recvs))
:py:class:`Select` may drive a long-running scheduler:
.. code-block:: python
with mitogen.master.Select(oneshot=False) as select:
while running():
for msg in select:
process_result(msg.receiver.context, msg.unpickle())
for context, workfunc in get_new_work():
select.add(context.call_async(workfunc))
:py:class:`Select` may be nested:
.. code-block:: python
subselects = [
mitogen.master.Select(get_some_work()),
mitogen.master.Select(get_some_work()),
mitogen.master.Select([
mitogen.master.Select(get_some_work()),
mitogen.master.Select(get_some_work())
])
]
for msg in mitogen.master.Select(selects):
print msg.unpickle()
.. py:classmethod:: all (it)
Take an iterable of receivers and retrieve a :py:class:`Message` from
each, returning the result of calling `msg.unpickle()` on each in turn.
Results are returned in the order they arrived.
This is sugar for handling batch :py:class:`Context.call_async`
invocations:
.. code-block:: python
print('Total disk usage: %.02fMiB' % (sum(
mitogen.master.Select.all(
context.call_async(get_disk_usage)
for context in contexts
) / 1048576.0
),))
However, unlike in a naive comprehension such as:
.. code-block:: python
sum(context.call_async(get_disk_usage).get().unpickle()
for context in contexts)
Result processing happens concurrently to new results arriving, so
:py:meth:`all` should always be faster.
.. py:method:: get (timeout=None, block=True)
Fetch the next available value from any receiver, or raise
:py:class:`mitogen.core.TimeoutError` if no value is available within
`timeout` seconds.
On success, the message's :py:attr:`receiver
<mitogen.core.Message.receiver>` attribute is set to the receiver.
:param float timeout:
Timeout in seconds.
:param bool block:
If :py:data:`False`, immediately raise
:py:class:`mitogen.core.TimeoutError` if the select is empty.
:return:
:py:class:`mitogen.core.Message`
:raises mitogen.core.TimeoutError:
Timeout was reached.
:raises mitogen.core.LatchError:
:py:meth:`close` has been called, and the underlying latch is no
longer valid.
.. py:method:: __bool__ ()
Return :data:`True` if any receivers are registered with this select.
.. py:method:: close ()
Remove the select's notifier function from each registered receiver,
mark the associated latch as closed, and cause any thread currently
sleeping in :py:meth:`get` to be woken with
:py:class:`mitogen.core.LatchError`.
This is necessary to prevent memory leaks in long-running receivers. It
is called automatically when the Python :keyword:`with` statement is
used.
.. py:method:: empty ()
Return :data:`True` if calling :py:meth:`get` would block.
As with :py:class:`Queue.Queue`, :data:`True` may be returned even
though a subsequent call to :py:meth:`get` will succeed, since a
message may be posted at any moment between :py:meth:`empty` and
:py:meth:`get`.
:py:meth:`empty` may return ``False`` even when :py:meth:`get` would
block if another thread has drained a receiver added to this select.
This can be avoided by only consuming each receiver from a single
thread.
.. py:method:: __iter__ (self)
Yield the result of :py:meth:`get` until no receivers remain in the
select, either because `oneshot` is :data:`True`, or each receiver was
explicitly removed via :py:meth:`remove`.
.. py:method:: add (recv)
Add the :py:class:`mitogen.core.Receiver` or
:py:class:`mitogen.core.Channel` `recv` to the select.
.. py:method:: remove (recv)
Remove the :py:class:`mitogen.core.Receiver` or
:py:class:`mitogen.core.Channel` `recv` from the select. Note that if
the receiver has notified prior to :py:meth:`remove`, then it will
still be returned by a subsequent :py:meth:`get`. This may change in a
future version.
mitogen.fakessh mitogen.fakessh
--------------- ---------------
@ -327,7 +176,7 @@ Message Class
.. attribute:: receiver .. attribute:: receiver
The :py:class:`mitogen.core.Receiver` over which the message was last The :py:class:`mitogen.core.Receiver` over which the message was last
received. Part of the :py:class:`mitogen.master.Select` interface. received. Part of the :py:class:`mitogen.select.Select` interface.
Defaults to :py:data:`None`. Defaults to :py:data:`None`.
.. attribute:: dst_id .. attribute:: dst_id
@ -392,7 +241,6 @@ Message Class
Router Class Router Class
============ ============
.. currentmodule:: mitogen.core .. currentmodule:: mitogen.core
.. class:: Router .. class:: Router
@ -1061,7 +909,7 @@ Context Class
Asynchronous calls may be dispatched in parallel to multiple Asynchronous calls may be dispatched in parallel to multiple
contexts and consumed as they complete using contexts and consumed as they complete using
:py:class:`mitogen.master.Select`. :py:class:`mitogen.select.Select`.
.. method:: call (fn, \*args, \*\*kwargs) .. method:: call (fn, \*args, \*\*kwargs)
@ -1077,7 +925,7 @@ Context Class
Receiver Class Receiver Class
-------------- ==============
.. currentmodule:: mitogen.core .. currentmodule:: mitogen.core
@ -1109,7 +957,7 @@ Receiver Class
If not ``None``, a reference to a function invoked as If not ``None``, a reference to a function invoked as
`notify(receiver)` when a new message is delivered to this receiver. `notify(receiver)` when a new message is delivered to this receiver.
Used by :py:class:`mitogen.master.Select` to implement waiting on Used by :py:class:`mitogen.select.Select` to implement waiting on
multiple receivers. multiple receivers.
.. py:method:: to_sender () .. py:method:: to_sender ()
@ -1186,7 +1034,7 @@ Receiver Class
Sender Class Sender Class
------------ ============
.. currentmodule:: mitogen.core .. currentmodule:: mitogen.core
@ -1213,8 +1061,164 @@ Sender Class
Send `data` to the remote end. Send `data` to the remote end.
Select Class
============
.. module:: mitogen.select
.. currentmodule:: mitogen.select
.. class:: Select (receivers=(), oneshot=True)
Support scatter/gather asynchronous calls and waiting on multiple
receivers, channels, and sub-Selects. Accepts a sequence of
:py:class:`mitogen.core.Receiver` or :py:class:`mitogen.select.Select`
instances and returns the first value posted to any receiver or select.
If `oneshot` is :data:`True`, then remove each receiver as it yields a
result; since :py:meth:`__iter__` terminates once the final receiver is
removed, this makes it convenient to respond to calls made in parallel:
.. code-block:: python
total = 0
recvs = [c.call_async(long_running_operation) for c in contexts]
for msg in mitogen.select.Select(recvs):
print 'Got %s from %s' % (msg, msg.receiver)
total += msg.unpickle()
# Iteration ends when last Receiver yields a result.
print 'Received total %s from %s receivers' % (total, len(recvs))
:py:class:`Select` may drive a long-running scheduler:
.. code-block:: python
with mitogen.select.Select(oneshot=False) as select:
while running():
for msg in select:
process_result(msg.receiver.context, msg.unpickle())
for context, workfunc in get_new_work():
select.add(context.call_async(workfunc))
:py:class:`Select` may be nested:
.. code-block:: python
subselects = [
mitogen.select.Select(get_some_work()),
mitogen.select.Select(get_some_work()),
mitogen.select.Select([
mitogen.select.Select(get_some_work()),
mitogen.select.Select(get_some_work())
])
]
for msg in mitogen.select.Select(selects):
print msg.unpickle()
.. py:classmethod:: all (it)
Take an iterable of receivers and retrieve a :py:class:`Message` from
each, returning the result of calling `msg.unpickle()` on each in turn.
Results are returned in the order they arrived.
This is sugar for handling batch :py:class:`Context.call_async`
invocations:
.. code-block:: python
print('Total disk usage: %.02fMiB' % (sum(
mitogen.select.Select.all(
context.call_async(get_disk_usage)
for context in contexts
) / 1048576.0
),))
However, unlike in a naive comprehension such as:
.. code-block:: python
sum(context.call_async(get_disk_usage).get().unpickle()
for context in contexts)
Result processing happens concurrently to new results arriving, so
:py:meth:`all` should always be faster.
.. py:method:: get (timeout=None, block=True)
Fetch the next available value from any receiver, or raise
:py:class:`mitogen.core.TimeoutError` if no value is available within
`timeout` seconds.
On success, the message's :py:attr:`receiver
<mitogen.core.Message.receiver>` attribute is set to the receiver.
:param float timeout:
Timeout in seconds.
:param bool block:
If :py:data:`False`, immediately raise
:py:class:`mitogen.core.TimeoutError` if the select is empty.
:return:
:py:class:`mitogen.core.Message`
:raises mitogen.core.TimeoutError:
Timeout was reached.
:raises mitogen.core.LatchError:
:py:meth:`close` has been called, and the underlying latch is no
longer valid.
.. py:method:: __bool__ ()
Return :data:`True` if any receivers are registered with this select.
.. py:method:: close ()
Remove the select's notifier function from each registered receiver,
mark the associated latch as closed, and cause any thread currently
sleeping in :py:meth:`get` to be woken with
:py:class:`mitogen.core.LatchError`.
This is necessary to prevent memory leaks in long-running receivers. It
is called automatically when the Python :keyword:`with` statement is
used.
.. py:method:: empty ()
Return :data:`True` if calling :py:meth:`get` would block.
As with :py:class:`Queue.Queue`, :data:`True` may be returned even
though a subsequent call to :py:meth:`get` will succeed, since a
message may be posted at any moment between :py:meth:`empty` and
:py:meth:`get`.
:py:meth:`empty` may return ``False`` even when :py:meth:`get` would
block if another thread has drained a receiver added to this select.
This can be avoided by only consuming each receiver from a single
thread.
.. py:method:: __iter__ (self)
Yield the result of :py:meth:`get` until no receivers remain in the
select, either because `oneshot` is :data:`True`, or each receiver was
explicitly removed via :py:meth:`remove`.
.. py:method:: add (recv)
Add the :py:class:`mitogen.core.Receiver` or
:py:class:`mitogen.core.Channel` `recv` to the select.
.. py:method:: remove (recv)
Remove the :py:class:`mitogen.core.Receiver` or
:py:class:`mitogen.core.Channel` `recv` from the select. Note that if
the receiver has notified prior to :py:meth:`remove`, then it will
still be returned by a subsequent :py:meth:`get`. This may change in a
future version.
Channel Class Channel Class
------------- =============
.. currentmodule:: mitogen.core .. currentmodule:: mitogen.core

@ -256,7 +256,7 @@ without the need for writing asynchronous code::
contexts = [router.ssh(hostname=hn) for hn in hostnames] contexts = [router.ssh(hostname=hn) for hn in hostnames]
calls = [context.call(my_func) for context in contexts] calls = [context.call(my_func) for context in contexts]
for recv, (msg, data) in mitogen.master.Select(calls): for msg in mitogen.select.Select(calls):
print 'Reply from %s: %s' % (recv.context, data) print 'Reply from %s: %s' % (recv.context, data)

@ -18,6 +18,7 @@ import time
import mitogen.core import mitogen.core
import mitogen.master import mitogen.master
import mitogen.select
import mitogen.utils import mitogen.utils
@ -185,7 +186,7 @@ def main(router):
sys.exit(1) sys.exit(1)
delay = 2.0 delay = 2.0
select = mitogen.master.Select(oneshot=False) select = mitogen.select.Select(oneshot=False)
hosts = [] hosts = []
# For each hostname on the command line, create a Host instance, a Mitogen # For each hostname on the command line, create a Host instance, a Mitogen

@ -1421,7 +1421,7 @@ class Router(object):
return return
if policy and not policy(msg, stream): if policy and not policy(msg, stream):
LOG.error('%r: policy refused message: %r', self, msg) LOG.error('%r: policy for %r refused message: %r', self, fn, msg)
if msg.reply_to: if msg.reply_to:
self.route(Message.pickled( self.route(Message.pickled(
CallError(self.refused_msg), CallError(self.refused_msg),

Loading…
Cancel
Save