diff --git a/ChangeLog b/ChangeLog new file mode 100644 index 00000000..82cf7ac0 --- /dev/null +++ b/ChangeLog @@ -0,0 +1,5 @@ + +2018-04-30 v0.0.1 + +* Initial release to support the Mitogen extension for Ansible. + diff --git a/ansible_mitogen/connection.py b/ansible_mitogen/connection.py index 69dec732..fcb5851d 100644 --- a/ansible_mitogen/connection.py +++ b/ansible_mitogen/connection.py @@ -33,6 +33,8 @@ import shlex import sys import time +import jinja2.runtime +import ansible.constants as C import ansible.errors import ansible.plugins.connection @@ -47,6 +49,135 @@ import ansible_mitogen.services LOG = logging.getLogger(__name__) +def _connect_local(spec): + return { + 'method': 'local', + 'kwargs': { + 'python_path': spec['python_path'], + } + } + + +def _connect_ssh(spec): + return { + 'method': 'ssh', + 'kwargs': { + 'check_host_keys': False, # TODO + 'hostname': spec['remote_addr'], + 'username': spec['remote_user'], + 'password': spec['password'], + 'port': spec['port'], + 'python_path': spec['python_path'], + 'identity_file': spec['private_key_file'], + 'ssh_path': spec['ssh_executable'], + 'connect_timeout': spec['ansible_ssh_timeout'], + 'ssh_args': spec['ssh_args'], + } + } + + +def _connect_docker(spec): + return { + 'method': 'docker', + 'kwargs': { + 'username': spec['remote_user'], + 'container': spec['remote_addr'], + 'python_path': spec['python_path'], + 'connect_timeout': spec['ansible_ssh_timeout'] or spec['timeout'], + } + } + + +def _connect_sudo(spec): + return { + 'method': 'sudo', + 'kwargs': { + 'username': spec['become_user'], + 'password': spec['become_pass'], + 'python_path': spec['python_path'], + 'sudo_path': spec['become_exe'], + 'connect_timeout': spec['timeout'], + 'sudo_args': spec['sudo_args'], + } + } + + +CONNECTION_METHOD = { + 'sudo': _connect_sudo, + 'ssh': _connect_ssh, + 'local': _connect_local, + 'docker': _connect_docker, +} + + +def config_from_play_context(transport, inventory_name, connection): + """ + Return a 
dict representing all important connection configuration, allowing + the same functions to work regardless of whether configuration came from + play_context (direct connection) or host vars (mitogen_via=). + """ + return { + 'transport': transport, + 'inventory_name': inventory_name, + 'remote_addr': connection._play_context.remote_addr, + 'remote_user': connection._play_context.remote_user, + 'become': connection._play_context.become, + 'become_method': connection._play_context.become_method, + 'become_user': connection._play_context.become_user, + 'become_pass': connection._play_context.become_pass, + 'password': connection._play_context.password, + 'port': connection._play_context.port, + 'python_path': connection.python_path, + 'private_key_file': connection._play_context.private_key_file, + 'ssh_executable': connection._play_context.ssh_executable, + 'timeout': connection._play_context.timeout, + 'ansible_ssh_timeout': connection.ansible_ssh_timeout, + 'ssh_args': [ + term + for s in ( + getattr(connection._play_context, 'ssh_args', ''), + getattr(connection._play_context, 'ssh_common_args', ''), + getattr(connection._play_context, 'ssh_extra_args', '') + ) + for term in shlex.split(s or '') + ], + 'become_exe': connection._play_context.become_exe, + 'sudo_args': [ + term + for s in ( + connection._play_context.sudo_flags, + connection._play_context.become_flags + ) + for term in shlex.split(s or '') + ], + 'mitogen_via': connection.mitogen_via, + } + + +def config_from_hostvars(transport, inventory_name, connection, + hostvars, become_user): + """ + Override config_from_play_context() to take equivalent information from + host vars. 
+ """ + config = config_from_play_context(transport, inventory_name, connection) + hostvars = dict(hostvars) + return dict(config, **{ + 'remote_addr': hostvars.get('ansible_hostname', inventory_name), + 'become': bool(become_user), + 'become_user': become_user, + 'become_pass': None, + 'remote_user': hostvars.get('ansible_user'), # TODO + 'password': (hostvars.get('ansible_ssh_pass') or + hostvars.get('ansible_password')), + 'port': hostvars.get('ansible_port'), + 'python_path': hostvars.get('ansible_python_interpreter'), + 'private_key_file': (hostvars.get('ansible_ssh_private_key_file') or + hostvars.get('ansible_private_key_file')), + 'mitogen_via': hostvars.get('mitogen_via'), + }) + + class Connection(ansible.plugins.connection.ConnectionBase): #: mitogen.master.Broker for this worker. broker = None @@ -58,45 +189,36 @@ class Connection(ansible.plugins.connection.ConnectionBase): #: presently always the master process. parent = None - #: mitogen.master.Context connected to the target machine's initial SSH - #: account. - host = None - #: mitogen.master.Context connected to the target user account on the - #: target machine (i.e. via sudo), or simply a copy of :attr:`host` if - #: become is not in use. + #: target machine (i.e. via sudo). context = None #: Only sudo is supported for now. become_methods = ['sudo'] - #: Set by the constructor according to whichever connection type this - #: connection should emulate. We emulate the original connection type to - #: work around artificial limitations in e.g. the synchronize action, which - #: hard-codes 'local' and 'ssh' as the only allowable connection types. - transport = None - #: Set to 'ansible_python_interpreter' by on_action_run(). python_path = None - #: Set to 'ansible_sudo_exe' by on_action_run(). - sudo_path = None - #: Set to 'ansible_ssh_timeout' by on_action_run(). ansible_ssh_timeout = None + #: Set to 'mitogen_via' by on_action_run(). 
+ mitogen_via = None + + #: Set to 'inventory_hostname' by on_action_run(). + inventory_hostname = None + + #: Set to 'hostvars' by on_action_run() + host_vars = None + #: Set after connection to the target context's home directory. _homedir = None - def __init__(self, play_context, new_stdin, original_transport, **kwargs): + def __init__(self, play_context, new_stdin, **kwargs): assert ansible_mitogen.process.MuxProcess.unix_listener_path, ( - 'The "mitogen" connection plug-in may only be instantiated ' - 'by the "mitogen" strategy plug-in.' + 'Mitogen connection types may only be instantiated ' + 'while the "mitogen" strategy is active.' ) - - self.original_transport = original_transport - self.transport = original_transport - self.kwargs = kwargs super(Connection, self).__init__(play_context, new_stdin) def __del__(self): @@ -114,19 +236,12 @@ class Connection(ansible.plugins.connection.ConnectionBase): executing. We use the opportunity to grab relevant bits from the task-specific data. 
""" - self.ansible_ssh_timeout = task_vars.get( - 'ansible_ssh_timeout', - None - ) - self.python_path = task_vars.get( - 'ansible_python_interpreter', - '/usr/bin/python' - ) - self.sudo_path = task_vars.get( - 'ansible_sudo_exe', - 'sudo' - ) - + self.ansible_ssh_timeout = task_vars.get('ansible_ssh_timeout') + self.python_path = task_vars.get('ansible_python_interpreter', + '/usr/bin/python') + self.mitogen_via = task_vars.get('mitogen_via') + self.inventory_hostname = task_vars['inventory_hostname'] + self.host_vars = task_vars['hostvars'] self.close(new_task=True) @property @@ -138,109 +253,52 @@ class Connection(ansible.plugins.connection.ConnectionBase): def connected(self): return self.context is not None - def _on_connection_error(self, msg): - raise ansible.errors.AnsibleConnectionFailure(msg) - - def _on_become_error(self, msg): - # TODO: vanilla become failures yield this: - # { - # "changed": false, - # "module_stderr": "sudo: sorry, you must have a tty to run sudo\n", - # "module_stdout": "", - # "msg": "MODULE FAILURE", - # "rc": 1 - # } - # - # Currently we yield this: - # { - # "msg": "EOF on stream; last 300 bytes received: 'sudo: ....\n'" - # } - raise ansible.errors.AnsibleModuleError(msg) - - def _wrap_connect(self, on_error, kwargs): - dct = mitogen.service.call( - context=self.parent, - handle=ansible_mitogen.services.ContextService.handle, - method='get', - kwargs=mitogen.utils.cast(kwargs), - ) + def _config_from_via(self, via_spec): + become_user, _, inventory_name = via_spec.rpartition('@') + via_vars = self.host_vars[inventory_name] + if isinstance(via_vars, jinja2.runtime.Undefined): + raise ansible.errors.AnsibleConnectionFailure( + self.unknown_via_msg % ( + self.mitogen_via, + config['inventory_name'], + ) + ) - if dct['msg']: - on_error(dct['msg']) + return config_from_hostvars( + transport=via_vars.get('ansible_connection', 'ssh'), + inventory_name=inventory_name, + connection=self, + hostvars=via_vars, + become_user=become_user or 
None, + ) - return dct['context'], dct['home_dir'] + unknown_via_msg = 'mitogen_via=%s of %s specifies an unknown hostname' + via_cycle_msg = 'mitogen_via=%s of %s creates a cycle (%s)' + + def _stack_from_config(self, config, stack=(), seen_names=()): + if config['inventory_name'] in seen_names: + raise ansible.errors.AnsibleConnectionFailure( + self.via_cycle_msg % ( + config['mitogen_via'], + config['inventory_name'], + ' -> '.join(reversed( + seen_names + (config['inventory_name'],) + )), + ) + ) - def _connect_local(self): - """ - Fetch a reference to the local() Context from ContextService in the - master process. - """ - return self._wrap_connect(self._on_connection_error, { - 'method_name': 'local', - 'python_path': self.python_path, - }) + if config['mitogen_via']: + stack, seen_names = self._stack_from_config( + self._config_from_via(config['mitogen_via']), + stack=stack, + seen_names=seen_names + (config['inventory_name'],) + ) - def _connect_ssh(self): - """ - Fetch a reference to an SSH Context matching the play context from - ContextService in the master process. 
- """ - return self._wrap_connect(self._on_connection_error, { - 'method_name': 'ssh', - 'check_host_keys': False, # TODO - 'hostname': self._play_context.remote_addr, - 'username': self._play_context.remote_user, - 'password': self._play_context.password, - 'port': self._play_context.port, - 'python_path': self.python_path, - 'identity_file': self._play_context.private_key_file, - 'ssh_path': self._play_context.ssh_executable, - 'connect_timeout': self.ansible_ssh_timeout, - 'ssh_args': [ - term - for s in ( - getattr(self._play_context, 'ssh_args', ''), - getattr(self._play_context, 'ssh_common_args', ''), - getattr(self._play_context, 'ssh_extra_args', '') - ) - for term in shlex.split(s or '') - ] - }) - - def _connect_docker(self): - return self._wrap_connect(self._on_connection_error, { - 'method_name': 'docker', - 'container': self._play_context.remote_addr, - 'python_path': self.python_path, - 'connect_timeout': self._play_context.timeout, - }) - - def _connect_sudo(self, via=None, python_path=None): - """ - Fetch a reference to a sudo Context matching the play context from - ContextService in the master process. + stack += (CONNECTION_METHOD[config['transport']](config),) + if config['become']: + stack += (CONNECTION_METHOD[config['become_method']](config),) - :param via: - Parent Context of the sudo Context. For Ansible, this should always - be a Context returned by _connect_ssh(). 
- """ - return self._wrap_connect(self._on_become_error, { - 'method_name': 'sudo', - 'username': self._play_context.become_user, - 'password': self._play_context.become_pass, - 'python_path': python_path or self.python_path, - 'sudo_path': self.sudo_path, - 'connect_timeout': self._play_context.timeout, - 'via': via, - 'sudo_args': [ - term - for s in ( - self._play_context.sudo_flags, - self._play_context.become_flags - ) - for term in shlex.split(s or '') - ], - }) + return stack, seen_names def _connect(self): """ @@ -263,24 +321,30 @@ class Connection(ansible.plugins.connection.ConnectionBase): broker=self.broker, ) - if self.original_transport == 'local': - if self._play_context.become: - self.context, self._homedir = self._connect_sudo( - python_path=sys.executable - ) - else: - self.context, self._homedir = self._connect_local() - return + stack, _ = self._stack_from_config( + config_from_play_context( + transport=self.transport, + inventory_name=self.inventory_hostname, + connection=self + ) + ) - if self.original_transport == 'docker': - self.host, self._homedir = self._connect_docker() - elif self.original_transport == 'ssh': - self.host, self._homedir = self._connect_ssh() + dct = mitogen.service.call( + context=self.parent, + handle=ansible_mitogen.services.ContextService.handle, + method='get', + kwargs=mitogen.utils.cast({ + 'stack': stack, + }) + ) - if self._play_context.become: - self.context, self._homedir = self._connect_sudo(via=self.host) - else: - self.context = self.host + if dct['msg']: + if dct['method_name'] in self.become_methods: + raise ansible.errors.AnsibleModuleError(dct['msg']) + raise ansible.errors.AnsibleConnectionFailure(dct['msg']) + + self.context = dct['context'] + self._homedir = dct['home_dir'] def get_context_name(self): """ @@ -296,18 +360,16 @@ class Connection(ansible.plugins.connection.ConnectionBase): gracefully shut down, and wait for shutdown to complete. Safe to call multiple times. 
""" - for context in set([self.host, self.context]): - if context: - mitogen.service.call( - context=self.parent, - handle=ansible_mitogen.services.ContextService.handle, - method='put', - kwargs={ - 'context': context - } - ) + if self.context: + mitogen.service.call( + context=self.parent, + handle=ansible_mitogen.services.ContextService.handle, + method='put', + kwargs={ + 'context': self.context + } + ) - self.host = None self.context = None if self.broker and not new_task: self.broker.shutdown() @@ -420,3 +482,15 @@ class Connection(ansible.plugins.connection.ConnectionBase): in_path=in_path, out_path=out_path ) + + +class SshConnection(Connection): + transport = 'ssh' + + +class LocalConnection(Connection): + transport = 'local' + + +class DockerConnection(Connection): + transport = 'docker' diff --git a/ansible_mitogen/plugins/connection/mitogen_docker.py b/ansible_mitogen/plugins/connection/mitogen_docker.py new file mode 100644 index 00000000..70a90147 --- /dev/null +++ b/ansible_mitogen/plugins/connection/mitogen_docker.py @@ -0,0 +1,56 @@ +# Copyright 2017, David Wilson +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its contributors +# may be used to endorse or promote products derived from this software without +# specific prior written permission. 
+# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import os.path +import sys + +# +# This is not the real Connection implementation module, it simply exists as a +# proxy to the real module, which is loaded using Python's regular import +# mechanism, to prevent Ansible's PluginLoader from making up a fake name that +# results in ansible_mitogen plugin modules being loaded twice: once by +# PluginLoader with a name like "ansible.plugins.connection.mitogen", which is +# stuffed into sys.modules even though attempting to import it will trigger an +# ImportError, and once under its canonical name, "ansible_mitogen.connection". +# +# Therefore we have a proxy module that imports it under the real name, and +# sets up the duff PluginLoader-imported module to just contain objects from +# the real module, so duplicate types don't exist in memory, and things like +# debuggers and isinstance() work predictably. 
+# + +try: + import ansible_mitogen +except ImportError: + base_dir = os.path.dirname(__file__) + sys.path.insert(0, os.path.abspath(os.path.join(base_dir, '../../..'))) + del base_dir + +from ansible_mitogen.connection import DockerConnection as Connection +del os +del sys diff --git a/ansible_mitogen/plugins/connection/mitogen_local.py b/ansible_mitogen/plugins/connection/mitogen_local.py new file mode 100644 index 00000000..fc1a8565 --- /dev/null +++ b/ansible_mitogen/plugins/connection/mitogen_local.py @@ -0,0 +1,56 @@ +# Copyright 2017, David Wilson +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its contributors +# may be used to endorse or promote products derived from this software without +# specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import os.path +import sys + +# +# This is not the real Connection implementation module, it simply exists as a +# proxy to the real module, which is loaded using Python's regular import +# mechanism, to prevent Ansible's PluginLoader from making up a fake name that +# results in ansible_mitogen plugin modules being loaded twice: once by +# PluginLoader with a name like "ansible.plugins.connection.mitogen", which is +# stuffed into sys.modules even though attempting to import it will trigger an +# ImportError, and once under its canonical name, "ansible_mitogen.connection". +# +# Therefore we have a proxy module that imports it under the real name, and +# sets up the duff PluginLoader-imported module to just contain objects from +# the real module, so duplicate types don't exist in memory, and things like +# debuggers and isinstance() work predictably. 
+# + +try: + import ansible_mitogen +except ImportError: + base_dir = os.path.dirname(__file__) + sys.path.insert(0, os.path.abspath(os.path.join(base_dir, '../../..'))) + del base_dir + +from ansible_mitogen.connection import LocalConnection as Connection +del os +del sys diff --git a/ansible_mitogen/plugins/connection/mitogen.py b/ansible_mitogen/plugins/connection/mitogen_ssh.py similarity index 97% rename from ansible_mitogen/plugins/connection/mitogen.py rename to ansible_mitogen/plugins/connection/mitogen_ssh.py index 5ae51ab1..e0a30672 100644 --- a/ansible_mitogen/plugins/connection/mitogen.py +++ b/ansible_mitogen/plugins/connection/mitogen_ssh.py @@ -51,6 +51,6 @@ except ImportError: sys.path.insert(0, os.path.abspath(os.path.join(base_dir, '../../..'))) del base_dir -from ansible_mitogen.connection import Connection +from ansible_mitogen.connection import SshConnection as Connection del os del sys diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index fee50934..ae6e8ed7 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -82,9 +82,9 @@ class ContextService(mitogen.service.Service): #: Records the :meth:`get` result dict for successful calls, returned #: for identical subsequent calls. Keyed by :meth:`key_from_kwargs`. self._response_by_key = {} - #: List of :class:`mitogen.core.Message` waiting for the result dict - #: for a particular connection config. Keyed as sbove. - self._waiters_by_key = {} + #: List of :class:`mitogen.core.Latch` awaiting the result for a + #: particular key. + self._latches_by_key = {} #: Mapping of :class:`mitogen.core.Context` -> reference count. Each #: call to :meth:`get` increases this by one. Calls to :meth:`put` #: decrease it by one. 
@@ -134,10 +134,10 @@ class ContextService(mitogen.service.Service): """ self._lock.acquire() try: - waiters = self._waiters_by_key.pop(key) - count = len(waiters) - for msg in waiters: - msg.reply(response) + latches = self._latches_by_key.pop(key) + count = len(latches) + for latch in latches: + latch.put(response) finally: self._lock.release() return count @@ -164,17 +164,12 @@ class ContextService(mitogen.service.Service): finally: self._lock.release() - def _update_lru(self, new_context, **kwargs): + def _update_lru(self, new_context, spec, via): """ Update the LRU ("MRU"?) list associated with the connection described by `kwargs`, destroying the most recently created context if the list is full. Finally add `new_context` to the list. """ - via = kwargs.get('via') - if via is None: - # We don't have a limit on the number of directly connections. - return - lru = self._lru_by_via.setdefault(via, []) if len(lru) < self.max_interpreters: lru.append(new_context) @@ -207,7 +202,7 @@ class ContextService(mitogen.service.Service): """ # TODO: there is a race between creation of a context and disconnection # of its related stream. An error reply should be sent to any message - # in _waiters_by_key below. + # in _latches_by_key below. self._lock.acquire() try: for context, key in list(self._key_by_context.items()): @@ -215,14 +210,14 @@ class ContextService(mitogen.service.Service): LOG.info('Dropping %r due to disconnect of %r', context, stream) self._response_by_key.pop(key, None) - self._waiters_by_key.pop(key, None) + self._latches_by_key.pop(key, None) self._refs_by_context.pop(context, None) self._lru_by_via.pop(context, None) self._refs_by_context.pop(context, None) finally: self._lock.release() - def _connect(self, key, method_name, **kwargs): + def _connect(self, key, spec, via=None): """ Actual connect implementation. 
Arranges for the Mitogen connection to be created and enqueues an asynchronous call to start the forked task @@ -230,11 +225,8 @@ class ContextService(mitogen.service.Service): :param key: Deduplication key representing the connection configuration. - :param method_name: - :class:`mitogen.parent.Router` method implementing the connection - type. - :param kwargs: - Keyword arguments passed to the router method. + :param spec: + Connection specification. :returns: Dict like:: @@ -248,21 +240,14 @@ class ContextService(mitogen.service.Service): :data:`None`, or `msg` is :data:`None` and the remaining fields are set. """ - method = getattr(self.router, method_name, None) - if method is None: - raise Error('no such Router method: %s' % (method_name,)) - try: - context = method(**kwargs) - except mitogen.core.StreamError as e: - return { - 'context': None, - 'home_dir': None, - 'msg': str(e), - } - - if kwargs.get('via'): - self._update_lru(context, method_name=method_name, **kwargs) + method = getattr(self.router, spec['method']) + except AttributeError: + raise Error('unsupported method: %(transport)s' % spec) + + context = method(via=via, **spec['kwargs']) + if via: + self._update_lru(context, spec, via) else: # For directly connected contexts, listen to the associated # Stream's disconnect event and use it to invalidate dependent @@ -289,60 +274,74 @@ class ContextService(mitogen.service.Service): 'msg': None, } - @mitogen.service.expose(mitogen.service.AllowParents()) - @mitogen.service.arg_spec({ - 'method_name': str - }) - def get(self, msg, **kwargs): - """ - Return a Context referring to an established connection with the given - configuration, establishing a new connection as necessary. - - :param str method_name: - The :class:`mitogen.parent.Router` connection method to use. - :param dict kwargs: - Keyword arguments passed to `mitogen.master.Router.[method_name]()`. 
- - :returns tuple: - Tuple of `(context, home_dir)`, where: - * `context` is the mitogen.master.Context referring to the - target context. - * `home_dir` is a cached copy of the remote directory. - """ - key = self.key_from_kwargs(**kwargs) + def _wait_or_start(self, spec, via=None): + latch = mitogen.core.Latch() + key = self.key_from_kwargs(via=via, **spec) self._lock.acquire() try: response = self._response_by_key.get(key) if response is not None: self._refs_by_context[response['context']] += 1 - return response - - waiters = self._waiters_by_key.get(key) - if waiters is not None: - waiters.append(msg) - return self.NO_REPLY + latch.put(response) + return latch - self._waiters_by_key[key] = [msg] + latches = self._latches_by_key.setdefault(key, []) + first = len(latches) == 0 + latches.append(latch) finally: self._lock.release() - # I'm the first thread to wait, so I will create the connection. - try: - response = self._connect(key, **kwargs) - count = self._produce_response(key, response) - if response['msg'] is None: + if first: + # I'm the first requestee, so I will create the connection. + try: + response = self._connect(key, spec, via=via) + count = self._produce_response(key, response) # Only record the response for non-error results. self._response_by_key[key] = response # Set the reference count to the number of waiters. self._refs_by_context[response['context']] += count - except mitogen.core.CallError: - e = sys.exc_info()[1] - self._produce_response(key, e) - except Exception: - e = sys.exc_info()[1] - self._produce_response(key, mitogen.core.CallError(e)) + except Exception: + self._produce_response(key, sys.exc_info()) + + return latch + + @mitogen.service.expose(mitogen.service.AllowParents()) + @mitogen.service.arg_spec({ + 'stack': list + }) + def get(self, msg, stack): + """ + Return a Context referring to an established connection with the given + configuration, establishing new connections as necessary. 
+ + :param list stack: + Connection descriptions. Each element is a dict containing 'method' + and 'kwargs' keys describing the Router method and arguments. + Subsequent elements are proxied via the previous. + + :returns dict: + * context: mitogen.master.Context or None. + * homedir: Context's home directory or None. + * msg: StreamError exception text or None. + * method_name: string failing method name. + """ + via = None + for spec in stack: + try: + result = self._wait_or_start(spec, via=via).get() + if isinstance(result, tuple): # exc_info() + e1, e2, e3 = result + raise e1, e2, e3 + via = result['context'] + except mitogen.core.StreamError as e: + return { + 'context': None, + 'home_dir': None, + 'method_name': spec['method'], + 'msg': str(e), + } - return self.NO_REPLY + return result class FileService(mitogen.service.Service): @@ -420,9 +419,9 @@ class FileService(mitogen.service.Service): #: Time spent by the scheduler thread asleep when it has no more data to #: pump, but while at least one transfer remains active. With #: max_queue_size=1MiB and a sleep of 10ms, maximum throughput on any - #: single stream is 100MiB/sec, which is 5x what SSH can handle on my + #: single stream is 112MiB/sec, which is >5x what SSH can handle on my #: laptop. - sleep_delay_ms = 0.01 + sleep_delay_secs = 0.01 def __init__(self, router): super(FileService, self).__init__(router) @@ -430,7 +429,7 @@ class FileService(mitogen.service.Service): self._size_by_path = {} #: Queue used to communicate from service to scheduler thread. self._queue = mitogen.core.Latch() - #: Mapping of Stream->[(sender, fp)]. + #: Mapping of Stream->[(Sender, file object)]. 
self._pending_by_stream = {} self._thread = threading.Thread(target=self._scheduler_main) self._thread.start() @@ -488,9 +487,9 @@ class FileService(mitogen.service.Service): def _sleep_on_queue(self): """ - Sleep indefinitely (no active transfers) or for :attr:`sleep_delay_ms` - (active transfers) waiting for a new transfer request to arrive from - the :meth:`fetch` method. + Sleep indefinitely (no active transfers) or for + :attr:`sleep_delay_secs` (active transfers) waiting for a new transfer + request to arrive from the :meth:`fetch` method. If a new request arrives, add it to the appropriate list in :attr:`_pending_by_stream`. @@ -500,8 +499,8 @@ class FileService(mitogen.service.Service): :meth:`on_shutdown` hasn't been called yet, otherwise :data:`False`. """ - if self._schedule_pending: - timeout = self.sleep_delay_ms + if self._pending_by_stream: + timeout = self.sleep_delay_secs else: timeout = None @@ -523,7 +522,7 @@ class FileService(mitogen.service.Service): """ Scheduler thread's main function. Sleep until :meth:`_sleep_on_queue` indicates the queue has been shut down, - pending pending file chunks each time we wake. + pumping pending file chunks each time we wake. """ while self._sleep_on_queue(): for stream, pending in list(self._pending_by_stream.items()): diff --git a/ansible_mitogen/strategy.py b/ansible_mitogen/strategy.py index 430ea74b..ef6a37ac 100644 --- a/ansible_mitogen/strategy.py +++ b/ansible_mitogen/strategy.py @@ -69,8 +69,7 @@ def wrap_connection_loader__get(name, play_context, new_stdin, **kwargs): an argument, so that it can emulate the original type. """ if name in ('ssh', 'local', 'docker'): - kwargs['original_transport'] = name - name = 'mitogen' + name = 'mitogen_' + name return connection_loader__get(name, play_context, new_stdin, **kwargs) diff --git a/docs/ansible.rst b/docs/ansible.rst index f1b6a6e1..6ac03ca0 100644 --- a/docs/ansible.rst +++ b/docs/ansible.rst @@ -1,64 +1,57 @@ -Ansible Extension -================= - .. 
image:: images/ansible/cell_division.png :align: right -An extension to `Ansible`_ is included that implements host connections over -Mitogen, replacing embedded shell invocations with pure-Python equivalents -invoked via highly efficient remote procedure calls tunnelled over SSH. No -changes are required to the target hosts. +Mitogen for Ansible +=================== -The extension is approaching a generally dependable state, and works well for -many real-world playbooks. `Bug reports`_ in this area are very welcome – -Ansible is a huge beast, and only significant testing will prove the -extension's soundness. -Divergence from Ansible's normal behaviour is considered a bug, so please -report anything you notice, regardless of how inconsequential it may seem. +An extension to `Ansible`_ is included that implements connections over +Mitogen, replacing embedded shell invocations with pure-Python equivalents +invoked via highly efficient remote procedure calls to persistent interpreters +tunnelled over SSH. No changes are required to target hosts. + +The extension is approaching stability and real-world testing is now +encouraged. `Bug reports`_ are welcome: Ansible is huge, and only wide testing +will ensure soundness. .. _Ansible: https://www.ansible.com/ .. _Bug reports: https://goo.gl/yLKZiJ - Overview -------- -You should **expect a 1.25x - 7x speedup** and a **CPU usage reduction of at -least 2x**, depending on network conditions, the specific modules executed, and -time spent by the target host already doing useful work. Mitogen cannot speed -up a module once it is executing, it can only ensure the module executes as -quickly as possible. - -* **A single SSH connection is used for each target host**, in addition to one - sudo invocation per distinct user account. Subsequent playbook steps always - reuse the same connection. 
This is much better than SSH multiplexing combined - with pipelining, as significant state can be maintained in RAM between steps, - and the system logs aren't filled with spam from repeat SSH and sudo - invocations. - -* **A single Python interpreter is used** per host and sudo account combination - for the duration of the run, avoiding the repeat cost of invoking multiple - interpreters and recompiling imports, saving 300-800 ms for every playbook - step. - -* Remote interpreters reuse Mitogen's module import mechanism, caching uploaded - dependencies between steps at the host and user account level. As a - consequence, **bandwidth usage is consistently an order of magnitude lower** - compared to SSH pipelining, and around 5x fewer frames are required to - traverse the wire for a run to complete successfully. - -* **No writes to the target host's filesystem occur**, unless explicitly - triggered by a playbook step. In all typical configurations, Ansible - repeatedly rewrites and extracts ZIP files to multiple temporary directories - on the target host. Since no temporary files are used, security issues - relating to those files in cross-account scenarios are entirely avoided. +**Expect a 1.25x - 7x speedup** and a **CPU usage reduction of at least 2x**, +depending on network conditions, modules executed, and time already spent by +targets on useful work. Mitogen cannot improve a module once it is executing, +it can only ensure the module executes as quickly as possible. + +* **One connection is used per target**, in addition to one sudo invocation per + user account. This is much better than SSH multiplexing combined with + pipelining, as significant state can be maintained in RAM between steps, and + system logs aren't spammed with repeat authentication events. + +* **A single network roundtrip is used** to execute a step whose code already + exists in RAM on the target. 
Eliminating multiplexed SSH channel creation + saves 5 ms runtime per 1 ms of network latency for every playbook step. + +* **Processes are aggressively reused**, avoiding the cost of invoking Python + and recompiling imports, saving 300-800 ms for every playbook step. + +* Code is ephemerally cached in RAM, **reducing bandwidth usage by an order + of magnitude** compared to SSH pipelining, with around 5x fewer frames + traversing the network in a typical run. + +* **No writes to the target's filesystem occur**, unless explicitly triggered + by a playbook step. In all typical configurations, Ansible repeatedly + rewrites and extracts ZIP files to multiple temporary directories on the + target. Since no temporary files are used, security issues relating to those + files in cross-account scenarios are entirely avoided. Demo ----- +~~~~ This demonstrates Ansible running a subset of the Mitogen integration tests concurrent to an equivalent run using the extension. @@ -71,7 +64,7 @@ concurrent to an equivalent run using the extension. Testimonials ------------- +~~~~~~~~~~~~ * "With mitogen **my playbook runtime went from 45 minutes to just under 3 minutes**. Awesome work!" @@ -96,14 +89,11 @@ Testimonials Installation ------------ -.. caution:: - - Please review the behavioural differences documented below prior to use. - -1. Verify Ansible 2.4 and Python 2.7 are listed in the output of ``ansible - --version`` -2. Download and extract https://github.com/dw/mitogen/archive/master.zip -3. Modify ``ansible.cfg``: +1. Thoroughly review the documented behavioural differences. +2. Verify Ansible 2.4/2.5 and Python 2.7 are listed in ``ansible --version`` + output. +3. Download and extract https://github.com/dw/mitogen/archive/master.zip +4. Modify ``ansible.cfg``: .. code-block:: dosini @@ -111,70 +101,257 @@ Installation strategy_plugins = /path/to/mitogen-master/ansible_mitogen/plugins/strategy strategy = mitogen_linear - The ``strategy`` key is optional. 
If omitted, you can set the - ``ANSIBLE_STRATEGY=mitogen_linear`` environment variable on a per-run basis. - Like ``mitogen_linear``, the ``mitogen_free`` strategy also exists to mimic - the built-in ``free`` strategy. + The ``strategy`` key is optional. If omitted, the + ``ANSIBLE_STRATEGY=mitogen_linear`` environment variable can be set on a + per-run basis. Like ``mitogen_linear``, the ``mitogen_free`` strategy exists + to mimic the ``free`` strategy. + + +Noteworthy Differences +---------------------- + +* Ansible 2.4 and 2.5 are supported. File bugs to register interest in older + releases. -4. Cross your fingers and try it. +* The ``sudo`` become method is available and ``su`` is planned. File bugs to + register interest in additional methods. +* The ``ssh``, ``local`` and ``docker`` connection types are available, with + more planned. File bugs to register interest. -Limitations ------------ +* Local commands execute in a reusable interpreter created identically to + interpreters on targets. Presently one interpreter per ``become_user`` + exists, and so only one local action may execute simultaneously. -* Only Ansible 2.4 is being used for development, with occasional tests under - 2.5, 2.3 and 2.2. It should be more than possible to fully support at least - 2.3, if not also 2.2. + Ansible usually permits up to ``forks`` simultaneous local actions. Any + long-running local actions that execute for every target will experience + artificial serialization, causing slowdown equivalent to `task_duration * + num_targets`. This will be fixed soon. -* Only the ``sudo`` become method is available, however adding new methods is - straightforward, and eventually at least ``su`` will be included. +* Asynchronous jobs presently exist only for the duration of a run, and time + limits are not implemented. -* The extension's performance benefits do not scale perfectly linearly with the - number of targets. 
This is a subject of ongoing investigation and - improvements will appear in time. +* Due to use of :func:`select.select` the IO multiplexer breaks down around 100 + targets, expect performance degradation as this number is approached and + errant behaviour as it is exceeded. A replacement will appear soon. -* "Module Replacer" style modules are not yet supported. These rarely appear in - practice, and light Github code searches failed to reveal many examples of - them. +* The undocumented ability to extend :mod:`ansible.module_utils` by supplying a + ``module_utils`` directory alongside a custom new-style module is not yet + supported. +* "Module Replacer" style modules are not supported. These rarely appear in + practice, and light web searches failed to reveal many examples of them. -Behavioural Differences ------------------------ +* Ansible permits up to ``forks`` connections to be setup in parallel, whereas + in Mitogen this is handled by a fixed-size thread pool. Up to 16 connections + may be established in parallel by default, this can be modified by setting + the ``MITOGEN_POOL_SIZE`` environment variable. -* Ansible permits up to ``forks`` SSH connections to be setup simultaneously, - whereas in Mitogen this is handled by a thread pool. Eventually this pool - will become per-CPU, but meanwhile, a maximum of 16 SSH connections may be - established simultaneously by default. This can be increased or decreased - setting the ``MITOGEN_POOL_SIZE`` environment variable. +* Performance does not scale perfectly linearly with target count. This will + improve over time. -* Mitogen treats connection timeouts for the SSH and become steps of a task - invocation separately, meaning that in some circumstances the configured - timeout may appear to be doubled. This is since Mitogen internally treats the - creation of an SSH account context separately to the creation of a sudo - account context proxied via that SSH account. 
+* Timeouts normally apply to the combined runtime of the SSH and become steps + of a task. As Mitogen treats SSH and sudo distinctly, during a failure the + effective timeout may appear to double. - A future revision may detect a sudo account context created immediately - following its parent SSH account, and try to emulate Ansible's existing - timeout semantics. -* Local commands are executed in a reuseable Python interpreter created - identically to interpreters used on remote hosts. At present only one such - interpreter per ``become_user`` exists, and so only one local action may be - executed simultaneously per local user account. +New Features & Notes +-------------------- + + +Connection Delegation +~~~~~~~~~~~~~~~~~~~~~ + +.. image:: images/jumpbox.png + :align: right + +Included is a preview of **Connection Delegation**, a Mitogen-specific +implementation of `stackable connection plug-ins`_. This enables multi-hop +connections via a bastion, or Docker connections delegated via their host +machine, where reaching the host may itself entail recursive delegation. + +.. _Stackable connection plug-ins: https://github.com/ansible/proposals/issues/25 + +Unlike with SSH forwarding Ansible has complete visibility of the final +topology, declarative configuration via static/dynamic inventory is possible, +and data can be cached and re-served, and code executed on every intermediary. + +For example when targeting Docker containers on a remote machine, each module +need only be uploaded once for the first task and container that requires it, +then cached and served from the SSH account for every future task in any +container. + +.. raw:: html + 
+ + +.. caution:: + + Connection delegation is a work in progress, bug reports are welcome. - Ansible usually permits up to ``ansible.cfg:forks`` simultaneous local - actions. Any long-running local actions that execute for every target will - experience artificial serialization, causing slowdown equivalent to - `task_duration * num_targets`. This will be fixed soon. + * While imports are cached on intermediaries, module scripts are needlessly + reuploaded for each target. Fixing this is equivalent to implementing + **Topology-Aware File Synchronization**, so it may remain unfixed until + that feature is started. + + * Delegated connection setup is single-threaded; only one connection can be + constructed in parallel per intermediary. + + * Unbounded queue RAM growth may occur in an intermediary during large file + transfers if the link between any two hops is slower than the link + between the controller and the first hop. + + * Inferring the configuration of intermediaries may be buggy, manifesting + as duplicate connections between hops, due to not perfectly replicating + the configuration Ansible would normally use for the intermediary. + + * The extension does not understand the difference between a delegated + connection and a ``become_user``. If interpreter recycling kicks in, a + delegated connection could be prematurely recycled. + +To enable connection delegation, set ``mitogen_via=