issue #20: initial implementation of mitogen.master.Select().

pull/45/head
David Wilson 7 years ago
parent 1bb4a32271
commit 6869292738

@ -60,6 +60,104 @@ be sent to any context that will be used to establish additional child
contexts. contexts.
.. class:: mitogen.master.Select (receivers=(), oneshot=True)
Support scatter/gather asynchronous calls and waiting on multiple
receivers, channels, and sub-Selects. Accepts a sequence of
:py:class:`mitogen.core.Receiver` or :py:class:`mitogen.master.Select`
instances and returns the first value posted to any receiver or select.
If `oneshot` is ``True``, then remove each receiver as it yields a result;
since :py:meth:`__iter__` terminates once the final receiver is removed
from the select, this makes it convenient to respond to several call
results with minimal effort:
.. code-block:: python
total = 0
recvs = [c.call_async(long_running_operation) for c in contexts]
with mitogen.master.Select(recvs) as select:
for recv, msg in select:
value = msg.unpickle()
print 'Got %s from %s' % (value, recv)
total += value
# Iteration ends when last Receiver yields a result.
print 'Received total %s from %s receivers' % (total, len(recvs))
:py:class:`Select` may also be used to drive a long-running scheduler:
.. code-block:: python
with mitogen.master.Select() as select:
while running():
for recv, msg in select:
process_result(recv.context, msg.unpickle())
for context, workfunc in get_new_work():
select.add(context.call_async(workfunc))
:py:class:`Select` may be nested:
.. code-block:: python
subselects = [
mitogen.master.Select(get_some_work()),
mitogen.master.Select(get_some_work())
]
with mitogen.master.Select(selects, oneshot=False) as select:
while subselects and any(subselects): # Calls __bool__()
print select.get()
.. py:method:: get (timeout=None)
Fetch the next available value from any receiver, or raise
:py:class:`mitogen.core.TimeoutError` if no value is available within
`timeout` seconds.
:param float timeout:
Timeout in seconds.
:return:
`(receiver, msg)`
.. py:method:: __bool__ ()
Return ``True`` if any receivers are registered with this select.
.. py:method:: close ()
Remove the select's notifier function from each registered receiver.
Necessary to prevent memory leaks in long-running receivers. This is
called automatically when the Python ``with:`` statement is used.
.. py:method:: empty ()
Return ``True`` if no items appear to be queued on this receiver. Like
:py:class:`Queue.Queue`, this function's return value cannot be relied
upon.
.. py:method:: __iter__ (self)
Yield the result of :py:meth:`get` until no receivers remain in the
select, either because `oneshot` is ``True``, or each receiver was
explicitly removed via :py:meth:`remove`.
.. py:method:: add (recv)
Add the :py:class:`mitogen.core.Receiver` or
:py:class:`mitogen.core.Channel` `recv` to the select.
.. py:method:: remove (recv)
Remove the :py:class:`mitogen.core.Receiver` or
:py:class:`mitogen.core.Channel` `recv` from the select. Note that if
the receiver has notified prior to :py:meth:`remove`, then it will
still be returned by a subsequent :py:meth:`get`. This may change in a
future version.
mitogen.fakessh mitogen.fakessh
--------------- ---------------

@ -298,6 +298,33 @@ usual into the slave process.
mitogen.utils.run_with_broker(main) mitogen.utils.run_with_broker(main)
Scatter/Gather Function Calls
#############################
Functions may be invoked asynchronously, with results returned as they become
available.
.. code-block:: python
def disk_usage(path):
return sum((os.path.getsize(os.path.join(dirpath, name))
for dirpath, dirnames, filenames in os.walk(path)
for name in dirnames + filenames), 0)
if __name__ == '__main__' and mitogen.is_master:
contexts = connect_contexts(...)
receivers = [c.call_async(disk_usage, '/tmp') for c in contexts]
total = 0
for recv, msg in mitogen.master.Select(receivers):
value = result.unpickle()
print 'Context %s /tmp usage: %d' % (recv.context, value)
total += value
print 'Total /tmp usage across all contexts: %d' % (total,)
Event-driven IO Event-driven IO
############### ###############

@ -264,9 +264,28 @@ class Sender(object):
) )
def _queue_interruptible_get(queue, timeout=None, block=True):
if timeout:
timeout += time.time()
msg = None
while msg is None and (timeout is None or timeout < time.time()):
try:
msg = queue.get(True, 0.5, block=block)
except Queue.Empty:
if block:
break
if msg is None:
raise TimeoutError('deadline exceeded.')
return msg
class Receiver(object): class Receiver(object):
def __init__(self, router, handle=None, persist=True, respondent=None): def __init__(self, router, handle=None, persist=True, respondent=None):
self.router = router self.router = router
self.notify = []
self.handle = handle # Avoid __repr__ crash in add_handler() self.handle = handle # Avoid __repr__ crash in add_handler()
self.handle = router.add_handler(self._on_receive, handle, self.handle = router.add_handler(self._on_receive, handle,
persist, respondent) persist, respondent)
@ -279,27 +298,22 @@ class Receiver(object):
"""Callback from the Stream; appends data to the internal queue.""" """Callback from the Stream; appends data to the internal queue."""
IOLOG.debug('%r._on_receive(%r)', self, msg) IOLOG.debug('%r._on_receive(%r)', self, msg)
self._queue.put(msg) self._queue.put(msg)
for func in self.notify:
func(self)
def close(self): def close(self):
self._queue.put(_DEAD) self._queue.put(_DEAD)
def empty(self):
return self._queue.empty()
def get(self, timeout=None): def get(self, timeout=None):
"""Receive an object, or ``None`` if `timeout` is reached.""" """Receive an object, or ``None`` if `timeout` is reached."""
IOLOG.debug('%r.on_receive(timeout=%r)', self, timeout) IOLOG.debug('%r.on_receive(timeout=%r)', self, timeout)
if timeout:
timeout += time.time()
msg = None
while msg is None and (timeout is None or timeout < time.time()):
try:
msg = self._queue.get(True, 0.5)
except Queue.Empty:
continue
if msg is None:
raise TimeoutError('deadline exceeded.')
msg = _queue_interruptible_get(self._queue, timeout)
IOLOG.debug('%r.on_receive() got %r', self, msg) IOLOG.debug('%r.on_receive() got %r', self, msg)
if msg == _DEAD: if msg == _DEAD:
raise ChannelError('Channel closed by local end.') raise ChannelError('Channel closed by local end.')

@ -231,6 +231,66 @@ def scan_code_imports(co, LOAD_CONST=dis.opname.index('LOAD_CONST'),
co.co_consts[arg2] or ()) co.co_consts[arg2] or ())
class Select(object):
def __init__(self, receivers=(), oneshot=True):
self._receivers = []
self._oneshot = oneshot
self._queue = Queue.Queue()
self._notify = []
for recv in receivers:
self.add(recv)
def _put(self, value):
self._queue.put(value)
for func in self._notify:
func(self)
def __bool__(self):
return bool(self._receivers)
def __enter__(self):
return self
def __exit__(self, e_type, e_val, e_tb):
self.close()
def __iter__(self):
while self._receivers:
recv, msg = self.get()
if self._oneshot:
self.remove(recv)
yield recv, msg
def add(self, recv):
self._receivers.append(recv)
recv.notify.append(self._put)
# Avoid race by polling once after installation.
if not recv.empty():
self._put(recv)
def remove(self, recv):
recv.notify.remove(self._put)
def close(self):
for recv in self._receivers[:]:
self.remove(recv)
def empty(self):
return self._queue.empty()
def get(self, timeout=None):
while True:
recv = mitogen.core._queue_interruptible_get(queue, timeout)
try:
return recv, recv.get(block=False)
except mitogen.core.TimeoutError:
# A receiver may have been queued with no result if another
# thread drained it before we woke up, or because another
# thread drained it between add() calling recv.empty() and
# self._put(). In this case just sleep again.
continue
class LogForwarder(object): class LogForwarder(object):
def __init__(self, router): def __init__(self, router):
self._router = router self._router = router

Loading…
Cancel
Save