ansible: connection delegation v1

This implements the first edition of Connection Delegation, where
delegating connection establishment is initially single-threaded.

ansible_mitogen/strategy.py:
ansible_mitogen/plugins/connection/*:

  Begin splitting connection.Connection into subclasses, exposing them
  directly as "mitogen_ssh", "mitogen_local", etc. connection types.

  This is far from removing strategy.py, but it's a tiny start.

ansible_mitogen/connection.py:

  * config_from_play_context() and config_from_host_vars() build up a
    huge dictionary containing either more or less PlayContext contents,
    or our best attempt at reconstructing a host's connection config
    from its hostvars, where that config is not the current
    WorkerProcess target.

    They both produce the same format with the same keys, allowing
    remaining code to have a single input format.

    These dicts contain fields named after how Ansible refers to them,
    e.g. "sudo_exe".

  * _config_from_via() parses a basic connection specification like
    "username@inventory_name" into one of the aforementioned dicts.

  * _stack_from_config() produces a list of dicts describing the order
    in which (Mitogen) connections should be established, such that each
    element is proxied via= the previous element. The dicts produced by
    this function use Mitogen keyword arguments, the former di.

    These dicts contain fields named after how Mitogen refers to them,
    e.g. "sudo_path".

  * Pass the stack to ContextService, which is responsible for actual
    setup of the full chain.

ansible_mitogen/services.py:

  Teach get() to walk the supplied stack, establishing each connection
  in turn, creating refounts for it before continuing.

  TODO: refcounting is broken in a variety of cases.
pull/219/head
David Wilson 7 years ago
parent 3a07ed3d65
commit 3fab8a3af5

@ -33,6 +33,8 @@ import shlex
import sys import sys
import time import time
import jinja2.runtime
import ansible.constants as C
import ansible.errors import ansible.errors
import ansible.plugins.connection import ansible.plugins.connection
@ -47,6 +49,135 @@ import ansible_mitogen.services
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
def _connect_local(spec):
return {
'method': 'local',
'kwargs': {
'python_path': spec['python_path'],
}
}
def _connect_ssh(spec):
return {
'method': 'ssh',
'kwargs': {
'check_host_keys': False, # TODO
'hostname': spec['remote_addr'],
'username': spec['remote_user'],
'password': spec['password'],
'port': spec['port'],
'python_path': spec['python_path'],
'identity_file': spec['private_key_file'],
'ssh_path': spec['ssh_executable'],
'connect_timeout': spec['ansible_ssh_timeout'],
'ssh_args': spec['ssh_args'],
}
}
def _connect_docker(spec):
return {
'method': 'docker',
'kwargs': {
'username': spec['remote_user'],
'container': spec['remote_addr'],
'python_path': spec['python_path'],
'connect_timeout': spec['ansible_ssh_timeout'] or spec['timeout'],
}
}
def _connect_sudo(spec):
return {
'method': 'sudo',
'kwargs': {
'username': spec['become_user'],
'password': spec['become_pass'],
'python_path': spec['python_path'],
'sudo_path': spec['sudo_exe'],
'connect_timeout': spec['timeout'],
'sudo_args': spec['sudo_args'],
}
}
CONNECTION_METHOD = {
'sudo': _connect_sudo,
'ssh': _connect_ssh,
'local': _connect_local,
'docker': _connect_docker,
}
def config_from_play_context(transport, inventory_name, connection):
"""
Return a dict representing all important connection configuration, allowing
the same functions to work regardless of whether configuration came from
play_context (direct connection) or host vars (mitogen_via=).
"""
return {
'transport': transport,
'inventory_name': inventory_name,
'remote_addr': connection._play_context.remote_addr,
'remote_user': connection._play_context.remote_user,
'become': connection._play_context.become,
'become_method': connection._play_context.become_method,
'become_user': connection._play_context.become_user,
'become_pass': connection._play_context.become_pass,
'password': connection._play_context.password,
'port': connection._play_context.port,
'python_path': connection.python_path,
'private_key_file': connection._play_context.private_key_file,
'ssh_executable': connection._play_context.ssh_executable,
'timeout': connection._play_context.timeout,
'ansible_ssh_timeout': connection.ansible_ssh_timeout,
'ssh_args': [
term
for s in (
getattr(connection._play_context, 'ssh_args', ''),
getattr(connection._play_context, 'ssh_common_args', ''),
getattr(connection._play_context, 'ssh_extra_args', '')
)
for term in shlex.split(s or '')
],
'sudo_exe': connection._play_context.sudo_exe,
'sudo_args': [
term
for s in (
connection._play_context.sudo_flags,
connection._play_context.become_flags
)
for term in shlex.split(s or '')
],
'mitogen_via': connection.mitogen_via,
}
def config_from_hostvars(transport, inventory_name, connection,
hostvars, become_user):
"""
Override config_from_play_context() to take equivalent information from
host vars.
"""
config = config_from_play_context(transport, inventory_name, connection)
hostvars = dict(hostvars)
return dict(config, **{
'remote_addr': hostvars.get('ansible_hostname', inventory_name),
'become': bool(become_user),
'become_user': become_user,
'become_pass': None,
'remote_user': hostvars.get('ansible_user'), # TODO
'password': (hostvars.get('ansible_ssh_pass') or
hostvars.get('ansible_password')),
'port': hostvars.get('ansible_port'),
'python_path': hostvars.get('ansible_python_interpreter'),
'private_key_file': (hostvars.get('ansible_ssh_private_key_file') or
hostvars.get('ansible_private_key_file')),
'mitogen_via': hostvars.get('mitogen_via'),
})
class Connection(ansible.plugins.connection.ConnectionBase): class Connection(ansible.plugins.connection.ConnectionBase):
#: mitogen.master.Broker for this worker. #: mitogen.master.Broker for this worker.
broker = None broker = None
@ -58,45 +189,39 @@ class Connection(ansible.plugins.connection.ConnectionBase):
#: presently always the master process. #: presently always the master process.
parent = None parent = None
#: mitogen.master.Context connected to the target machine's initial SSH
#: account.
host = None
#: mitogen.master.Context connected to the target user account on the #: mitogen.master.Context connected to the target user account on the
#: target machine (i.e. via sudo), or simply a copy of :attr:`host` if #: target machine (i.e. via sudo).
#: become is not in use.
context = None context = None
#: Only sudo is supported for now. #: Only sudo is supported for now.
become_methods = ['sudo'] become_methods = ['sudo']
#: Set by the constructor according to whichever connection type this
#: connection should emulate. We emulate the original connection type to
#: work around artificial limitations in e.g. the synchronize action, which
#: hard-codes 'local' and 'ssh' as the only allowable connection types.
transport = None
#: Set to 'ansible_python_interpreter' by on_action_run(). #: Set to 'ansible_python_interpreter' by on_action_run().
python_path = None python_path = None
#: Set to 'ansible_sudo_exe' by on_action_run(). #: Set to 'ansible_sudo_exe' by on_action_run().
sudo_path = None sudo_exe = None
#: Set to 'ansible_ssh_timeout' by on_action_run(). #: Set to 'ansible_ssh_timeout' by on_action_run().
ansible_ssh_timeout = None ansible_ssh_timeout = None
#: Set to 'mitogen_via' by on_action_run().
mitogen_via = None
#: Set to 'inventory_hostname' by on_action_run().
inventory_hostname = None
#: Set to 'hostvars' by on_action_run()
host_vars = None
#: Set after connection to the target context's home directory. #: Set after connection to the target context's home directory.
_homedir = None _homedir = None
def __init__(self, play_context, new_stdin, original_transport, **kwargs): def __init__(self, play_context, new_stdin, **kwargs):
assert ansible_mitogen.process.MuxProcess.unix_listener_path, ( assert ansible_mitogen.process.MuxProcess.unix_listener_path, (
'The "mitogen" connection plug-in may only be instantiated ' 'Mitogen connection types may only be instantiated '
'by the "mitogen" strategy plug-in.' 'while the "mitogen" strategy is active.'
) )
self.original_transport = original_transport
self.transport = original_transport
self.kwargs = kwargs
super(Connection, self).__init__(play_context, new_stdin) super(Connection, self).__init__(play_context, new_stdin)
def __del__(self): def __del__(self):
@ -114,19 +239,13 @@ class Connection(ansible.plugins.connection.ConnectionBase):
executing. We use the opportunity to grab relevant bits from the executing. We use the opportunity to grab relevant bits from the
task-specific data. task-specific data.
""" """
self.ansible_ssh_timeout = task_vars.get( self.ansible_ssh_timeout = task_vars.get('ansible_ssh_timeout')
'ansible_ssh_timeout', self.python_path = task_vars.get('ansible_python_interpreter',
None '/usr/bin/python')
) self.sudo_exe = task_vars.get('ansible_sudo_exe', C.DEFAULT_SUDO_EXE)
self.python_path = task_vars.get( self.mitogen_via = task_vars.get('mitogen_via')
'ansible_python_interpreter', self.inventory_hostname = task_vars['inventory_hostname']
'/usr/bin/python' self.host_vars = task_vars['hostvars']
)
self.sudo_path = task_vars.get(
'ansible_sudo_exe',
'sudo'
)
self.close(new_task=True) self.close(new_task=True)
@property @property
@ -138,109 +257,52 @@ class Connection(ansible.plugins.connection.ConnectionBase):
def connected(self): def connected(self):
return self.context is not None return self.context is not None
def _on_connection_error(self, msg): def _config_from_via(self, via_spec):
raise ansible.errors.AnsibleConnectionFailure(msg) become_user, _, inventory_name = via_spec.rpartition('@')
via_vars = self.host_vars[inventory_name]
def _on_become_error(self, msg): if isinstance(via_vars, jinja2.runtime.Undefined):
# TODO: vanilla become failures yield this: raise ansible.errors.AnsibleConnectionFailure(
# { self.unknown_via_msg % (
# "changed": false, self.mitogen_via,
# "module_stderr": "sudo: sorry, you must have a tty to run sudo\n", config['inventory_name'],
# "module_stdout": "", )
# "msg": "MODULE FAILURE",
# "rc": 1
# }
#
# Currently we yield this:
# {
# "msg": "EOF on stream; last 300 bytes received: 'sudo: ....\n'"
# }
raise ansible.errors.AnsibleModuleError(msg)
def _wrap_connect(self, on_error, kwargs):
dct = mitogen.service.call(
context=self.parent,
handle=ansible_mitogen.services.ContextService.handle,
method='get',
kwargs=mitogen.utils.cast(kwargs),
) )
if dct['msg']: return config_from_hostvars(
on_error(dct['msg']) transport=via_vars.get('ansible_connection', 'ssh'),
inventory_name=inventory_name,
return dct['context'], dct['home_dir'] connection=self,
hostvars=via_vars,
become_user=become_user or None,
)
def _connect_local(self): unknown_via_msg = 'mitogen_via=%s of %s specifies an unknown hostname'
""" via_cycle_msg = 'mitogen_via=%s of %s creates a cycle (%s)'
Fetch a reference to the local() Context from ContextService in the
master process. def _stack_from_config(self, config, stack=(), seen_names=()):
""" if config['inventory_name'] in seen_names:
return self._wrap_connect(self._on_connection_error, { raise ansible.errors.AnsibleConnectionFailure(
'method_name': 'local', self.via_cycle_msg % (
'python_path': self.python_path, config['mitogen_via'],
}) config['inventory_name'],
' -> '.join(reversed(
seen_names + (config['inventory_name'],)
)),
)
)
def _connect_ssh(self): if config['mitogen_via']:
""" stack, seen_names = self._stack_from_config(
Fetch a reference to an SSH Context matching the play context from self._config_from_via(config['mitogen_via']),
ContextService in the master process. stack=stack,
""" seen_names=seen_names + (config['inventory_name'],)
return self._wrap_connect(self._on_connection_error, {
'method_name': 'ssh',
'check_host_keys': False, # TODO
'hostname': self._play_context.remote_addr,
'username': self._play_context.remote_user,
'password': self._play_context.password,
'port': self._play_context.port,
'python_path': self.python_path,
'identity_file': self._play_context.private_key_file,
'ssh_path': self._play_context.ssh_executable,
'connect_timeout': self.ansible_ssh_timeout,
'ssh_args': [
term
for s in (
getattr(self._play_context, 'ssh_args', ''),
getattr(self._play_context, 'ssh_common_args', ''),
getattr(self._play_context, 'ssh_extra_args', '')
) )
for term in shlex.split(s or '')
]
})
def _connect_docker(self): stack += (CONNECTION_METHOD[config['transport']](config),)
return self._wrap_connect(self._on_connection_error, { if config['become']:
'method_name': 'docker', stack += (CONNECTION_METHOD[config['become_method']](config),)
'container': self._play_context.remote_addr,
'python_path': self.python_path,
'connect_timeout': self._play_context.timeout,
})
def _connect_sudo(self, via=None, python_path=None): return stack, seen_names
"""
Fetch a reference to a sudo Context matching the play context from
ContextService in the master process.
:param via:
Parent Context of the sudo Context. For Ansible, this should always
be a Context returned by _connect_ssh().
"""
return self._wrap_connect(self._on_become_error, {
'method_name': 'sudo',
'username': self._play_context.become_user,
'password': self._play_context.become_pass,
'python_path': python_path or self.python_path,
'sudo_path': self.sudo_path,
'connect_timeout': self._play_context.timeout,
'via': via,
'sudo_args': [
term
for s in (
self._play_context.sudo_flags,
self._play_context.become_flags
)
for term in shlex.split(s or '')
],
})
def _connect(self): def _connect(self):
""" """
@ -263,24 +325,30 @@ class Connection(ansible.plugins.connection.ConnectionBase):
broker=self.broker, broker=self.broker,
) )
if self.original_transport == 'local': stack, _ = self._stack_from_config(
if self._play_context.become: config_from_play_context(
self.context, self._homedir = self._connect_sudo( transport=self.transport,
python_path=sys.executable inventory_name=self.inventory_hostname,
connection=self
)
)
dct = mitogen.service.call(
context=self.parent,
handle=ansible_mitogen.services.ContextService.handle,
method='get',
kwargs=mitogen.utils.cast({
'stack': stack,
})
) )
else:
self.context, self._homedir = self._connect_local()
return
if self.original_transport == 'docker': if dct['msg']:
self.host, self._homedir = self._connect_docker() if dct['method_name'] in self.become_methods:
elif self.original_transport == 'ssh': raise ansible.errors.AnsibleModuleError(dct['msg'])
self.host, self._homedir = self._connect_ssh() raise ansible.errors.AnsibleConnectionFailure(dct['msg'])
if self._play_context.become: self.context = dct['context']
self.context, self._homedir = self._connect_sudo(via=self.host) self._homedir = dct['home_dir']
else:
self.context = self.host
def get_context_name(self): def get_context_name(self):
""" """
@ -296,18 +364,16 @@ class Connection(ansible.plugins.connection.ConnectionBase):
gracefully shut down, and wait for shutdown to complete. Safe to call gracefully shut down, and wait for shutdown to complete. Safe to call
multiple times. multiple times.
""" """
for context in set([self.host, self.context]): if self.context:
if context:
mitogen.service.call( mitogen.service.call(
context=self.parent, context=self.parent,
handle=ansible_mitogen.services.ContextService.handle, handle=ansible_mitogen.services.ContextService.handle,
method='put', method='put',
kwargs={ kwargs={
'context': context 'context': self.context
} }
) )
self.host = None
self.context = None self.context = None
if self.broker and not new_task: if self.broker and not new_task:
self.broker.shutdown() self.broker.shutdown()
@ -420,3 +486,15 @@ class Connection(ansible.plugins.connection.ConnectionBase):
in_path=in_path, in_path=in_path,
out_path=out_path out_path=out_path
) )
class SshConnection(Connection):
transport = 'ssh'
class LocalConnection(Connection):
transport = 'local'
class DockerConnection(Connection):
transport = 'docker'

@ -0,0 +1,56 @@
# Copyright 2017, David Wilson
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import os.path
import sys
#
# This is not the real Connection implementation module, it simply exists as a
# proxy to the real module, which is loaded using Python's regular import
# mechanism, to prevent Ansible's PluginLoader from making up a fake name that
# results in ansible_mitogen plugin modules being loaded twice: once by
# PluginLoader with a name like "ansible.plugins.connection.mitogen", which is
# stuffed into sys.modules even though attempting to import it will trigger an
# ImportError, and once under its canonical name, "ansible_mitogen.connection".
#
# Therefore we have a proxy module that imports it under the real name, and
# sets up the duff PluginLoader-imported module to just contain objects from
# the real module, so duplicate types don't exist in memory, and things like
# debuggers and isinstance() work predictably.
#
try:
import ansible_mitogen
except ImportError:
base_dir = os.path.dirname(__file__)
sys.path.insert(0, os.path.abspath(os.path.join(base_dir, '../../..')))
del base_dir
from ansible_mitogen.connection import DockerConnection as Connection
del os
del sys

@ -0,0 +1,56 @@
# Copyright 2017, David Wilson
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import os.path
import sys
#
# This is not the real Connection implementation module, it simply exists as a
# proxy to the real module, which is loaded using Python's regular import
# mechanism, to prevent Ansible's PluginLoader from making up a fake name that
# results in ansible_mitogen plugin modules being loaded twice: once by
# PluginLoader with a name like "ansible.plugins.connection.mitogen", which is
# stuffed into sys.modules even though attempting to import it will trigger an
# ImportError, and once under its canonical name, "ansible_mitogen.connection".
#
# Therefore we have a proxy module that imports it under the real name, and
# sets up the duff PluginLoader-imported module to just contain objects from
# the real module, so duplicate types don't exist in memory, and things like
# debuggers and isinstance() work predictably.
#
try:
import ansible_mitogen
except ImportError:
base_dir = os.path.dirname(__file__)
sys.path.insert(0, os.path.abspath(os.path.join(base_dir, '../../..')))
del base_dir
from ansible_mitogen.connection import LocalConnection as Connection
del os
del sys

@ -51,6 +51,6 @@ except ImportError:
sys.path.insert(0, os.path.abspath(os.path.join(base_dir, '../../..'))) sys.path.insert(0, os.path.abspath(os.path.join(base_dir, '../../..')))
del base_dir del base_dir
from ansible_mitogen.connection import Connection from ansible_mitogen.connection import SshConnection as Connection
del os del os
del sys del sys

@ -82,9 +82,9 @@ class ContextService(mitogen.service.Service):
#: Records the :meth:`get` result dict for successful calls, returned #: Records the :meth:`get` result dict for successful calls, returned
#: for identical subsequent calls. Keyed by :meth:`key_from_kwargs`. #: for identical subsequent calls. Keyed by :meth:`key_from_kwargs`.
self._response_by_key = {} self._response_by_key = {}
#: List of :class:`mitogen.core.Message` waiting for the result dict #: List of :class:`mitogen.core.Latch` awaiting the result for a
#: for a particular connection config. Keyed as sbove. #: particular key.
self._waiters_by_key = {} self._latches_by_key = {}
#: Mapping of :class:`mitogen.core.Context` -> reference count. Each #: Mapping of :class:`mitogen.core.Context` -> reference count. Each
#: call to :meth:`get` increases this by one. Calls to :meth:`put` #: call to :meth:`get` increases this by one. Calls to :meth:`put`
#: decrease it by one. #: decrease it by one.
@ -134,10 +134,10 @@ class ContextService(mitogen.service.Service):
""" """
self._lock.acquire() self._lock.acquire()
try: try:
waiters = self._waiters_by_key.pop(key) latches = self._latches_by_key.pop(key)
count = len(waiters) count = len(latches)
for msg in waiters: for latch in latches:
msg.reply(response) latch.put(response)
finally: finally:
self._lock.release() self._lock.release()
return count return count
@ -164,17 +164,12 @@ class ContextService(mitogen.service.Service):
finally: finally:
self._lock.release() self._lock.release()
def _update_lru(self, new_context, **kwargs): def _update_lru(self, new_context, spec, via):
""" """
Update the LRU ("MRU"?) list associated with the connection described Update the LRU ("MRU"?) list associated with the connection described
by `kwargs`, destroying the most recently created context if the list by `kwargs`, destroying the most recently created context if the list
is full. Finally add `new_context` to the list. is full. Finally add `new_context` to the list.
""" """
via = kwargs.get('via')
if via is None:
# We don't have a limit on the number of directly connections.
return
lru = self._lru_by_via.setdefault(via, []) lru = self._lru_by_via.setdefault(via, [])
if len(lru) < self.max_interpreters: if len(lru) < self.max_interpreters:
lru.append(new_context) lru.append(new_context)
@ -207,7 +202,7 @@ class ContextService(mitogen.service.Service):
""" """
# TODO: there is a race between creation of a context and disconnection # TODO: there is a race between creation of a context and disconnection
# of its related stream. An error reply should be sent to any message # of its related stream. An error reply should be sent to any message
# in _waiters_by_key below. # in _latches_by_key below.
self._lock.acquire() self._lock.acquire()
try: try:
for context, key in list(self._key_by_context.items()): for context, key in list(self._key_by_context.items()):
@ -215,14 +210,14 @@ class ContextService(mitogen.service.Service):
LOG.info('Dropping %r due to disconnect of %r', LOG.info('Dropping %r due to disconnect of %r',
context, stream) context, stream)
self._response_by_key.pop(key, None) self._response_by_key.pop(key, None)
self._waiters_by_key.pop(key, None) self._latches_by_key.pop(key, None)
self._refs_by_context.pop(context, None) self._refs_by_context.pop(context, None)
self._lru_by_via.pop(context, None) self._lru_by_via.pop(context, None)
self._refs_by_context.pop(context, None) self._refs_by_context.pop(context, None)
finally: finally:
self._lock.release() self._lock.release()
def _connect(self, key, method_name, **kwargs): def _connect(self, key, spec, via=None):
""" """
Actual connect implementation. Arranges for the Mitogen connection to Actual connect implementation. Arranges for the Mitogen connection to
be created and enqueues an asynchronous call to start the forked task be created and enqueues an asynchronous call to start the forked task
@ -230,11 +225,8 @@ class ContextService(mitogen.service.Service):
:param key: :param key:
Deduplication key representing the connection configuration. Deduplication key representing the connection configuration.
:param method_name: :param spec:
:class:`mitogen.parent.Router` method implementing the connection Connection specification.
type.
:param kwargs:
Keyword arguments passed to the router method.
:returns: :returns:
Dict like:: Dict like::
@ -248,21 +240,14 @@ class ContextService(mitogen.service.Service):
:data:`None`, or `msg` is :data:`None` and the remaining fields are :data:`None`, or `msg` is :data:`None` and the remaining fields are
set. set.
""" """
method = getattr(self.router, method_name, None)
if method is None:
raise Error('no such Router method: %s' % (method_name,))
try: try:
context = method(**kwargs) method = getattr(self.router, spec['method'])
except mitogen.core.StreamError as e: except AttributeError:
return { raise Error('unsupported method: %(transport)s' % spec)
'context': None,
'home_dir': None,
'msg': str(e),
}
if kwargs.get('via'): context = method(via=via, **spec['kwargs'])
self._update_lru(context, method_name=method_name, **kwargs) if via:
self._update_lru(context, spec, via)
else: else:
# For directly connected contexts, listen to the associated # For directly connected contexts, listen to the associated
# Stream's disconnect event and use it to invalidate dependent # Stream's disconnect event and use it to invalidate dependent
@ -289,60 +274,74 @@ class ContextService(mitogen.service.Service):
'msg': None, 'msg': None,
} }
@mitogen.service.expose(mitogen.service.AllowParents()) def _wait_or_start(self, spec, via=None):
@mitogen.service.arg_spec({ latch = mitogen.core.Latch()
'method_name': str key = self.key_from_kwargs(via=via, **spec)
})
def get(self, msg, **kwargs):
"""
Return a Context referring to an established connection with the given
configuration, establishing a new connection as necessary.
:param str method_name:
The :class:`mitogen.parent.Router` connection method to use.
:param dict kwargs:
Keyword arguments passed to `mitogen.master.Router.[method_name]()`.
:returns tuple:
Tuple of `(context, home_dir)`, where:
* `context` is the mitogen.master.Context referring to the
target context.
* `home_dir` is a cached copy of the remote directory.
"""
key = self.key_from_kwargs(**kwargs)
self._lock.acquire() self._lock.acquire()
try: try:
response = self._response_by_key.get(key) response = self._response_by_key.get(key)
if response is not None: if response is not None:
self._refs_by_context[response['context']] += 1 self._refs_by_context[response['context']] += 1
return response latch.put(response)
return latch
waiters = self._waiters_by_key.get(key) latches = self._latches_by_key.setdefault(key, [])
if waiters is not None: first = len(latches) == 0
waiters.append(msg) latches.append(latch)
return self.NO_REPLY
self._waiters_by_key[key] = [msg]
finally: finally:
self._lock.release() self._lock.release()
# I'm the first thread to wait, so I will create the connection. if first:
# I'm the first requestee, so I will create the connection.
try: try:
response = self._connect(key, **kwargs) response = self._connect(key, spec, via=via)
count = self._produce_response(key, response) count = self._produce_response(key, response)
if response['msg'] is None:
# Only record the response for non-error results. # Only record the response for non-error results.
self._response_by_key[key] = response self._response_by_key[key] = response
# Set the reference count to the number of waiters. # Set the reference count to the number of waiters.
self._refs_by_context[response['context']] += count self._refs_by_context[response['context']] += count
except mitogen.core.CallError:
e = sys.exc_info()[1]
self._produce_response(key, e)
except Exception: except Exception:
e = sys.exc_info()[1] self._produce_response(key, sys.exc_info())
self._produce_response(key, mitogen.core.CallError(e))
return latch
@mitogen.service.expose(mitogen.service.AllowParents())
@mitogen.service.arg_spec({
'stack': list
})
def get(self, msg, stack):
"""
Return a Context referring to an established connection with the given
configuration, establishing new connections as necessary.
:param list stack:
Connection descriptions. Each element is a dict containing 'method'
and 'kwargs' keys describing the Router method and arguments.
Subsequent elements are proxied via the previous.
:returns dict:
* context: mitogen.master.Context or None.
* homedir: Context's home directory or None.
* msg: StreamError exception text or None.
* method_name: string failing method name.
"""
via = None
for spec in stack:
try:
result = self._wait_or_start(spec, via=via).get()
if isinstance(result, tuple): # exc_info()
e1, e2, e3 = result
raise e1, e2, e3
via = result['context']
except mitogen.core.StreamError as e:
return {
'context': None,
'home_dir': None,
'method_name': spec['method'],
'msg': str(e),
}
return self.NO_REPLY return result
class FileService(mitogen.service.Service): class FileService(mitogen.service.Service):

@ -69,8 +69,7 @@ def wrap_connection_loader__get(name, play_context, new_stdin, **kwargs):
an argument, so that it can emulate the original type. an argument, so that it can emulate the original type.
""" """
if name in ('ssh', 'local', 'docker'): if name in ('ssh', 'local', 'docker'):
kwargs['original_transport'] = name name = 'mitogen_' + name
name = 'mitogen'
return connection_loader__get(name, play_context, new_stdin, **kwargs) return connection_loader__get(name, play_context, new_stdin, **kwargs)

Loading…
Cancel
Save