You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
mitogen/examples/select_loop.py

104 lines
3.8 KiB
Python

#
# This demonstrates using a nested select.Select() to simultaneously watch for
# in-progress events generated by a bunch of function calls, and the completion
# of those function calls.
#
# We start 5 children and run a function in each of them in parallel. The
# function writes the numbers 0..4 to a Sender before returning. The master
# reads the numbers from each child as they are generated, and exits the loop
# when the last function returns.
#
from __future__ import absolute_import
from __future__ import print_function
import time
import mitogen
import mitogen.select
def count_to(sender, n, wait=0.333):
    """
    Send each integer in ``range(n)`` to `sender`, pausing after every send.

    :param sender:
        Object exposing a ``send()`` method; it is called once per number.
    :param n:
        How many numbers to emit, starting from 0.
    :param wait:
        Seconds to sleep after each number has been sent.
    """
    def emit(number):
        # One step of the count: publish the value, then pace the next one.
        sender.send(number)
        time.sleep(wait)

    for number in range(n):
        emit(number)
@mitogen.main()
def main(router):
    """
    Run count_to() in 5 children in parallel, printing every status update as
    it arrives and the outcome of each call as it completes.

    :param router:
        Router passed in by the @mitogen.main() decorator. It is used to start
        the children and to construct the status receiver.
    """
    # Start 5 subprocesses and give them made up names.
    contexts = {
        'host%d' % (i,): router.local()
        for i in range(5)
    }

    # Used later to recover hostname. A future Mitogen will provide a better
    # way to get app data references back out of its IO primitives, for now you
    # need to do it manually.
    hostname_by_context_id = {
        context.context_id: hostname
        for hostname, context in contexts.items()
    }

    # This select holds the receivers that will receive the function call
    # results. Selects are one-shot by default, which means each receiver is
    # removed from them as a result arrives. Therefore the last function has
    # completed when bool(calls_sel) is False.
    calls_sel = mitogen.select.Select()

    # This receiver gets the numbers as they are counted.
    status_recv = mitogen.core.Receiver(router)

    # Start the function calls. Only the contexts are needed here; hostnames
    # are recovered later from the message source ID.
    for context in contexts.values():
        calls_sel.add(
            context.call_async(
                count_to,
                sender=status_recv.to_sender(),
                n=5,
                wait=0.333
            )
        )

    # Create a select subscribed to the function call result Select, and to the
    # number-counting receiver. Any message arriving on any child of this
    # Select will wake it up -- be it a message arriving on the status
    # receiver, or any message arriving on any of the function call result
    # receivers.
    #
    # Once the last call is completed, calls_sel will be empty since it's
    # oneshot=True (the default), causing __bool__ to be False.
    both_sel = mitogen.select.Select([status_recv, calls_sel], oneshot=False)

    # Internally selects store a strong reference from Receiver->Select that
    # will keep the Select alive as long as the receiver is alive. If a
    # receiver or select otherwise 'outlives' some parent select, attempting to
    # re-add it to a new select will raise an error. In all cases it's
    # desirable to call Select.close(). This can be done as a context manager.
    with calls_sel, both_sel:
        while calls_sel:
            try:
                msg = both_sel.get(timeout=60.0)
            except mitogen.core.TimeoutError:
                print("No update in 60 seconds, something's broke")
                break

            hostname = hostname_by_context_id[msg.src_id]

            # msg.receiver identifies which child of both_sel produced the
            # message, see
            # https://mitogen.readthedocs.io/en/stable/api.html#mitogen.core.Message.receiver
            if msg.receiver is status_recv:
                # Handle a status update.
                print('Got status update from %s: %s' % (hostname, msg.unpickle()))
            elif msg.receiver is calls_sel:  # subselect
                # Handle a function call result. unpickle() must run
                # unconditionally: it is what raises CallError for a failed
                # call, so it must not live inside an assert statement, which
                # is stripped when Python runs with -O.
                try:
                    result = msg.unpickle()
                except mitogen.core.CallError as e:
                    print('Task failed on host %s: %s' % (hostname, e))
                else:
                    # count_to() has no return statement, so a successful
                    # call yields None.
                    assert result is None
                    print('Task succeeded on %s' % (hostname,))

        # Report while the selects are still open. NOTE(review): leaving the
        # `with` block closes calls_sel, which is expected to unregister its
        # remaining receivers and so make it look empty -- confirm against
        # Select.close().
        if calls_sel:
            print('Some tasks did not complete.')
        else:
            print('All tasks completed.')