From 61365236add51ee5892ad03781a3fdb99af73340 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Fri, 25 May 2018 04:47:59 +0100 Subject: [PATCH] docs/select: fix up more references, fix headings. --- docs/api.rst | 320 ++++++++++++++++++++------------------- docs/getting_started.rst | 2 +- examples/mitop.py | 3 +- mitogen/core.py | 2 +- 4 files changed, 166 insertions(+), 161 deletions(-) diff --git a/docs/api.rst b/docs/api.rst index 2580b11c..43ae8ecb 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -71,157 +71,6 @@ be sent to any context that will be used to establish additional child contexts. -.. currentmodule:: mitogen.master - -.. class:: Select (receivers=(), oneshot=True) - - Support scatter/gather asynchronous calls and waiting on multiple - receivers, channels, and sub-Selects. Accepts a sequence of - :py:class:`mitogen.core.Receiver` or :py:class:`mitogen.master.Select` - instances and returns the first value posted to any receiver or select. - - If `oneshot` is :data:`True`, then remove each receiver as it yields a - result; since :py:meth:`__iter__` terminates once the final receiver is - removed, this makes it convenient to respond to calls made in parallel: - - .. code-block:: python - - total = 0 - recvs = [c.call_async(long_running_operation) for c in contexts] - - for msg in mitogen.master.Select(recvs): - print 'Got %s from %s' % (msg, msg.receiver) - total += msg.unpickle() - - # Iteration ends when last Receiver yields a result. - print 'Received total %s from %s receivers' % (total, len(recvs)) - - :py:class:`Select` may drive a long-running scheduler: - - .. code-block:: python - - with mitogen.master.Select(oneshot=False) as select: - while running(): - for msg in select: - process_result(msg.receiver.context, msg.unpickle()) - for context, workfunc in get_new_work(): - select.add(context.call_async(workfunc)) - - :py:class:`Select` may be nested: - - .. code-block:: python - - subselects = [ - mitogen.master.Select(get_some_work()), - mitogen.master.Select(get_some_work()), - mitogen.master.Select([ - mitogen.master.Select(get_some_work()), - mitogen.master.Select(get_some_work()) - ]) - ] - - for msg in mitogen.master.Select(selects): - print msg.unpickle() - - .. py:classmethod:: all (it) - - Take an iterable of receivers and retrieve a :py:class:`Message` from - each, returning the result of calling `msg.unpickle()` on each in turn. - Results are returned in the order they arrived. - - This is sugar for handling batch :py:class:`Context.call_async` - invocations: - - .. code-block:: python - - print('Total disk usage: %.02fMiB' % (sum( - mitogen.master.Select.all( - context.call_async(get_disk_usage) - for context in contexts - ) / 1048576.0 - ),)) - - However, unlike in a naive comprehension such as: - - .. code-block:: python - - sum(context.call_async(get_disk_usage).get().unpickle() - for context in contexts) - - Result processing happens concurrently to new results arriving, so - :py:meth:`all` should always be faster. - - .. py:method:: get (timeout=None, block=True) - - Fetch the next available value from any receiver, or raise - :py:class:`mitogen.core.TimeoutError` if no value is available within - `timeout` seconds. - - On success, the message's :py:attr:`receiver - ` attribute is set to the receiver. - - :param float timeout: - Timeout in seconds. - :param bool block: - If :py:data:`False`, immediately raise - :py:class:`mitogen.core.TimeoutError` if the select is empty. - :return: - :py:class:`mitogen.core.Message` - :raises mitogen.core.TimeoutError: - Timeout was reached. - :raises mitogen.core.LatchError: - :py:meth:`close` has been called, and the underlying latch is no - longer valid. - - .. py:method:: __bool__ () - - Return :data:`True` if any receivers are registered with this select. - - .. py:method:: close () - - Remove the select's notifier function from each registered receiver, - mark the associated latch as closed, and cause any thread currently - sleeping in :py:meth:`get` to be woken with - :py:class:`mitogen.core.LatchError`. - - This is necessary to prevent memory leaks in long-running receivers. It - is called automatically when the Python :keyword:`with` statement is - used. - - .. py:method:: empty () - - Return :data:`True` if calling :py:meth:`get` would block. - - As with :py:class:`Queue.Queue`, :data:`True` may be returned even - though a subsequent call to :py:meth:`get` will succeed, since a - message may be posted at any moment between :py:meth:`empty` and - :py:meth:`get`. - - :py:meth:`empty` may return ``False`` even when :py:meth:`get` would - block if another thread has drained a receiver added to this select. - This can be avoided by only consuming each receiver from a single - thread. - - .. py:method:: __iter__ (self) - - Yield the result of :py:meth:`get` until no receivers remain in the - select, either because `oneshot` is :data:`True`, or each receiver was - explicitly removed via :py:meth:`remove`. - - .. py:method:: add (recv) - - Add the :py:class:`mitogen.core.Receiver` or - :py:class:`mitogen.core.Channel` `recv` to the select. - - .. py:method:: remove (recv) - - Remove the :py:class:`mitogen.core.Receiver` or - :py:class:`mitogen.core.Channel` `recv` from the select. Note that if - the receiver has notified prior to :py:meth:`remove`, then it will - still be returned by a subsequent :py:meth:`get`. This may change in a - future version. - - mitogen.fakessh --------------- @@ -327,7 +176,7 @@ Message Class .. attribute:: receiver The :py:class:`mitogen.core.Receiver` over which the message was last - received. Part of the :py:class:`mitogen.master.Select` interface. + received. Part of the :py:class:`mitogen.select.Select` interface. Defaults to :py:data:`None`. .. attribute:: dst_id @@ -392,7 +241,6 @@ Message Class Router Class ============ - .. currentmodule:: mitogen.core .. class:: Router @@ -1061,7 +909,7 @@ Context Class Asynchronous calls may be dispatched in parallel to multiple contexts and consumed as they complete using - :py:class:`mitogen.master.Select`. + :py:class:`mitogen.select.Select`. .. method:: call (fn, \*args, \*\*kwargs) @@ -1077,7 +925,7 @@ Context Class Receiver Class --------------- +============== .. currentmodule:: mitogen.core @@ -1109,7 +957,7 @@ Receiver Class If not ``None``, a reference to a function invoked as `notify(receiver)` when a new message is delivered to this receiver. - Used by :py:class:`mitogen.master.Select` to implement waiting on + Used by :py:class:`mitogen.select.Select` to implement waiting on multiple receivers. .. py:method:: to_sender () @@ -1186,7 +1034,7 @@ Receiver Class Sender Class ------------- +============ .. currentmodule:: mitogen.core @@ -1213,8 +1061,164 @@ Sender Class Send `data` to the remote end. +Select Class +============ + +.. module:: mitogen.select + +.. currentmodule:: mitogen.select + +.. class:: Select (receivers=(), oneshot=True) + + Support scatter/gather asynchronous calls and waiting on multiple + receivers, channels, and sub-Selects. Accepts a sequence of + :py:class:`mitogen.core.Receiver` or :py:class:`mitogen.select.Select` + instances and returns the first value posted to any receiver or select. + + If `oneshot` is :data:`True`, then remove each receiver as it yields a + result; since :py:meth:`__iter__` terminates once the final receiver is + removed, this makes it convenient to respond to calls made in parallel: + + .. code-block:: python + + total = 0 + recvs = [c.call_async(long_running_operation) for c in contexts] + + for msg in mitogen.select.Select(recvs): + print 'Got %s from %s' % (msg, msg.receiver) + total += msg.unpickle() + + # Iteration ends when last Receiver yields a result. + print 'Received total %s from %s receivers' % (total, len(recvs)) + + :py:class:`Select` may drive a long-running scheduler: + + .. code-block:: python + + with mitogen.select.Select(oneshot=False) as select: + while running(): + for msg in select: + process_result(msg.receiver.context, msg.unpickle()) + for context, workfunc in get_new_work(): + select.add(context.call_async(workfunc)) + + :py:class:`Select` may be nested: + + .. code-block:: python + + subselects = [ + mitogen.select.Select(get_some_work()), + mitogen.select.Select(get_some_work()), + mitogen.select.Select([ + mitogen.select.Select(get_some_work()), + mitogen.select.Select(get_some_work()) + ]) + ] + + for msg in mitogen.select.Select(selects): + print msg.unpickle() + + .. py:classmethod:: all (it) + + Take an iterable of receivers and retrieve a :py:class:`Message` from + each, returning the result of calling `msg.unpickle()` on each in turn. + Results are returned in the order they arrived. + + This is sugar for handling batch :py:class:`Context.call_async` + invocations: + + .. code-block:: python + + print('Total disk usage: %.02fMiB' % (sum( + mitogen.select.Select.all( + context.call_async(get_disk_usage) + for context in contexts + ) / 1048576.0 + ),)) + + However, unlike in a naive comprehension such as: + + .. code-block:: python + + sum(context.call_async(get_disk_usage).get().unpickle() + for context in contexts) + + Result processing happens concurrently to new results arriving, so + :py:meth:`all` should always be faster. + + .. py:method:: get (timeout=None, block=True) + + Fetch the next available value from any receiver, or raise + :py:class:`mitogen.core.TimeoutError` if no value is available within + `timeout` seconds. + + On success, the message's :py:attr:`receiver + ` attribute is set to the receiver. + + :param float timeout: + Timeout in seconds. + :param bool block: + If :py:data:`False`, immediately raise + :py:class:`mitogen.core.TimeoutError` if the select is empty. + :return: + :py:class:`mitogen.core.Message` + :raises mitogen.core.TimeoutError: + Timeout was reached. + :raises mitogen.core.LatchError: + :py:meth:`close` has been called, and the underlying latch is no + longer valid. + + .. py:method:: __bool__ () + + Return :data:`True` if any receivers are registered with this select. + + .. py:method:: close () + + Remove the select's notifier function from each registered receiver, + mark the associated latch as closed, and cause any thread currently + sleeping in :py:meth:`get` to be woken with + :py:class:`mitogen.core.LatchError`. + + This is necessary to prevent memory leaks in long-running receivers. It + is called automatically when the Python :keyword:`with` statement is + used. + + .. py:method:: empty () + + Return :data:`True` if calling :py:meth:`get` would block. + + As with :py:class:`Queue.Queue`, :data:`True` may be returned even + though a subsequent call to :py:meth:`get` will succeed, since a + message may be posted at any moment between :py:meth:`empty` and + :py:meth:`get`. + + :py:meth:`empty` may return ``False`` even when :py:meth:`get` would + block if another thread has drained a receiver added to this select. + This can be avoided by only consuming each receiver from a single + thread. + + .. py:method:: __iter__ (self) + + Yield the result of :py:meth:`get` until no receivers remain in the + select, either because `oneshot` is :data:`True`, or each receiver was + explicitly removed via :py:meth:`remove`. + + .. py:method:: add (recv) + + Add the :py:class:`mitogen.core.Receiver` or + :py:class:`mitogen.core.Channel` `recv` to the select. + + .. py:method:: remove (recv) + + Remove the :py:class:`mitogen.core.Receiver` or + :py:class:`mitogen.core.Channel` `recv` from the select. Note that if + the receiver has notified prior to :py:meth:`remove`, then it will + still be returned by a subsequent :py:meth:`get`. This may change in a + future version. + + Channel Class -------------- +============= .. currentmodule:: mitogen.core diff --git a/docs/getting_started.rst b/docs/getting_started.rst index ddaccf87..985f796a 100644 --- a/docs/getting_started.rst +++ b/docs/getting_started.rst @@ -256,7 +256,7 @@ without the need for writing asynchronous code:: contexts = [router.ssh(hostname=hn) for hn in hostnames] calls = [context.call(my_func) for context in contexts] - for recv, (msg, data) in mitogen.master.Select(calls): + for msg in mitogen.select.Select(calls): print 'Reply from %s: %s' % (recv.context, data) diff --git a/examples/mitop.py b/examples/mitop.py index 89de9aad..1cfd92dd 100644 --- a/examples/mitop.py +++ b/examples/mitop.py @@ -18,6 +18,7 @@ import time import mitogen.core import mitogen.master +import mitogen.select import mitogen.utils @@ -185,7 +186,7 @@ def main(router): sys.exit(1) delay = 2.0 - select = mitogen.master.Select(oneshot=False) + select = mitogen.select.Select(oneshot=False) hosts = [] # For each hostname on the command line, create a Host instance, a Mitogen diff --git a/mitogen/core.py b/mitogen/core.py index 6e8856b8..2a5049f2 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -1421,7 +1421,7 @@ class Router(object): return if policy and not policy(msg, stream): - LOG.error('%r: policy refused message: %r', self, msg) + LOG.error('%r: policy for %r refused message: %r', self, fn, msg) if msg.reply_to: self.route(Message.pickled( CallError(self.refused_msg),