examples: import select_loop.py.
parent
6fafc0a631
commit
b5831a0d76
@ -0,0 +1,88 @@
|
|||||||
|
|
||||||
|
#
|
||||||
|
# This demonstrates using a nested select.Select() to simultaneously watch for
|
||||||
|
# in-progress events generated by a bunch of function calls, and the completion
|
||||||
|
# of those function calls.
|
||||||
|
#
|
||||||
|
# We start 5 children and run a function in each of them in parallel. The
|
||||||
|
# function writes the numbers 1..5 to a Sender before returning. The master
|
||||||
|
# reads the numbers from each child as they are generated, and exits the loop
|
||||||
|
# when the last function returns.
|
||||||
|
#
|
||||||
|
|
||||||
|
from __future__ import absolute_import
|
||||||
|
from __future__ import print_function
|
||||||
|
|
||||||
|
import time
|
||||||
|
|
||||||
|
import mitogen
|
||||||
|
import mitogen.select
|
||||||
|
|
||||||
|
|
||||||
|
def count_to(sender, n, wait=0.333):
|
||||||
|
for x in range(n):
|
||||||
|
sender.send(x)
|
||||||
|
time.sleep(wait)
|
||||||
|
|
||||||
|
|
||||||
|
@mitogen.main()
|
||||||
|
def main(router):
|
||||||
|
# Start 5 subprocesses and give them made up names.
|
||||||
|
contexts = {
|
||||||
|
'host%d' % (i,): router.local()
|
||||||
|
for i in range(5)
|
||||||
|
}
|
||||||
|
|
||||||
|
# Used later to recover hostname from failed call.
|
||||||
|
hostname_by_context_id = {
|
||||||
|
context.context_id: hostname
|
||||||
|
for hostname, context in contexts.items()
|
||||||
|
}
|
||||||
|
|
||||||
|
# I am a select that holds the receivers that will receive the function
|
||||||
|
# call results. Selects are one-shot by default, which means each receiver
|
||||||
|
# is removed from them as a result arrives. Therefore it means the last
|
||||||
|
# function has completed when bool(calls_sel) is False.
|
||||||
|
calls_sel = mitogen.select.Select()
|
||||||
|
|
||||||
|
# I receive the numbers as they are counted.
|
||||||
|
status_recv = mitogen.core.Receiver(router)
|
||||||
|
|
||||||
|
# Start the function calls
|
||||||
|
for hostname, context in contexts.items():
|
||||||
|
calls_sel.add(
|
||||||
|
context.call_async(
|
||||||
|
count_to,
|
||||||
|
sender=status_recv.to_sender(),
|
||||||
|
n=5,
|
||||||
|
wait=0.333
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create a select subscribed to the function call result Select, and to the
|
||||||
|
# number-counting receiver.
|
||||||
|
both_sel = mitogen.select.Select([status_recv, calls_sel], oneshot=False)
|
||||||
|
|
||||||
|
# Once last call is completed, calls will be empty since it's oneshot=True
|
||||||
|
# (the default), causing __bool__ to be False
|
||||||
|
while calls_sel:
|
||||||
|
try:
|
||||||
|
msg = both_sel.get(timeout=60.0)
|
||||||
|
except mitogen.core.TimeoutError:
|
||||||
|
print("No update in 60 seconds, something's broke")
|
||||||
|
raise Crash()
|
||||||
|
|
||||||
|
hostname = hostname_by_context_id[msg.src_id]
|
||||||
|
|
||||||
|
if msg.receiver is status_recv: # https://mitogen.readthedocs.io/en/stable/api.html#mitogen.core.Message.receiver
|
||||||
|
# handle a status update
|
||||||
|
print('Got status update from %s: %s' % (hostname, msg.unpickle()))
|
||||||
|
elif msg.receiver is calls_sel: # subselect
|
||||||
|
# handle a function call result.
|
||||||
|
try:
|
||||||
|
assert None == msg.unpickle()
|
||||||
|
print('Task succeeded on %s' % (hostname,))
|
||||||
|
except mitogen.core.CallError as e:
|
||||||
|
print('Task failed on host %s: %s' % (hostname, e))
|
||||||
|
|
||||||
|
print('All tasks completed.')
|
Loading…
Reference in New Issue