diff --git a/ansible_mitogen/strategy/mitogen.py b/ansible_mitogen/strategy/mitogen.py index d0363d6e..b7777ec8 100644 --- a/ansible_mitogen/strategy/mitogen.py +++ b/ansible_mitogen/strategy/mitogen.py @@ -27,6 +27,7 @@ from __future__ import absolute_import import os +import threading import mitogen import mitogen.master @@ -42,13 +43,13 @@ import ansible_mitogen.mixins def wrap_action_loader__get(name, *args, **kwargs): """ - Trap calls to the action plug-in loader, supplementing the type of any - ActionModule with Mitogen's ActionModuleMixin before constructing it, - causing the mix-in methods to override any inherited from Ansible's base - class, replacing most shell use with pure Python equivalents. + While the mitogen stratey is active, trap action_loader.get() calls, + augmenting any fetched class with ActionModuleMixin, which replaces various + helper methods inherited from ActionBase with implementations that avoid + the use of shell fragments wherever possible. - This is preferred to static subclassing as it generalizes to third party - action modules existing outside the Ansible tree. + This is used instead of static subclassing as it generalizes to third party + action modules outside the Ansible tree. """ klass = action_loader__get(name, class_only=True) if klass: @@ -59,12 +60,13 @@ def wrap_action_loader__get(name, *args, **kwargs): return adorned_klass return adorned_klass(*args, **kwargs) -action_loader__get = ansible.plugins.action_loader.get -ansible.plugins.action_loader.get = wrap_action_loader__get - def wrap_connection_loader__get(name, play_context, new_stdin): """ + While the mitogen strategy is active, rewrite connection_loader.get() calls + for the 'ssh' and 'local' transports into corresponding requests for the + 'mitogen' connection type, passing the original transport name into it as + an argument, so that it can emulate the original type. """ kwargs = {} if name in ('ssh', 'local'): @@ -72,16 +74,29 @@ def wrap_connection_loader__get(name, play_context, new_stdin): name = 'mitogen' return connection_loader__get(name, play_context, new_stdin, **kwargs) -connection_loader__get = ansible.plugins.connection_loader.get -ansible.plugins.connection_loader.get = wrap_connection_loader__get - class ContextProxyService(mitogen.service.Service): """ - Implement a service accessible from worker processes connecting back into - the top-level process. The service yields an existing context matching a - connection configuration if it exists, otherwise it constructs a new - conncetion before returning it. + Used by worker processes connecting back into the top-level process to + fetch the single Context instance corresponding to the supplied connection + configuration, creating a matching connection if it does not exist. + + For connection methods and their parameters, refer to: + http://mitogen.readthedocs.io/en/latest/api.html#context-factories + + This concentrates all SSH connections in the top-level process, which may + become a bottleneck. There are multiple ways to fix that: + * creating one .local() child context per CPU and sharding connections + between them, using the master process to route messages, or + * as above, but having each child create a unique UNIX listener and + having workers connect in directly. + + :param dict dct: + Parameters passed to mitogen.master.Router.[method](). One key, + "method", is popped from the dictionary and used to look up the method. + + :returns mitogen.master.Context: + Corresponding Context instance. """ well_known_id = 500 max_message_size = 1000 @@ -104,9 +119,9 @@ class ContextProxyService(mitogen.service.Service): class StrategyModule(ansible.plugins.strategy.linear.StrategyModule): def __init__(self, *args, **kwargs): super(StrategyModule, self).__init__(*args, **kwargs) - self.add_connection_plugin_path() + self._add_connection_plugin_path() - def add_connection_plugin_path(self): + def _add_connection_plugin_path(self): """ Automatically add the connection plug-in directory to the ModuleLoader path, slightly reduces end-user configuration. @@ -116,23 +131,69 @@ class StrategyModule(ansible.plugins.strategy.linear.StrategyModule): conn_dir = os.path.join(basedir, 'connection') ansible.plugins.connection_loader.add_directory(conn_dir) - def run(self, iterator, play_context, result=0): + def _setup_logging(self): + """ + Setup Mitogen's logging. Eventually this should be redirected into + Ansible's logging. + """ + log_level = os.environ.get('MITOGEN_LOG_LEVEL', 'INFO') + log_io = 'MITOGEN_LOG_IO' in os.environ + mitogen.utils.log_to_file(level=log_level, io=log_io) + + def _setup_master(self): + """ + Construct a Router, Broker, mitogen.unix listener thread, and thread + serving connection requests from worker processes. + """ self.router = mitogen.master.Router() self.router.responder.whitelist_prefix('ansible') self.router.responder.whitelist_prefix('ansible_mitogen') self.listener = mitogen.unix.Listener(self.router) os.environ['LISTENER_SOCKET_PATH'] = self.listener.path + # TODO: gracefully shutdown and join on this at exist. self.service = ContextProxyService(self.router) - #mitogen.utils.log_to_file(level='DEBUG', io=False) - - import threading - th = threading.Thread(target=self.service.run) - th.setDaemon(True) - th.start() + self.service_thread = threading.Thread(target=self.service.run) + self.service_thread.setDaemon(True) + self.service_thread.start() + def _run_with_master(self, iterator, play_context, result): + """ + Arrange for a mitogen.master.Router to be available for the duration of + the strategy's real run() method. + """ + self._setup_logging() + self._setup_master() try: return super(StrategyModule, self).run(iterator, play_context) finally: self.router.broker.shutdown() os.unlink(self.listener.path) + + def _install_wrappers(self): + """ + Install our PluginLoader monkey patches and update global variables + with references to the real functions. + """ + global action_loader__get + action_loader__get = ansible.plugins.action_loader.get + ansible.plugins.action_loader.get = wrap_action_loader__get + + global connection_loader__get + connection_loader__get = ansible.plugins.connection_loader.get + ansible.plugins.connection_loader.get = wrap_connection_loader__get + + def _remove_wrappers(self): + """ + Uninstall the PluginLoader monkey patches. + """ + ansible.plugins.action_loader.get = action_loader__get + ansible.plugins.connection_loader.get = connection_loader__get + + def run(self, iterator, play_context, result=0): + self._install_wrappers() + try: + return self._run_with_master(iterator, play_context, result) + finally: + self._remove_wrappers() + self._setup_master()