From 3fab8a3af5724d75054134d8fac64dc848bfa529 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Tue, 24 Apr 2018 22:42:02 +0100 Subject: [PATCH] ansible: connection delegation v1 This implements the first edition of Connection Delegation, where delegating connection establishment is initially single-threaded. ansible_mitogen/strategy.py: ansible_mitogen/plugins/connection/*: Begin splitting connection.Connection into subclasses, exposing them directly as "mitogen_ssh", "mitogen_local", etc. connection types. This is far from removing strategy.py, but it's a tiny start. ansible_mitogen/connection.py: * config_from_play_context() and config_from_host_vars() build up a huge dictionary containing either more or less PlayContext contents, or our best attempt at reconstructing a host's connection config from its hostvars, where that config is not the current WorkerProcess target. They both produce the same format with the same keys, allowing remaining code to have a single input format. These dicts contain fields named after how Ansible refers to them, e.g. "sudo_exe". * _config_from_via() parses a basic connection specification like "username@inventory_name" into one of the aforementioned dicts. * _stack_from_config() produces a list of dicts describing the order in which (Mitogen) connections should be established, such that each element is proxied via= the previous element. The dicts produced by this function use Mitogen keyword arguments, the former di. These dicts contain fields named after how Mitogen refers to them, e.g. "sudo_path". * Pass the stack to ContextService, which is responsible for actual setup of the full chain. ansible_mitogen/services.py: Teach get() to walk the supplied stack, establishing each connection in turn, creating refounts for it before continuing. TODO: refcounting is broken in a variety of cases. --- ansible_mitogen/connection.py | 394 +++++++++++------- .../plugins/connection/mitogen_docker.py | 56 +++ .../plugins/connection/mitogen_local.py | 56 +++ .../connection/{mitogen.py => mitogen_ssh.py} | 2 +- ansible_mitogen/services.py | 151 ++++--- ansible_mitogen/strategy.py | 3 +- 6 files changed, 425 insertions(+), 237 deletions(-) create mode 100644 ansible_mitogen/plugins/connection/mitogen_docker.py create mode 100644 ansible_mitogen/plugins/connection/mitogen_local.py rename ansible_mitogen/plugins/connection/{mitogen.py => mitogen_ssh.py} (97%) diff --git a/ansible_mitogen/connection.py b/ansible_mitogen/connection.py index 69dec732..1554cf9d 100644 --- a/ansible_mitogen/connection.py +++ b/ansible_mitogen/connection.py @@ -33,6 +33,8 @@ import shlex import sys import time +import jinja2.runtime +import ansible.constants as C import ansible.errors import ansible.plugins.connection @@ -47,6 +49,135 @@ import ansible_mitogen.services LOG = logging.getLogger(__name__) +def _connect_local(spec): + return { + 'method': 'local', + 'kwargs': { + 'python_path': spec['python_path'], + } + } + + +def _connect_ssh(spec): + return { + 'method': 'ssh', + 'kwargs': { + 'check_host_keys': False, # TODO + 'hostname': spec['remote_addr'], + 'username': spec['remote_user'], + 'password': spec['password'], + 'port': spec['port'], + 'python_path': spec['python_path'], + 'identity_file': spec['private_key_file'], + 'ssh_path': spec['ssh_executable'], + 'connect_timeout': spec['ansible_ssh_timeout'], + 'ssh_args': spec['ssh_args'], + } + } + + +def _connect_docker(spec): + return { + 'method': 'docker', + 'kwargs': { + 'username': spec['remote_user'], + 'container': spec['remote_addr'], + 'python_path': spec['python_path'], + 'connect_timeout': spec['ansible_ssh_timeout'] or spec['timeout'], + } + } + + +def _connect_sudo(spec): + return { + 'method': 'sudo', + 'kwargs': { + 'username': spec['become_user'], + 'password': spec['become_pass'], + 'python_path': spec['python_path'], + 'sudo_path': spec['sudo_exe'], + 'connect_timeout': spec['timeout'], + 'sudo_args': spec['sudo_args'], + } + } + + +CONNECTION_METHOD = { + 'sudo': _connect_sudo, + 'ssh': _connect_ssh, + 'local': _connect_local, + 'docker': _connect_docker, +} + + +def config_from_play_context(transport, inventory_name, connection): + """ + Return a dict representing all important connection configuration, allowing + the same functions to work regardless of whether configuration came from + play_context (direct connection) or host vars (mitogen_via=). + """ + return { + 'transport': transport, + 'inventory_name': inventory_name, + 'remote_addr': connection._play_context.remote_addr, + 'remote_user': connection._play_context.remote_user, + 'become': connection._play_context.become, + 'become_method': connection._play_context.become_method, + 'become_user': connection._play_context.become_user, + 'become_pass': connection._play_context.become_pass, + 'password': connection._play_context.password, + 'port': connection._play_context.port, + 'python_path': connection.python_path, + 'private_key_file': connection._play_context.private_key_file, + 'ssh_executable': connection._play_context.ssh_executable, + 'timeout': connection._play_context.timeout, + 'ansible_ssh_timeout': connection.ansible_ssh_timeout, + 'ssh_args': [ + term + for s in ( + getattr(connection._play_context, 'ssh_args', ''), + getattr(connection._play_context, 'ssh_common_args', ''), + getattr(connection._play_context, 'ssh_extra_args', '') + ) + for term in shlex.split(s or '') + ], + 'sudo_exe': connection._play_context.sudo_exe, + 'sudo_args': [ + term + for s in ( + connection._play_context.sudo_flags, + connection._play_context.become_flags + ) + for term in shlex.split(s or '') + ], + 'mitogen_via': connection.mitogen_via, + } + + +def config_from_hostvars(transport, inventory_name, connection, + hostvars, become_user): + """ + Override config_from_play_context() to take equivalent information from + host vars. + """ + config = config_from_play_context(transport, inventory_name, connection) + hostvars = dict(hostvars) + return dict(config, **{ + 'remote_addr': hostvars.get('ansible_hostname', inventory_name), + 'become': bool(become_user), + 'become_user': become_user, + 'become_pass': None, + 'remote_user': hostvars.get('ansible_user'), # TODO + 'password': (hostvars.get('ansible_ssh_pass') or + hostvars.get('ansible_password')), + 'port': hostvars.get('ansible_port'), + 'python_path': hostvars.get('ansible_python_interpreter'), + 'private_key_file': (hostvars.get('ansible_ssh_private_key_file') or + hostvars.get('ansible_private_key_file')), + 'mitogen_via': hostvars.get('mitogen_via'), + }) + + class Connection(ansible.plugins.connection.ConnectionBase): #: mitogen.master.Broker for this worker. broker = None @@ -58,45 +189,39 @@ class Connection(ansible.plugins.connection.ConnectionBase): #: presently always the master process. parent = None - #: mitogen.master.Context connected to the target machine's initial SSH - #: account. - host = None - #: mitogen.master.Context connected to the target user account on the - #: target machine (i.e. via sudo), or simply a copy of :attr:`host` if - #: become is not in use. + #: target machine (i.e. via sudo). context = None #: Only sudo is supported for now. become_methods = ['sudo'] - #: Set by the constructor according to whichever connection type this - #: connection should emulate. We emulate the original connection type to - #: work around artificial limitations in e.g. the synchronize action, which - #: hard-codes 'local' and 'ssh' as the only allowable connection types. - transport = None - #: Set to 'ansible_python_interpreter' by on_action_run(). python_path = None #: Set to 'ansible_sudo_exe' by on_action_run(). - sudo_path = None + sudo_exe = None #: Set to 'ansible_ssh_timeout' by on_action_run(). ansible_ssh_timeout = None + #: Set to 'mitogen_via' by on_action_run(). + mitogen_via = None + + #: Set to 'inventory_hostname' by on_action_run(). + inventory_hostname = None + + #: Set to 'hostvars' by on_action_run() + host_vars = None + #: Set after connection to the target context's home directory. _homedir = None - def __init__(self, play_context, new_stdin, original_transport, **kwargs): + def __init__(self, play_context, new_stdin, **kwargs): assert ansible_mitogen.process.MuxProcess.unix_listener_path, ( - 'The "mitogen" connection plug-in may only be instantiated ' - 'by the "mitogen" strategy plug-in.' + 'Mitogen connection types may only be instantiated ' + 'while the "mitogen" strategy is active.' ) - - self.original_transport = original_transport - self.transport = original_transport - self.kwargs = kwargs super(Connection, self).__init__(play_context, new_stdin) def __del__(self): @@ -114,19 +239,13 @@ class Connection(ansible.plugins.connection.ConnectionBase): executing. We use the opportunity to grab relevant bits from the task-specific data. """ - self.ansible_ssh_timeout = task_vars.get( - 'ansible_ssh_timeout', - None - ) - self.python_path = task_vars.get( - 'ansible_python_interpreter', - '/usr/bin/python' - ) - self.sudo_path = task_vars.get( - 'ansible_sudo_exe', - 'sudo' - ) - + self.ansible_ssh_timeout = task_vars.get('ansible_ssh_timeout') + self.python_path = task_vars.get('ansible_python_interpreter', + '/usr/bin/python') + self.sudo_exe = task_vars.get('ansible_sudo_exe', C.DEFAULT_SUDO_EXE) + self.mitogen_via = task_vars.get('mitogen_via') + self.inventory_hostname = task_vars['inventory_hostname'] + self.host_vars = task_vars['hostvars'] self.close(new_task=True) @property @@ -138,109 +257,52 @@ class Connection(ansible.plugins.connection.ConnectionBase): def connected(self): return self.context is not None - def _on_connection_error(self, msg): - raise ansible.errors.AnsibleConnectionFailure(msg) - - def _on_become_error(self, msg): - # TODO: vanilla become failures yield this: - # { - # "changed": false, - # "module_stderr": "sudo: sorry, you must have a tty to run sudo\n", - # "module_stdout": "", - # "msg": "MODULE FAILURE", - # "rc": 1 - # } - # - # Currently we yield this: - # { - # "msg": "EOF on stream; last 300 bytes received: 'sudo: ....\n'" - # } - raise ansible.errors.AnsibleModuleError(msg) - - def _wrap_connect(self, on_error, kwargs): - dct = mitogen.service.call( - context=self.parent, - handle=ansible_mitogen.services.ContextService.handle, - method='get', - kwargs=mitogen.utils.cast(kwargs), - ) + def _config_from_via(self, via_spec): + become_user, _, inventory_name = via_spec.rpartition('@') + via_vars = self.host_vars[inventory_name] + if isinstance(via_vars, jinja2.runtime.Undefined): + raise ansible.errors.AnsibleConnectionFailure( + self.unknown_via_msg % ( + self.mitogen_via, + config['inventory_name'], + ) + ) - if dct['msg']: - on_error(dct['msg']) + return config_from_hostvars( + transport=via_vars.get('ansible_connection', 'ssh'), + inventory_name=inventory_name, + connection=self, + hostvars=via_vars, + become_user=become_user or None, + ) - return dct['context'], dct['home_dir'] + unknown_via_msg = 'mitogen_via=%s of %s specifies an unknown hostname' + via_cycle_msg = 'mitogen_via=%s of %s creates a cycle (%s)' + + def _stack_from_config(self, config, stack=(), seen_names=()): + if config['inventory_name'] in seen_names: + raise ansible.errors.AnsibleConnectionFailure( + self.via_cycle_msg % ( + config['mitogen_via'], + config['inventory_name'], + ' -> '.join(reversed( + seen_names + (config['inventory_name'],) + )), + ) + ) - def _connect_local(self): - """ - Fetch a reference to the local() Context from ContextService in the - master process. - """ - return self._wrap_connect(self._on_connection_error, { - 'method_name': 'local', - 'python_path': self.python_path, - }) + if config['mitogen_via']: + stack, seen_names = self._stack_from_config( + self._config_from_via(config['mitogen_via']), + stack=stack, + seen_names=seen_names + (config['inventory_name'],) + ) - def _connect_ssh(self): - """ - Fetch a reference to an SSH Context matching the play context from - ContextService in the master process. - """ - return self._wrap_connect(self._on_connection_error, { - 'method_name': 'ssh', - 'check_host_keys': False, # TODO - 'hostname': self._play_context.remote_addr, - 'username': self._play_context.remote_user, - 'password': self._play_context.password, - 'port': self._play_context.port, - 'python_path': self.python_path, - 'identity_file': self._play_context.private_key_file, - 'ssh_path': self._play_context.ssh_executable, - 'connect_timeout': self.ansible_ssh_timeout, - 'ssh_args': [ - term - for s in ( - getattr(self._play_context, 'ssh_args', ''), - getattr(self._play_context, 'ssh_common_args', ''), - getattr(self._play_context, 'ssh_extra_args', '') - ) - for term in shlex.split(s or '') - ] - }) - - def _connect_docker(self): - return self._wrap_connect(self._on_connection_error, { - 'method_name': 'docker', - 'container': self._play_context.remote_addr, - 'python_path': self.python_path, - 'connect_timeout': self._play_context.timeout, - }) - - def _connect_sudo(self, via=None, python_path=None): - """ - Fetch a reference to a sudo Context matching the play context from - ContextService in the master process. + stack += (CONNECTION_METHOD[config['transport']](config),) + if config['become']: + stack += (CONNECTION_METHOD[config['become_method']](config),) - :param via: - Parent Context of the sudo Context. For Ansible, this should always - be a Context returned by _connect_ssh(). - """ - return self._wrap_connect(self._on_become_error, { - 'method_name': 'sudo', - 'username': self._play_context.become_user, - 'password': self._play_context.become_pass, - 'python_path': python_path or self.python_path, - 'sudo_path': self.sudo_path, - 'connect_timeout': self._play_context.timeout, - 'via': via, - 'sudo_args': [ - term - for s in ( - self._play_context.sudo_flags, - self._play_context.become_flags - ) - for term in shlex.split(s or '') - ], - }) + return stack, seen_names def _connect(self): """ @@ -263,24 +325,30 @@ class Connection(ansible.plugins.connection.ConnectionBase): broker=self.broker, ) - if self.original_transport == 'local': - if self._play_context.become: - self.context, self._homedir = self._connect_sudo( - python_path=sys.executable - ) - else: - self.context, self._homedir = self._connect_local() - return + stack, _ = self._stack_from_config( + config_from_play_context( + transport=self.transport, + inventory_name=self.inventory_hostname, + connection=self + ) + ) + + dct = mitogen.service.call( + context=self.parent, + handle=ansible_mitogen.services.ContextService.handle, + method='get', + kwargs=mitogen.utils.cast({ + 'stack': stack, + }) + ) - if self.original_transport == 'docker': - self.host, self._homedir = self._connect_docker() - elif self.original_transport == 'ssh': - self.host, self._homedir = self._connect_ssh() + if dct['msg']: + if dct['method_name'] in self.become_methods: + raise ansible.errors.AnsibleModuleError(dct['msg']) + raise ansible.errors.AnsibleConnectionFailure(dct['msg']) - if self._play_context.become: - self.context, self._homedir = self._connect_sudo(via=self.host) - else: - self.context = self.host + self.context = dct['context'] + self._homedir = dct['home_dir'] def get_context_name(self): """ @@ -296,18 +364,16 @@ class Connection(ansible.plugins.connection.ConnectionBase): gracefully shut down, and wait for shutdown to complete. Safe to call multiple times. """ - for context in set([self.host, self.context]): - if context: - mitogen.service.call( - context=self.parent, - handle=ansible_mitogen.services.ContextService.handle, - method='put', - kwargs={ - 'context': context - } - ) + if self.context: + mitogen.service.call( + context=self.parent, + handle=ansible_mitogen.services.ContextService.handle, + method='put', + kwargs={ + 'context': self.context + } + ) - self.host = None self.context = None if self.broker and not new_task: self.broker.shutdown() @@ -420,3 +486,15 @@ class Connection(ansible.plugins.connection.ConnectionBase): in_path=in_path, out_path=out_path ) + + +class SshConnection(Connection): + transport = 'ssh' + + +class LocalConnection(Connection): + transport = 'local' + + +class DockerConnection(Connection): + transport = 'docker' diff --git a/ansible_mitogen/plugins/connection/mitogen_docker.py b/ansible_mitogen/plugins/connection/mitogen_docker.py new file mode 100644 index 00000000..70a90147 --- /dev/null +++ b/ansible_mitogen/plugins/connection/mitogen_docker.py @@ -0,0 +1,56 @@ +# Copyright 2017, David Wilson +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its contributors +# may be used to endorse or promote products derived from this software without +# specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import os.path +import sys + +# +# This is not the real Connection implementation module, it simply exists as a +# proxy to the real module, which is loaded using Python's regular import +# mechanism, to prevent Ansible's PluginLoader from making up a fake name that +# results in ansible_mitogen plugin modules being loaded twice: once by +# PluginLoader with a name like "ansible.plugins.connection.mitogen", which is +# stuffed into sys.modules even though attempting to import it will trigger an +# ImportError, and once under its canonical name, "ansible_mitogen.connection". +# +# Therefore we have a proxy module that imports it under the real name, and +# sets up the duff PluginLoader-imported module to just contain objects from +# the real module, so duplicate types don't exist in memory, and things like +# debuggers and isinstance() work predictably. +# + +try: + import ansible_mitogen +except ImportError: + base_dir = os.path.dirname(__file__) + sys.path.insert(0, os.path.abspath(os.path.join(base_dir, '../../..'))) + del base_dir + +from ansible_mitogen.connection import DockerConnection as Connection +del os +del sys diff --git a/ansible_mitogen/plugins/connection/mitogen_local.py b/ansible_mitogen/plugins/connection/mitogen_local.py new file mode 100644 index 00000000..fc1a8565 --- /dev/null +++ b/ansible_mitogen/plugins/connection/mitogen_local.py @@ -0,0 +1,56 @@ +# Copyright 2017, David Wilson +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its contributors +# may be used to endorse or promote products derived from this software without +# specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import os.path +import sys + +# +# This is not the real Connection implementation module, it simply exists as a +# proxy to the real module, which is loaded using Python's regular import +# mechanism, to prevent Ansible's PluginLoader from making up a fake name that +# results in ansible_mitogen plugin modules being loaded twice: once by +# PluginLoader with a name like "ansible.plugins.connection.mitogen", which is +# stuffed into sys.modules even though attempting to import it will trigger an +# ImportError, and once under its canonical name, "ansible_mitogen.connection". +# +# Therefore we have a proxy module that imports it under the real name, and +# sets up the duff PluginLoader-imported module to just contain objects from +# the real module, so duplicate types don't exist in memory, and things like +# debuggers and isinstance() work predictably. +# + +try: + import ansible_mitogen +except ImportError: + base_dir = os.path.dirname(__file__) + sys.path.insert(0, os.path.abspath(os.path.join(base_dir, '../../..'))) + del base_dir + +from ansible_mitogen.connection import LocalConnection as Connection +del os +del sys diff --git a/ansible_mitogen/plugins/connection/mitogen.py b/ansible_mitogen/plugins/connection/mitogen_ssh.py similarity index 97% rename from ansible_mitogen/plugins/connection/mitogen.py rename to ansible_mitogen/plugins/connection/mitogen_ssh.py index 5ae51ab1..e0a30672 100644 --- a/ansible_mitogen/plugins/connection/mitogen.py +++ b/ansible_mitogen/plugins/connection/mitogen_ssh.py @@ -51,6 +51,6 @@ except ImportError: sys.path.insert(0, os.path.abspath(os.path.join(base_dir, '../../..'))) del base_dir -from ansible_mitogen.connection import Connection +from ansible_mitogen.connection import SshConnection as Connection del os del sys diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index 79c8de46..ae6e8ed7 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -82,9 +82,9 @@ class ContextService(mitogen.service.Service): #: Records the :meth:`get` result dict for successful calls, returned #: for identical subsequent calls. Keyed by :meth:`key_from_kwargs`. self._response_by_key = {} - #: List of :class:`mitogen.core.Message` waiting for the result dict - #: for a particular connection config. Keyed as sbove. - self._waiters_by_key = {} + #: List of :class:`mitogen.core.Latch` awaiting the result for a + #: particular key. + self._latches_by_key = {} #: Mapping of :class:`mitogen.core.Context` -> reference count. Each #: call to :meth:`get` increases this by one. Calls to :meth:`put` #: decrease it by one. @@ -134,10 +134,10 @@ class ContextService(mitogen.service.Service): """ self._lock.acquire() try: - waiters = self._waiters_by_key.pop(key) - count = len(waiters) - for msg in waiters: - msg.reply(response) + latches = self._latches_by_key.pop(key) + count = len(latches) + for latch in latches: + latch.put(response) finally: self._lock.release() return count @@ -164,17 +164,12 @@ class ContextService(mitogen.service.Service): finally: self._lock.release() - def _update_lru(self, new_context, **kwargs): + def _update_lru(self, new_context, spec, via): """ Update the LRU ("MRU"?) list associated with the connection described by `kwargs`, destroying the most recently created context if the list is full. Finally add `new_context` to the list. """ - via = kwargs.get('via') - if via is None: - # We don't have a limit on the number of directly connections. - return - lru = self._lru_by_via.setdefault(via, []) if len(lru) < self.max_interpreters: lru.append(new_context) @@ -207,7 +202,7 @@ class ContextService(mitogen.service.Service): """ # TODO: there is a race between creation of a context and disconnection # of its related stream. An error reply should be sent to any message - # in _waiters_by_key below. + # in _latches_by_key below. self._lock.acquire() try: for context, key in list(self._key_by_context.items()): @@ -215,14 +210,14 @@ class ContextService(mitogen.service.Service): LOG.info('Dropping %r due to disconnect of %r', context, stream) self._response_by_key.pop(key, None) - self._waiters_by_key.pop(key, None) + self._latches_by_key.pop(key, None) self._refs_by_context.pop(context, None) self._lru_by_via.pop(context, None) self._refs_by_context.pop(context, None) finally: self._lock.release() - def _connect(self, key, method_name, **kwargs): + def _connect(self, key, spec, via=None): """ Actual connect implementation. Arranges for the Mitogen connection to be created and enqueues an asynchronous call to start the forked task @@ -230,11 +225,8 @@ class ContextService(mitogen.service.Service): :param key: Deduplication key representing the connection configuration. - :param method_name: - :class:`mitogen.parent.Router` method implementing the connection - type. - :param kwargs: - Keyword arguments passed to the router method. + :param spec: + Connection specification. :returns: Dict like:: @@ -248,21 +240,14 @@ class ContextService(mitogen.service.Service): :data:`None`, or `msg` is :data:`None` and the remaining fields are set. """ - method = getattr(self.router, method_name, None) - if method is None: - raise Error('no such Router method: %s' % (method_name,)) - try: - context = method(**kwargs) - except mitogen.core.StreamError as e: - return { - 'context': None, - 'home_dir': None, - 'msg': str(e), - } - - if kwargs.get('via'): - self._update_lru(context, method_name=method_name, **kwargs) + method = getattr(self.router, spec['method']) + except AttributeError: + raise Error('unsupported method: %(transport)s' % spec) + + context = method(via=via, **spec['kwargs']) + if via: + self._update_lru(context, spec, via) else: # For directly connected contexts, listen to the associated # Stream's disconnect event and use it to invalidate dependent @@ -289,60 +274,74 @@ class ContextService(mitogen.service.Service): 'msg': None, } - @mitogen.service.expose(mitogen.service.AllowParents()) - @mitogen.service.arg_spec({ - 'method_name': str - }) - def get(self, msg, **kwargs): - """ - Return a Context referring to an established connection with the given - configuration, establishing a new connection as necessary. - - :param str method_name: - The :class:`mitogen.parent.Router` connection method to use. - :param dict kwargs: - Keyword arguments passed to `mitogen.master.Router.[method_name]()`. - - :returns tuple: - Tuple of `(context, home_dir)`, where: - * `context` is the mitogen.master.Context referring to the - target context. - * `home_dir` is a cached copy of the remote directory. - """ - key = self.key_from_kwargs(**kwargs) + def _wait_or_start(self, spec, via=None): + latch = mitogen.core.Latch() + key = self.key_from_kwargs(via=via, **spec) self._lock.acquire() try: response = self._response_by_key.get(key) if response is not None: self._refs_by_context[response['context']] += 1 - return response - - waiters = self._waiters_by_key.get(key) - if waiters is not None: - waiters.append(msg) - return self.NO_REPLY + latch.put(response) + return latch - self._waiters_by_key[key] = [msg] + latches = self._latches_by_key.setdefault(key, []) + first = len(latches) == 0 + latches.append(latch) finally: self._lock.release() - # I'm the first thread to wait, so I will create the connection. - try: - response = self._connect(key, **kwargs) - count = self._produce_response(key, response) - if response['msg'] is None: + if first: + # I'm the first requestee, so I will create the connection. + try: + response = self._connect(key, spec, via=via) + count = self._produce_response(key, response) # Only record the response for non-error results. self._response_by_key[key] = response # Set the reference count to the number of waiters. self._refs_by_context[response['context']] += count - except mitogen.core.CallError: - e = sys.exc_info()[1] - self._produce_response(key, e) - except Exception: - e = sys.exc_info()[1] - self._produce_response(key, mitogen.core.CallError(e)) - - return self.NO_REPLY + except Exception: + self._produce_response(key, sys.exc_info()) + + return latch + + @mitogen.service.expose(mitogen.service.AllowParents()) + @mitogen.service.arg_spec({ + 'stack': list + }) + def get(self, msg, stack): + """ + Return a Context referring to an established connection with the given + configuration, establishing new connections as necessary. + + :param list stack: + Connection descriptions. Each element is a dict containing 'method' + and 'kwargs' keys describing the Router method and arguments. + Subsequent elements are proxied via the previous. + + :returns dict: + * context: mitogen.master.Context or None. + * homedir: Context's home directory or None. + * msg: StreamError exception text or None. + * method_name: string failing method name. + """ + via = None + for spec in stack: + try: + result = self._wait_or_start(spec, via=via).get() + if isinstance(result, tuple): # exc_info() + e1, e2, e3 = result + raise e1, e2, e3 + via = result['context'] + except mitogen.core.StreamError as e: + return { + 'context': None, + 'home_dir': None, + 'method_name': spec['method'], + 'msg': str(e), + } + + return result class FileService(mitogen.service.Service): diff --git a/ansible_mitogen/strategy.py b/ansible_mitogen/strategy.py index 430ea74b..ef6a37ac 100644 --- a/ansible_mitogen/strategy.py +++ b/ansible_mitogen/strategy.py @@ -69,8 +69,7 @@ def wrap_connection_loader__get(name, play_context, new_stdin, **kwargs): an argument, so that it can emulate the original type. """ if name in ('ssh', 'local', 'docker'): - kwargs['original_transport'] = name - name = 'mitogen' + name = 'mitogen_' + name return connection_loader__get(name, play_context, new_stdin, **kwargs)