# # This demonstrates using a nested select.Select() to simultaneously watch for # in-progress events generated by a bunch of function calls, and the completion # of those function calls. # # We start 5 children and run a function in each of them in parallel. The # function writes the numbers 1..5 to a Sender before returning. The master # reads the numbers from each child as they are generated, and exits the loop # when the last function returns. # from __future__ import absolute_import from __future__ import print_function import time import mitogen import mitogen.select def count_to(sender, n, wait=0.333): for x in range(n): sender.send(x) time.sleep(wait) @mitogen.main() def main(router): # Start 5 subprocesses and give them made up names. contexts = { 'host%d' % (i,): router.local() for i in range(5) } # Used later to recover hostname. A future Mitogen will provide a better # way to get app data references back out of its IO primitives, for now you # need to do it manually. hostname_by_context_id = { context.context_id: hostname for hostname, context in contexts.items() } # I am a select that holds the receivers that will receive the function # call results. Selects are one-shot by default, which means each receiver # is removed from them as a result arrives. Therefore it means the last # function has completed when bool(calls_sel) is False. calls_sel = mitogen.select.Select() # I receive the numbers as they are counted. status_recv = mitogen.core.Receiver(router) # Start the function calls for hostname, context in contexts.items(): calls_sel.add( context.call_async( count_to, sender=status_recv.to_sender(), n=5, wait=0.333 ) ) # Create a select subscribed to the function call result Select, and to the # number-counting receiver. Any message arriving on any child of this # Select will wake it up -- be it a message arriving on the status # receiver, or any message arriving on any of the function call result # receivers. # Once last call is completed, calls_sel will be empty since it's # oneshot=True (the default), causing __bool__ to be False both_sel = mitogen.select.Select([status_recv, calls_sel], oneshot=False) # Internally selects store a strong reference from Receiver->Select that # will keep the Select alive as long as the receiver is alive. If a # receiver or select otherwise 'outlives' some parent select, attempting to # re-add it to a new select will raise an error. In all cases it's # desirable to call Select.close(). This can be done as a context manager. with calls_sel, both_sel: while calls_sel: try: msg = both_sel.get(timeout=60.0) except mitogen.core.TimeoutError: print("No update in 60 seconds, something's broke") break hostname = hostname_by_context_id[msg.src_id] if msg.receiver is status_recv: # https://mitogen.readthedocs.io/en/stable/api.html#mitogen.core.Message.receiver # handle a status update print('Got status update from %s: %s' % (hostname, msg.unpickle())) elif msg.receiver is calls_sel: # subselect # handle a function call result. try: assert None == msg.unpickle() print('Task succeeded on %s' % (hostname,)) except mitogen.core.CallError as e: print('Task failed on host %s: %s' % (hostname, e)) if calls_sel: print('Some tasks did not complete.') else: print('All tasks completed.')