diff --git a/examples/select_loop.py b/examples/select_loop.py new file mode 100644 index 00000000..414bb358 --- /dev/null +++ b/examples/select_loop.py @@ -0,0 +1,88 @@ + +# +# This demonstrates using a nested select.Select() to simultaneously watch for +# in-progress events generated by a bunch of function calls, and the completion +# of those function calls. +# +# We start 5 children and run a function in each of them in parallel. The +# function writes the numbers 1..5 to a Sender before returning. The master +# reads the numbers from each child as they are generated, and exits the loop +# when the last function returns. +# + +from __future__ import absolute_import +from __future__ import print_function + +import time + +import mitogen +import mitogen.select + + +def count_to(sender, n, wait=0.333): + for x in range(n): + sender.send(x) + time.sleep(wait) + + +@mitogen.main() +def main(router): + # Start 5 subprocesses and give them made up names. + contexts = { + 'host%d' % (i,): router.local() + for i in range(5) + } + + # Used later to recover hostname from failed call. + hostname_by_context_id = { + context.context_id: hostname + for hostname, context in contexts.items() + } + + # I am a select that holds the receivers that will receive the function + # call results. Selects are one-shot by default, which means each receiver + # is removed from them as a result arrives. Therefore it means the last + # function has completed when bool(calls_sel) is False. + calls_sel = mitogen.select.Select() + + # I receive the numbers as they are counted. + status_recv = mitogen.core.Receiver(router) + + # Start the function calls + for hostname, context in contexts.items(): + calls_sel.add( + context.call_async( + count_to, + sender=status_recv.to_sender(), + n=5, + wait=0.333 + ) + ) + + # Create a select subscribed to the function call result Select, and to the + # number-counting receiver. + both_sel = mitogen.select.Select([status_recv, calls_sel], oneshot=False) + + # Once last call is completed, calls will be empty since it's oneshot=True + # (the default), causing __bool__ to be False + while calls_sel: + try: + msg = both_sel.get(timeout=60.0) + except mitogen.core.TimeoutError: + print("No update in 60 seconds, something's broke") + raise Crash() + + hostname = hostname_by_context_id[msg.src_id] + + if msg.receiver is status_recv: # https://mitogen.readthedocs.io/en/stable/api.html#mitogen.core.Message.receiver + # handle a status update + print('Got status update from %s: %s' % (hostname, msg.unpickle())) + elif msg.receiver is calls_sel: # subselect + # handle a function call result. + try: + assert None == msg.unpickle() + print('Task succeeded on %s' % (hostname,)) + except mitogen.core.CallError as e: + print('Task failed on host %s: %s' % (hostname, e)) + + print('All tasks completed.')