From 686929273847be63a0cd8841bf405b9243e54991 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Tue, 26 Sep 2017 16:46:47 +0530 Subject: [PATCH] issue #20: initial implementation of mitogen.master.Select(). --- docs/api.rst | 98 +++++++++++++++++++++++++++++++++++++++++++++++ docs/index.rst | 27 +++++++++++++ mitogen/core.py | 38 ++++++++++++------ mitogen/master.py | 60 +++++++++++++++++++++++++++++ 4 files changed, 211 insertions(+), 12 deletions(-) diff --git a/docs/api.rst b/docs/api.rst index e1496d66..7bf54c75 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -60,6 +60,104 @@ be sent to any context that will be used to establish additional child contexts. +.. class:: mitogen.master.Select (receivers=(), oneshot=True) + + Support scatter/gather asynchronous calls and waiting on multiple + receivers, channels, and sub-Selects. Accepts a sequence of + :py:class:`mitogen.core.Receiver` or :py:class:`mitogen.master.Select` + instances and returns the first value posted to any receiver or select. + + If `oneshot` is ``True``, then remove each receiver as it yields a result; + since :py:meth:`__iter__` terminates once the final receiver is removed + from the select, this makes it convenient to respond to several call + results with minimal effort: + + .. code-block:: python + + total = 0 + recvs = [c.call_async(long_running_operation) for c in contexts] + + with mitogen.master.Select(recvs) as select: + for recv, msg in select: + value = msg.unpickle() + print 'Got %s from %s' % (value, recv) + total += value + + # Iteration ends when last Receiver yields a result. + print 'Received total %s from %s receivers' % (total, len(recvs)) + + :py:class:`Select` may also be used to drive a long-running scheduler: + + .. 
code-block:: python + + with mitogen.master.Select() as select: + while running(): + for recv, msg in select: + process_result(recv.context, msg.unpickle()) + for context, workfunc in get_new_work(): + select.add(context.call_async(workfunc)) + + :py:class:`Select` may be nested: + + .. code-block:: python + + subselects = [ + mitogen.master.Select(get_some_work()), + mitogen.master.Select(get_some_work()) + ] + + with mitogen.master.Select(subselects, oneshot=False) as select: + while subselects and any(subselects): # Calls __bool__() + print select.get() + + .. py:method:: get (timeout=None) + + Fetch the next available value from any receiver, or raise + :py:class:`mitogen.core.TimeoutError` if no value is available within + `timeout` seconds. + + :param float timeout: + Timeout in seconds. + + :return: + `(receiver, msg)` + + .. py:method:: __bool__ () + + Return ``True`` if any receivers are registered with this select. + + .. py:method:: close () + + Remove the select's notifier function from each registered receiver. + Necessary to prevent memory leaks in long-running receivers. This is + called automatically when the Python ``with:`` statement is used. + + .. py:method:: empty () + + Return ``True`` if no items appear to be queued on this select. Like + :py:class:`Queue.Queue`, this function's return value cannot be relied + upon. + + .. py:method:: __iter__ (self) + + Yield the result of :py:meth:`get` until no receivers remain in the + select, either because `oneshot` is ``True``, or each receiver was + explicitly removed via :py:meth:`remove`. + + .. py:method:: add (recv) + + Add the :py:class:`mitogen.core.Receiver` or + :py:class:`mitogen.core.Channel` `recv` to the select. + + .. py:method:: remove (recv) + + Remove the :py:class:`mitogen.core.Receiver` or + :py:class:`mitogen.core.Channel` `recv` from the select. Note that if + the receiver has notified prior to :py:meth:`remove`, then it will + still be returned by a subsequent :py:meth:`get`. 
def _queue_interruptible_get(queue, timeout=None, block=True):
    """
    Fetch one item from `queue`, waiting in short 0.5 second slices rather
    than one long blocking call, so the calling thread wakes up regularly
    instead of sitting in a single uninterruptible wait.

    :param queue:
        Object exposing a ``Queue.Queue``-compatible ``get(block, timeout)``.
    :param float timeout:
        Maximum number of seconds to wait. ``None`` (or any falsy value, as
        before) means no deadline. Because the queue is polled in 0.5 second
        slices, the deadline may be overshot by up to one slice.
    :param bool block:
        If ``False``, make a single non-blocking attempt and fail
        immediately when nothing is queued.
    :return:
        The dequeued item. A ``None`` item is indistinguishable from "nothing
        received" and is skipped.
    :raises TimeoutError:
        The deadline passed, or `block` was ``False`` and the queue was
        empty.
    """
    deadline = None
    if timeout:
        deadline = time.time() + timeout

    msg = None
    # Keep polling while nothing has arrived and the deadline (if any) is
    # still in the future.
    while msg is None and (deadline is None or time.time() < deadline):
        try:
            # `block` is passed positionally exactly once; supplying it a
            # second time as a keyword raises TypeError.
            msg = queue.get(block, 0.5)
        except Queue.Empty:
            # A non-blocking caller wants an immediate answer; a blocking
            # caller simply polls again until the deadline.
            if not block:
                break

    if msg is None:
        raise TimeoutError('deadline exceeded.')

    return msg
class Select(object):
    """
    Wait on several receivers (or nested :py:class:`Select` instances) at
    once, returning results in the order they become available.

    Each registered receiver gets this select's :py:meth:`_put` appended to
    its ``notify`` list; when the receiver notifies, it is placed on an
    internal queue that :py:meth:`get` drains.

    :param receivers:
        Iterable of objects exposing ``notify`` (a list of callbacks),
        ``empty()`` and ``get()``.
    :param bool oneshot:
        If ``True``, :py:meth:`__iter__` removes each receiver after it
        yields one result, so iteration ends once every receiver has
        produced a value.
    """
    def __init__(self, receivers=(), oneshot=True):
        self._receivers = []
        self._oneshot = oneshot
        self._queue = Queue.Queue()
        # Named `notify` (not `_notify`) because add() appends to
        # `recv.notify`; a Select must expose the same attribute to be
        # usable as a member of a parent Select.
        self.notify = []
        for recv in receivers:
            self.add(recv)

    def _put(self, value):
        """Notification callback: queue `value` (the receiver that has data)
        and propagate the wake-up to any parent select."""
        self._queue.put(value)
        for func in self.notify:
            func(self)

    def __bool__(self):
        """Return ``True`` while any receiver is registered."""
        return bool(self._receivers)

    # Python 2 consults __nonzero__ for truth testing, not __bool__.
    __nonzero__ = __bool__

    def __enter__(self):
        return self

    def __exit__(self, e_type, e_val, e_tb):
        self.close()

    def __iter__(self):
        """Yield ``(receiver, msg)`` pairs from :py:meth:`get` until no
        receivers remain registered."""
        while self._receivers:
            recv, msg = self.get()
            if self._oneshot:
                self.remove(recv)
            yield recv, msg

    def add(self, recv):
        """Register `recv` with this select."""
        self._receivers.append(recv)
        recv.notify.append(self._put)
        # Avoid race by polling once after installation.
        if not recv.empty():
            self._put(recv)

    def remove(self, recv):
        """
        Unregister `recv`. Safe to call for a receiver that was already
        removed, which happens in oneshot iteration when a receiver was
        queued more than once.
        """
        if self._put in recv.notify:
            recv.notify.remove(self._put)
        # Forget the receiver too, otherwise __bool__() never becomes False
        # and __iter__() never terminates.
        if recv in self._receivers:
            self._receivers.remove(recv)

    def close(self):
        """Unregister every receiver, dropping the callbacks installed on
        them."""
        for recv in self._receivers[:]:
            self.remove(recv)

    def empty(self):
        """Return ``True`` if no notification appears to be queued; advisory
        only, like ``Queue.Queue.empty()``."""
        return self._queue.empty()

    def get(self, timeout=None):
        """
        Return ``(receiver, msg)`` for the next receiver with a value.

        :param float timeout:
            Passed to each wait on the internal queue.
        :raises mitogen.core.TimeoutError:
            Raised by the underlying queue wait when nothing is notified in
            time.
        """
        while True:
            recv = mitogen.core._queue_interruptible_get(self._queue, timeout)
            # A receiver may have been queued with no result if another
            # thread drained it before we woke up, or because it was queued
            # both by add()'s initial poll and by its own notification. In
            # that case just wait again.
            # NOTE(review): empty() is advisory; with several consumer
            # threads the receiver could still be drained between this check
            # and recv.get(), which would then block.
            if recv.empty():
                continue
            return recv, recv.get()