Merge remote-tracking branch 'origin/stable-029' into stable

pull/862/head v0.2.9
David Wilson 5 years ago
commit d3f6ad74c4

@ -264,7 +264,7 @@ def start_containers(containers):
"docker rm -f %(name)s || true" % container, "docker rm -f %(name)s || true" % container,
"docker run " "docker run "
"--rm " "--rm "
"--cpuset-cpus 0,1 " # "--cpuset-cpus 0,1 "
"--detach " "--detach "
"--privileged " "--privileged "
"--cap-add=SYS_PTRACE " "--cap-add=SYS_PTRACE "

@ -7,7 +7,7 @@ batches = [
'docker pull %s' % (ci_lib.image_for_distro(ci_lib.DISTRO),), 'docker pull %s' % (ci_lib.image_for_distro(ci_lib.DISTRO),),
], ],
[ [
'sudo tar -C / -jxvf tests/data/ubuntu-python-2.4.6.tar.bz2', 'curl https://dw.github.io/mitogen/binaries/ubuntu-python-2.4.6.tar.bz2 | sudo tar -C / -jxv',
] ]
] ]

@ -265,9 +265,19 @@ class LinuxPolicy(FixedPolicy):
mask >>= 64 mask >>= 64
return mitogen.core.b('').join(chunks) return mitogen.core.b('').join(chunks)
def _get_thread_ids(self):
try:
ents = os.listdir('/proc/self/task')
except OSError:
LOG.debug('cannot fetch thread IDs for current process')
return [os.getpid()]
return [int(s) for s in ents if s.isdigit()]
def _set_cpu_mask(self, mask): def _set_cpu_mask(self, mask):
s = self._mask_to_bytes(mask) s = self._mask_to_bytes(mask)
_sched_setaffinity(os.getpid(), len(s), s) for tid in self._get_thread_ids():
_sched_setaffinity(tid, len(s), s)
if _sched_setaffinity is not None: if _sched_setaffinity is not None:

@ -56,6 +56,12 @@ import ansible_mitogen.transport_config
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
task_vars_msg = (
'could not recover task_vars. This means some connection '
'settings may erroneously be reset to their defaults. '
'Please report a bug if you encounter this message.'
)
def get_remote_name(spec): def get_remote_name(spec):
""" """
@ -486,15 +492,9 @@ class Connection(ansible.plugins.connection.ConnectionBase):
# the case of the synchronize module. # the case of the synchronize module.
# #
#: Set to the host name as it appears in inventory by on_action_run().
inventory_hostname = None
#: Set to task_vars by on_action_run(). #: Set to task_vars by on_action_run().
_task_vars = None _task_vars = None
#: Set to 'hostvars' by on_action_run()
host_vars = None
#: Set by on_action_run() #: Set by on_action_run()
delegate_to_hostname = None delegate_to_hostname = None
@ -527,12 +527,10 @@ class Connection(ansible.plugins.connection.ConnectionBase):
:param str loader_basedir: :param str loader_basedir:
Loader base directory; see :attr:`loader_basedir`. Loader base directory; see :attr:`loader_basedir`.
""" """
self.inventory_hostname = task_vars['inventory_hostname']
self._task_vars = task_vars self._task_vars = task_vars
self.host_vars = task_vars['hostvars']
self.delegate_to_hostname = delegate_to_hostname self.delegate_to_hostname = delegate_to_hostname
self.loader_basedir = loader_basedir self.loader_basedir = loader_basedir
self._mitogen_reset(mode='put') self._put_connection()
def _get_task_vars(self): def _get_task_vars(self):
""" """
@ -552,8 +550,10 @@ class Connection(ansible.plugins.connection.ConnectionBase):
for new connections to be constructed in addition to the preconstructed for new connections to be constructed in addition to the preconstructed
connection passed into any running action. connection passed into any running action.
""" """
f = sys._getframe() if self._task_vars is not None:
return self._task_vars
f = sys._getframe()
while f: while f:
if f.f_code.co_name == 'run': if f.f_code.co_name == 'run':
f_locals = f.f_locals f_locals = f.f_locals
@ -571,9 +571,23 @@ class Connection(ansible.plugins.connection.ConnectionBase):
f = f.f_back f = f.f_back
LOG.warning('could not recover task_vars. This means some connection ' raise ansible.errors.AnsibleConnectionFailure(task_vars_msg)
'settings may erroneously be reset to their defaults. '
'Please report a bug if you encounter this message.') def get_host_vars(self, inventory_hostname):
"""
Fetch the HostVars for a host.
:returns:
Variables dictionary or :data:`None`.
:raises ansible.errors.AnsibleConnectionFailure:
Task vars unavailable.
"""
task_vars = self._get_task_vars()
hostvars = task_vars.get('hostvars')
if hostvars:
return hostvars.get(inventory_hostname)
raise ansible.errors.AnsibleConnectionFailure(task_vars_msg)
def get_task_var(self, key, default=None): def get_task_var(self, key, default=None):
""" """
@ -586,8 +600,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
does not make sense to extract connection-related configuration for the does not make sense to extract connection-related configuration for the
delegated-to machine from them. delegated-to machine from them.
""" """
task_vars = self._task_vars or self._get_task_vars() task_vars = self._get_task_vars()
if task_vars is not None:
if self.delegate_to_hostname is None: if self.delegate_to_hostname is None:
if key in task_vars: if key in task_vars:
return task_vars[key] return task_vars[key]
@ -628,7 +641,8 @@ class Connection(ansible.plugins.connection.ConnectionBase):
# must use __contains__ to avoid a TypeError for a missing host on # must use __contains__ to avoid a TypeError for a missing host on
# Ansible 2.3. # Ansible 2.3.
if self.host_vars is None or inventory_name not in self.host_vars: via_vars = self.get_host_vars(inventory_name)
if via_vars is None:
raise ansible.errors.AnsibleConnectionFailure( raise ansible.errors.AnsibleConnectionFailure(
self.unknown_via_msg % ( self.unknown_via_msg % (
via_spec, via_spec,
@ -636,7 +650,6 @@ class Connection(ansible.plugins.connection.ConnectionBase):
) )
) )
via_vars = self.host_vars[inventory_name]
return ansible_mitogen.transport_config.MitogenViaSpec( return ansible_mitogen.transport_config.MitogenViaSpec(
inventory_name=inventory_name, inventory_name=inventory_name,
play_context=self._play_context, play_context=self._play_context,
@ -712,7 +725,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
connection=self, connection=self,
play_context=self._play_context, play_context=self._play_context,
transport=self.transport, transport=self.transport,
inventory_name=self.inventory_hostname, inventory_name=self.get_task_var('inventory_hostname'),
) )
stack = self._stack_from_spec(spec) stack = self._stack_from_spec(spec)
return spec.inventory_name(), stack return spec.inventory_name(), stack
@ -778,18 +791,16 @@ class Connection(ansible.plugins.connection.ConnectionBase):
inventory_name, stack = self._build_stack() inventory_name, stack = self._build_stack()
worker_model = ansible_mitogen.process.get_worker_model() worker_model = ansible_mitogen.process.get_worker_model()
self.binding = worker_model.get_binding(inventory_name) self.binding = worker_model.get_binding(
mitogen.utils.cast(inventory_name)
)
self._connect_stack(stack) self._connect_stack(stack)
def _mitogen_reset(self, mode): def _put_connection(self):
""" """
Forget everything we know about the connected context. This function Forget everything we know about the connected context. This function
cannot be called _reset() since that name is used as a public API by cannot be called _reset() since that name is used as a public API by
Ansible 2.4 wait_for_connection plug-in. Ansible 2.4 wait_for_connection plug-in.
:param str mode:
Name of ContextService method to use to discard the context, either
'put' or 'reset'.
""" """
if not self.context: if not self.context:
return return
@ -798,7 +809,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
mitogen.service.call( mitogen.service.call(
call_context=self.binding.get_service_context(), call_context=self.binding.get_service_context(),
service_name='ansible_mitogen.services.ContextService', service_name='ansible_mitogen.services.ContextService',
method_name=mode, method_name='put',
context=self.context context=self.context
) )
@ -813,24 +824,11 @@ class Connection(ansible.plugins.connection.ConnectionBase):
gracefully shut down, and wait for shutdown to complete. Safe to call gracefully shut down, and wait for shutdown to complete. Safe to call
multiple times. multiple times.
""" """
self._mitogen_reset(mode='put') self._put_connection()
if self.binding: if self.binding:
self.binding.close() self.binding.close()
self.binding = None self.binding = None
def _reset_find_task_vars(self):
"""
Monsterous hack: since "meta: reset_connection" does not run from an
action, we cannot capture task variables via :meth:`on_action_run`.
Instead walk the parent frames searching for the `all_vars` local from
StrategyBase._execute_meta(). If this fails, just leave task_vars
unset, likely causing a subtly wrong configuration to be selected.
"""
frame = sys._getframe()
while frame and not self._task_vars:
self._task_vars = frame.f_locals.get('all_vars')
frame = frame.f_back
reset_compat_msg = ( reset_compat_msg = (
'Mitogen only supports "reset_connection" on Ansible 2.5.6 or later' 'Mitogen only supports "reset_connection" on Ansible 2.5.6 or later'
) )
@ -842,9 +840,6 @@ class Connection(ansible.plugins.connection.ConnectionBase):
the 'disconnected' state, and informs ContextService the connection is the 'disconnected' state, and informs ContextService the connection is
bad somehow, and should be shut down and discarded. bad somehow, and should be shut down and discarded.
""" """
if self._task_vars is None:
self._reset_find_task_vars()
if self._play_context.remote_addr is None: if self._play_context.remote_addr is None:
# <2.5.6 incorrectly populate PlayContext for reset_connection # <2.5.6 incorrectly populate PlayContext for reset_connection
# https://github.com/ansible/ansible/issues/27520 # https://github.com/ansible/ansible/issues/27520
@ -852,10 +847,24 @@ class Connection(ansible.plugins.connection.ConnectionBase):
self.reset_compat_msg self.reset_compat_msg
) )
self._connect() # Clear out state in case we were ever connected.
self._mitogen_reset(mode='reset') self.close()
self.binding.close()
self.binding = None inventory_name, stack = self._build_stack()
if self._play_context.become:
stack = stack[:-1]
worker_model = ansible_mitogen.process.get_worker_model()
binding = worker_model.get_binding(inventory_name)
try:
mitogen.service.call(
call_context=binding.get_service_context(),
service_name='ansible_mitogen.services.ContextService',
method_name='reset',
stack=mitogen.utils.cast(list(stack)),
)
finally:
binding.close()
# Compatibility with Ansible 2.4 wait_for_connection plug-in. # Compatibility with Ansible 2.4 wait_for_connection plug-in.
_reset = reset _reset = reset

@ -55,3 +55,8 @@ except ImportError: # Ansible <2.4
from ansible.plugins import module_utils_loader from ansible.plugins import module_utils_loader
from ansible.plugins import shell_loader from ansible.plugins import shell_loader
from ansible.plugins import strategy_loader from ansible.plugins import strategy_loader
# These are original, unwrapped implementations
action_loader__get = action_loader.get
connection_loader__get = connection_loader.get

@ -107,8 +107,9 @@ def setup():
l_mitogen = logging.getLogger('mitogen') l_mitogen = logging.getLogger('mitogen')
l_mitogen_io = logging.getLogger('mitogen.io') l_mitogen_io = logging.getLogger('mitogen.io')
l_ansible_mitogen = logging.getLogger('ansible_mitogen') l_ansible_mitogen = logging.getLogger('ansible_mitogen')
l_operon = logging.getLogger('operon')
for logger in l_mitogen, l_mitogen_io, l_ansible_mitogen: for logger in l_mitogen, l_mitogen_io, l_ansible_mitogen, l_operon:
logger.handlers = [Handler(display.vvv)] logger.handlers = [Handler(display.vvv)]
logger.propagate = False logger.propagate = False

@ -55,6 +55,11 @@ import ansible_mitogen.planner
import ansible_mitogen.target import ansible_mitogen.target
from ansible.module_utils._text import to_text from ansible.module_utils._text import to_text
try:
from ansible.utils.unsafe_proxy import wrap_var
except ImportError:
from ansible.vars.unsafe_proxy import wrap_var
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -306,7 +311,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
except AttributeError: except AttributeError:
return getattr(self._task, 'async') return getattr(self._task, 'async')
def _temp_file_gibberish(self, module_args, wrap_async): def _set_temp_file_args(self, module_args, wrap_async):
# Ansible>2.5 module_utils reuses the action's temporary directory if # Ansible>2.5 module_utils reuses the action's temporary directory if
# one exists. Older versions error if this key is present. # one exists. Older versions error if this key is present.
if ansible.__version__ > '2.5': if ansible.__version__ > '2.5':
@ -343,7 +348,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
self._update_module_args(module_name, module_args, task_vars) self._update_module_args(module_name, module_args, task_vars)
env = {} env = {}
self._compute_environment_string(env) self._compute_environment_string(env)
self._temp_file_gibberish(module_args, wrap_async) self._set_temp_file_args(module_args, wrap_async)
self._connection._connect() self._connection._connect()
result = ansible_mitogen.planner.invoke( result = ansible_mitogen.planner.invoke(
@ -365,7 +370,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
# on _execute_module(). # on _execute_module().
self._remove_tmp_path(tmp) self._remove_tmp_path(tmp)
return result return wrap_var(result)
def _postprocess_response(self, result): def _postprocess_response(self, result):
""" """

@ -45,6 +45,7 @@ import random
from ansible.executor import module_common from ansible.executor import module_common
import ansible.errors import ansible.errors
import ansible.module_utils import ansible.module_utils
import ansible.release
import mitogen.core import mitogen.core
import mitogen.select import mitogen.select
@ -58,6 +59,8 @@ NO_METHOD_MSG = 'Mitogen: no invocation method found for: '
NO_INTERPRETER_MSG = 'module (%s) is missing interpreter line' NO_INTERPRETER_MSG = 'module (%s) is missing interpreter line'
NO_MODULE_MSG = 'The module %s was not found in configured module paths.' NO_MODULE_MSG = 'The module %s was not found in configured module paths.'
_planner_by_path = {}
class Invocation(object): class Invocation(object):
""" """
@ -92,7 +95,12 @@ class Invocation(object):
self.module_path = None self.module_path = None
#: Initially ``None``, but set by :func:`invoke`. The raw source or #: Initially ``None``, but set by :func:`invoke`. The raw source or
#: binary contents of the module. #: binary contents of the module.
self.module_source = None self._module_source = None
def get_module_source(self):
if self._module_source is None:
self._module_source = read_file(self.module_path)
return self._module_source
def __repr__(self): def __repr__(self):
return 'Invocation(module_name=%s)' % (self.module_name,) return 'Invocation(module_name=%s)' % (self.module_name,)
@ -107,7 +115,8 @@ class Planner(object):
def __init__(self, invocation): def __init__(self, invocation):
self._inv = invocation self._inv = invocation
def detect(self): @classmethod
def detect(cls, path, source):
""" """
Return true if the supplied `invocation` matches the module type Return true if the supplied `invocation` matches the module type
implemented by this planner. implemented by this planner.
@ -171,8 +180,9 @@ class BinaryPlanner(Planner):
""" """
runner_name = 'BinaryRunner' runner_name = 'BinaryRunner'
def detect(self): @classmethod
return module_common._is_binary(self._inv.module_source) def detect(cls, path, source):
return module_common._is_binary(source)
def get_push_files(self): def get_push_files(self):
return [mitogen.core.to_text(self._inv.module_path)] return [mitogen.core.to_text(self._inv.module_path)]
@ -218,7 +228,7 @@ class ScriptPlanner(BinaryPlanner):
def _get_interpreter(self): def _get_interpreter(self):
path, arg = ansible_mitogen.parsing.parse_hashbang( path, arg = ansible_mitogen.parsing.parse_hashbang(
self._inv.module_source self._inv.get_module_source()
) )
if path is None: if path is None:
raise ansible.errors.AnsibleError(NO_INTERPRETER_MSG % ( raise ansible.errors.AnsibleError(NO_INTERPRETER_MSG % (
@ -247,8 +257,9 @@ class JsonArgsPlanner(ScriptPlanner):
""" """
runner_name = 'JsonArgsRunner' runner_name = 'JsonArgsRunner'
def detect(self): @classmethod
return module_common.REPLACER_JSONARGS in self._inv.module_source def detect(cls, path, source):
return module_common.REPLACER_JSONARGS in source
class WantJsonPlanner(ScriptPlanner): class WantJsonPlanner(ScriptPlanner):
@ -265,8 +276,9 @@ class WantJsonPlanner(ScriptPlanner):
""" """
runner_name = 'WantJsonRunner' runner_name = 'WantJsonRunner'
def detect(self): @classmethod
return b'WANT_JSON' in self._inv.module_source def detect(cls, path, source):
return b'WANT_JSON' in source
class NewStylePlanner(ScriptPlanner): class NewStylePlanner(ScriptPlanner):
@ -278,8 +290,9 @@ class NewStylePlanner(ScriptPlanner):
runner_name = 'NewStyleRunner' runner_name = 'NewStyleRunner'
marker = b'from ansible.module_utils.' marker = b'from ansible.module_utils.'
def detect(self): @classmethod
return self.marker in self._inv.module_source def detect(cls, path, source):
return cls.marker in source
def _get_interpreter(self): def _get_interpreter(self):
return None, None return None, None
@ -323,7 +336,6 @@ class NewStylePlanner(ScriptPlanner):
for path in ansible_mitogen.loaders.module_utils_loader._get_paths( for path in ansible_mitogen.loaders.module_utils_loader._get_paths(
subdirs=False subdirs=False
) )
if os.path.isdir(path)
) )
_module_map = None _module_map = None
@ -347,6 +359,10 @@ class NewStylePlanner(ScriptPlanner):
def get_kwargs(self): def get_kwargs(self):
return super(NewStylePlanner, self).get_kwargs( return super(NewStylePlanner, self).get_kwargs(
module_map=self.get_module_map(), module_map=self.get_module_map(),
py_module_name=py_modname_from_path(
self._inv.module_name,
self._inv.module_path,
),
) )
@ -376,14 +392,16 @@ class ReplacerPlanner(NewStylePlanner):
""" """
runner_name = 'ReplacerRunner' runner_name = 'ReplacerRunner'
def detect(self): @classmethod
return module_common.REPLACER in self._inv.module_source def detect(cls, path, source):
return module_common.REPLACER in source
class OldStylePlanner(ScriptPlanner): class OldStylePlanner(ScriptPlanner):
runner_name = 'OldStyleRunner' runner_name = 'OldStyleRunner'
def detect(self): @classmethod
def detect(cls, path, source):
# Everything else. # Everything else.
return True return True
@ -398,14 +416,54 @@ _planners = [
] ]
def get_module_data(name): try:
path = ansible_mitogen.loaders.module_loader.find_plugin(name, '') _get_ansible_module_fqn = module_common._get_ansible_module_fqn
if path is None: except AttributeError:
raise ansible.errors.AnsibleError(NO_MODULE_MSG % (name,)) _get_ansible_module_fqn = None
def py_modname_from_path(name, path):
"""
Fetch the logical name of a new-style module as it might appear in
:data:`sys.modules` of the target's Python interpreter.
* For Ansible <2.7, this is an unpackaged module named like
"ansible_module_%s".
* For Ansible <2.9, this is an unpackaged module named like
"ansible.modules.%s"
* Since Ansible 2.9, modules appearing within a package have the original
package hierarchy approximated on the target, enabling relative imports
to function correctly. For example, "ansible.modules.system.setup".
"""
# 2.9+
if _get_ansible_module_fqn:
try:
return _get_ansible_module_fqn(path)
except ValueError:
pass
with open(path, 'rb') as fp: if ansible.__version__ < '2.7':
source = fp.read() return 'ansible_module_' + name
return mitogen.core.to_text(path), source
return 'ansible.modules.' + name
def read_file(path):
fd = os.open(path, os.O_RDONLY)
try:
bits = []
chunk = True
while True:
chunk = os.read(fd, 65536)
if not chunk:
break
bits.append(chunk)
finally:
os.close(fd)
return mitogen.core.b('').join(bits)
def _propagate_deps(invocation, planner, context): def _propagate_deps(invocation, planner, context):
@ -466,14 +524,12 @@ def _invoke_isolated_task(invocation, planner):
context.shutdown() context.shutdown()
def _get_planner(invocation): def _get_planner(name, path, source):
for klass in _planners: for klass in _planners:
planner = klass(invocation) if klass.detect(path, source):
if planner.detect(): LOG.debug('%r accepted %r (filename %r)', klass, name, path)
LOG.debug('%r accepted %r (filename %r)', planner, return klass
invocation.module_name, invocation.module_path) LOG.debug('%r rejected %r', klass, name)
return planner
LOG.debug('%r rejected %r', planner, invocation.module_name)
raise ansible.errors.AnsibleError(NO_METHOD_MSG + repr(invocation)) raise ansible.errors.AnsibleError(NO_METHOD_MSG + repr(invocation))
@ -488,10 +544,24 @@ def invoke(invocation):
:raises ansible.errors.AnsibleError: :raises ansible.errors.AnsibleError:
Unrecognized/unsupported module type. Unrecognized/unsupported module type.
""" """
(invocation.module_path, path = ansible_mitogen.loaders.module_loader.find_plugin(
invocation.module_source) = get_module_data(invocation.module_name) invocation.module_name,
planner = _get_planner(invocation) '',
)
if path is None:
raise ansible.errors.AnsibleError(NO_MODULE_MSG % (
invocation.module_name,
))
invocation.module_path = mitogen.core.to_text(path)
if invocation.module_path not in _planner_by_path:
_planner_by_path[invocation.module_path] = _get_planner(
invocation.module_name,
invocation.module_path,
invocation.get_module_source()
)
planner = _planner_by_path[invocation.module_path](invocation)
if invocation.wrap_async: if invocation.wrap_async:
response = _invoke_async_task(invocation, planner) response = _invoke_async_task(invocation, planner)
elif planner.should_fork(): elif planner.should_fork():

@ -31,11 +31,6 @@ from __future__ import absolute_import
import os.path import os.path
import sys import sys
try:
from ansible.plugins.connection import kubectl
except ImportError:
kubectl = None
from ansible.errors import AnsibleConnectionFailure from ansible.errors import AnsibleConnectionFailure
from ansible.module_utils.six import iteritems from ansible.module_utils.six import iteritems
@ -47,6 +42,19 @@ except ImportError:
del base_dir del base_dir
import ansible_mitogen.connection import ansible_mitogen.connection
import ansible_mitogen.loaders
_class = ansible_mitogen.loaders.connection_loader__get(
'kubectl',
class_only=True,
)
if _class:
kubectl = sys.modules[_class.__module__]
del _class
else:
kubectl = None
class Connection(ansible_mitogen.connection.Connection): class Connection(ansible_mitogen.connection.Connection):

@ -42,21 +42,23 @@ DOCUMENTATION = """
options: options:
""" """
import ansible.plugins.connection.ssh
try: try:
import ansible_mitogen.connection import ansible_mitogen
except ImportError: except ImportError:
base_dir = os.path.dirname(__file__) base_dir = os.path.dirname(__file__)
sys.path.insert(0, os.path.abspath(os.path.join(base_dir, '../../..'))) sys.path.insert(0, os.path.abspath(os.path.join(base_dir, '../../..')))
del base_dir del base_dir
import ansible_mitogen.connection import ansible_mitogen.connection
import ansible_mitogen.loaders
class Connection(ansible_mitogen.connection.Connection): class Connection(ansible_mitogen.connection.Connection):
transport = 'ssh' transport = 'ssh'
vanilla_class = ansible.plugins.connection.ssh.Connection vanilla_class = ansible_mitogen.loaders.connection_loader__get(
'ssh',
class_only=True,
)
@staticmethod @staticmethod
def _create_control_path(*args, **kwargs): def _create_control_path(*args, **kwargs):

@ -803,9 +803,10 @@ class NewStyleRunner(ScriptRunner):
#: path => new-style module bytecode. #: path => new-style module bytecode.
_code_by_path = {} _code_by_path = {}
def __init__(self, module_map, **kwargs): def __init__(self, module_map, py_module_name, **kwargs):
super(NewStyleRunner, self).__init__(**kwargs) super(NewStyleRunner, self).__init__(**kwargs)
self.module_map = module_map self.module_map = module_map
self.py_module_name = py_module_name
def _setup_imports(self): def _setup_imports(self):
""" """
@ -942,9 +943,22 @@ class NewStyleRunner(ScriptRunner):
self._handle_magic_exception(mod, sys.exc_info()[1]) self._handle_magic_exception(mod, sys.exc_info()[1])
raise raise
def _get_module_package(self):
"""
Since Ansible 2.9 __package__ must be set in accordance with an
approximation of the original package hierarchy, so that relative
imports function correctly.
"""
pkg, sep, modname = str_rpartition(self.py_module_name, '.')
if not sep:
return None
if mitogen.core.PY3:
return pkg
return pkg.encode()
def _run(self): def _run(self):
mod = types.ModuleType(self.main_module_name) mod = types.ModuleType(self.main_module_name)
mod.__package__ = None mod.__package__ = self._get_module_package()
# Some Ansible modules use __file__ to find the Ansiballz temporary # Some Ansible modules use __file__ to find the Ansiballz temporary
# directory. We must provide some temporary path in __file__, but we # directory. We must provide some temporary path in __file__, but we
# don't want to pointlessly write the module to disk when it never # don't want to pointlessly write the module to disk when it never

@ -156,20 +156,41 @@ class ContextService(mitogen.service.Service):
@mitogen.service.expose(mitogen.service.AllowParents()) @mitogen.service.expose(mitogen.service.AllowParents())
@mitogen.service.arg_spec({ @mitogen.service.arg_spec({
'context': mitogen.core.Context 'stack': list,
}) })
def reset(self, context): def reset(self, stack):
""" """
Return a reference, forcing close and discard of the underlying Return a reference, forcing close and discard of the underlying
connection. Used for 'meta: reset_connection' or when some other error connection. Used for 'meta: reset_connection' or when some other error
is detected. is detected.
:returns:
:data:`True` if a connection was found to discard, otherwise
:data:`False`.
""" """
LOG.debug('%r.reset(%r)', self, context) LOG.debug('%r.reset(%r)', self, stack)
self._lock.acquire()
try: l = mitogen.core.Latch()
context = None
with self._lock:
for i, spec in enumerate(stack):
key = key_from_dict(via=context, **spec)
response = self._response_by_key.get(key)
if response is None:
LOG.debug('%r: could not find connection to shut down; '
'failed at hop %d', self, i)
return False
context = response['context']
mitogen.core.listen(context, 'disconnect', l.put)
self._shutdown_unlocked(context) self._shutdown_unlocked(context)
finally:
self._lock.release() # The timeout below is to turn a hang into a crash in case there is any
# possible race between 'disconnect' signal subscription, and the child
# abruptly disconnecting.
l.get(timeout=30.0)
return True
@mitogen.service.expose(mitogen.service.AllowParents()) @mitogen.service.expose(mitogen.service.AllowParents())
@mitogen.service.arg_spec({ @mitogen.service.arg_spec({
@ -326,6 +347,7 @@ class ContextService(mitogen.service.Service):
) )
def _send_module_forwards(self, context): def _send_module_forwards(self, context):
if hasattr(self.router.responder, 'forward_modules'):
self.router.responder.forward_modules(context, self.ALWAYS_PRELOAD) self.router.responder.forward_modules(context, self.ALWAYS_PRELOAD)
_candidate_temp_dirs = None _candidate_temp_dirs = None

@ -27,6 +27,7 @@
# POSSIBILITY OF SUCH DAMAGE. # POSSIBILITY OF SUCH DAMAGE.
from __future__ import absolute_import from __future__ import absolute_import
import distutils.version
import os import os
import signal import signal
import threading import threading
@ -52,8 +53,8 @@ except ImportError:
Sentinel = None Sentinel = None
ANSIBLE_VERSION_MIN = '2.3' ANSIBLE_VERSION_MIN = (2, 3)
ANSIBLE_VERSION_MAX = '2.8' ANSIBLE_VERSION_MAX = (2, 9)
NEW_VERSION_MSG = ( NEW_VERSION_MSG = (
"Your Ansible version (%s) is too recent. The most recent version\n" "Your Ansible version (%s) is too recent. The most recent version\n"
"supported by Mitogen for Ansible is %s.x. Please check the Mitogen\n" "supported by Mitogen for Ansible is %s.x. Please check the Mitogen\n"
@ -76,13 +77,15 @@ def _assert_supported_release():
an unsupported Ansible release. an unsupported Ansible release.
""" """
v = ansible.__version__ v = ansible.__version__
if not isinstance(v, tuple):
v = tuple(distutils.version.LooseVersion(v).version)
if v[:len(ANSIBLE_VERSION_MIN)] < ANSIBLE_VERSION_MIN: if v[:2] < ANSIBLE_VERSION_MIN:
raise ansible.errors.AnsibleError( raise ansible.errors.AnsibleError(
OLD_VERSION_MSG % (v, ANSIBLE_VERSION_MIN) OLD_VERSION_MSG % (v, ANSIBLE_VERSION_MIN)
) )
if v[:len(ANSIBLE_VERSION_MAX)] > ANSIBLE_VERSION_MAX: if v[:2] > ANSIBLE_VERSION_MAX:
raise ansible.errors.AnsibleError( raise ansible.errors.AnsibleError(
NEW_VERSION_MSG % (ansible.__version__, ANSIBLE_VERSION_MAX) NEW_VERSION_MSG % (ansible.__version__, ANSIBLE_VERSION_MAX)
) )
@ -132,7 +135,7 @@ def wrap_action_loader__get(name, *args, **kwargs):
if ansible.__version__ >= '2.8': if ansible.__version__ >= '2.8':
get_kwargs['collection_list'] = kwargs.pop('collection_list', None) get_kwargs['collection_list'] = kwargs.pop('collection_list', None)
klass = action_loader__get(name, **get_kwargs) klass = ansible_mitogen.loaders.action_loader__get(name, **get_kwargs)
if klass: if klass:
bases = (ansible_mitogen.mixins.ActionModuleMixin, klass) bases = (ansible_mitogen.mixins.ActionModuleMixin, klass)
adorned_klass = type(str(name), bases, {}) adorned_klass = type(str(name), bases, {})
@ -141,15 +144,29 @@ def wrap_action_loader__get(name, *args, **kwargs):
return adorned_klass(*args, **kwargs) return adorned_klass(*args, **kwargs)
REDIRECTED_CONNECTION_PLUGINS = (
'buildah',
'docker',
'kubectl',
'jail',
'local',
'lxc',
'lxd',
'machinectl',
'setns',
'ssh',
)
def wrap_connection_loader__get(name, *args, **kwargs): def wrap_connection_loader__get(name, *args, **kwargs):
""" """
While a Mitogen strategy is active, rewrite connection_loader.get() calls While a Mitogen strategy is active, rewrite connection_loader.get() calls
for some transports into requests for a compatible Mitogen transport. for some transports into requests for a compatible Mitogen transport.
""" """
if name in ('buildah', 'docker', 'kubectl', 'jail', 'local', if name in REDIRECTED_CONNECTION_PLUGINS:
'lxc', 'lxd', 'machinectl', 'setns', 'ssh'):
name = 'mitogen_' + name name = 'mitogen_' + name
return connection_loader__get(name, *args, **kwargs)
return ansible_mitogen.loaders.connection_loader__get(name, *args, **kwargs)
def wrap_worker__run(self): def wrap_worker__run(self):
@ -199,12 +216,7 @@ class AnsibleWrappers(object):
Install our PluginLoader monkey patches and update global variables Install our PluginLoader monkey patches and update global variables
with references to the real functions. with references to the real functions.
""" """
global action_loader__get
action_loader__get = ansible_mitogen.loaders.action_loader.get
ansible_mitogen.loaders.action_loader.get = wrap_action_loader__get ansible_mitogen.loaders.action_loader.get = wrap_action_loader__get
global connection_loader__get
connection_loader__get = ansible_mitogen.loaders.connection_loader.get
ansible_mitogen.loaders.connection_loader.get = wrap_connection_loader__get ansible_mitogen.loaders.connection_loader.get = wrap_connection_loader__get
global worker__run global worker__run
@ -215,8 +227,12 @@ class AnsibleWrappers(object):
""" """
Uninstall the PluginLoader monkey patches. Uninstall the PluginLoader monkey patches.
""" """
ansible_mitogen.loaders.action_loader.get = action_loader__get ansible_mitogen.loaders.action_loader.get = (
ansible_mitogen.loaders.connection_loader.get = connection_loader__get ansible_mitogen.loaders.action_loader__get
)
ansible_mitogen.loaders.connection_loader.get = (
ansible_mitogen.loaders.connection_loader__get
)
ansible.executor.process.worker.WorkerProcess.run = worker__run ansible.executor.process.worker.WorkerProcess.run = worker__run
def install(self): def install(self):

@ -7,6 +7,7 @@
{# Alabaster ships a completely useless custom.css, suppress it. #} {# Alabaster ships a completely useless custom.css, suppress it. #}
{%- block extrahead %} {%- block extrahead %}
<meta name="referrer" content="strict-origin">
<meta name="google-site-verification" content="oq5hNxRYo25tcfjfs3l6pPxfNgY3JzDYSpskc9q4TYI" /> <meta name="google-site-verification" content="oq5hNxRYo25tcfjfs3l6pPxfNgY3JzDYSpskc9q4TYI" />
<meta name="viewport" content="width=device-width, initial-scale=0.9, maximum-scale=0.9" /> <meta name="viewport" content="width=device-width, initial-scale=0.9, maximum-scale=0.9" />
{% endblock %} {% endblock %}

@ -85,10 +85,15 @@ Installation
Get notified of new releases and important fixes. Get notified of new releases and important fixes.
<p> <p>
<input type="email" placeholder="E-mail Address" name="email" style="font-size: 105%;"> <input type="email" placeholder="E-mail Address" name="email" style="font-size: 105%;"><br>
<input name="captcha_1" placeholder="Captcha" style="width: 10ex;">
<img class="captcha-image">
<a class="captcha-refresh" href="#">&#x21bb</a>
<button type="submit" style="font-size: 105%;"> <button type="submit" style="font-size: 105%;">
Subscribe Subscribe
</button> </button>
</p> </p>
<div id="emailthanks" style="display:none"> <div id="emailthanks" style="display:none">
@ -1380,6 +1385,7 @@ bandwidth and 1.8x less time**.
page_id: "operon", page_id: "operon",
urls: { urls: {
save_email: "https://networkgenomics.com/save-email/", save_email: "https://networkgenomics.com/save-email/",
save_email_captcha: "https://networkgenomics.com/save-email/captcha/",
} }
} }
}; };

@ -15,8 +15,8 @@ Release Notes
</style> </style>
v0.2.9 (unreleased) v0.2.10 (unreleased)
------------------- --------------------
To avail of fixes in an unreleased version, please download a ZIP file To avail of fixes in an unreleased version, please download a ZIP file
`directly from GitHub <https://github.com/dw/mitogen/>`_. `directly from GitHub <https://github.com/dw/mitogen/>`_.
@ -24,6 +24,23 @@ To avail of fixes in an unreleased version, please download a ZIP file
*(no changes)* *(no changes)*
v0.2.9 (2019-11-02)
-------------------
This release contains minimal fixes beyond those required for Ansible 2.9.
* :gh:issue:`633`: :ans:mod:`meta: reset_connection <meta>` could fail to reset
a connection when ``become: true`` was set on the playbook.
Thanks!
~~~~~~~
Mitogen would not be possible without the support of users. A huge thanks for
bug reports, testing, features and fixes in this release contributed by
`Can Ozokur <https://github.com/canozokur/>`_.
v0.2.8 (2019-08-18) v0.2.8 (2019-08-18)
------------------- -------------------
@ -35,13 +52,12 @@ Enhancements
~~~~~~~~~~~~ ~~~~~~~~~~~~
* :gh:issue:`556`, * :gh:issue:`556`,
:gh:issue:`587`: Ansible 2.8 is supported. `Become plugins :gh:issue:`587`: Ansible 2.8 is supported.
<https://docs.ansible.com/ansible/latest/plugins/become.html>`_ and `Become plugins <https://docs.ansible.com/ansible/latest/plugins/become.html>`_ (:gh:issue:`631`) and
`interpreter discovery `interpreter discovery <https://docs.ansible.com/ansible/latest/reference_appendices/interpreter_discovery.html>`_ (:gh:issue:`630`)
<https://docs.ansible.com/ansible/latest/reference_appendices/interpreter_discovery.html>`_
are not yet handled. are not yet handled.
* :gh:issue:`419`, :gh:issue:`470`, file descriptor usage is approximately * :gh:issue:`419`, :gh:issue:`470`: file descriptor usage is approximately
halved, as it is no longer necessary to separately manage read and write halved, as it is no longer necessary to separately manage read and write
sides to work around a design problem. sides to work around a design problem.
@ -58,9 +74,10 @@ Enhancements
is exposed to Ansible as the :ans:conn:`buildah`. is exposed to Ansible as the :ans:conn:`buildah`.
* :gh:issue:`615`: a modified :ans:mod:`fetch` implements streaming transfer * :gh:issue:`615`: a modified :ans:mod:`fetch` implements streaming transfer
even when ``become`` is active, avoiding excess CPU usage and memory spikes, even when ``become`` is active, avoiding excess CPU and memory spikes, and
and improving performance. A copy of two 512 MiB files drops from 47 seconds improving performance. A representative copy of two 512 MiB files drops from
to 7 seconds, with peak memory usage dropping from 10.7 GiB to 64.8 MiB. 55.7 seconds to 6.3 seconds, with peak memory usage dropping from 10.7 GiB to
64.8 MiB. [#i615]_
* `Operon <https://networkgenomics.com/operon/>`_ no longer requires a custom * `Operon <https://networkgenomics.com/operon/>`_ no longer requires a custom
library installation, both Ansible and Operon are supported by a single library installation, both Ansible and Operon are supported by a single
@ -80,15 +97,14 @@ Mitogen for Ansible
~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~
* :gh:issue:`363`: fix an obscure race matching *Permission denied* errors from * :gh:issue:`363`: fix an obscure race matching *Permission denied* errors from
some versions of ``su`` running on heavily loaded machines. some versions of :linux:man1:`su` running on heavily loaded machines.
* :gh:issue:`410`: Uses of :linux:man7:`unix` sockets are replaced with * :gh:issue:`410`: Uses of :linux:man7:`unix` sockets are replaced with
traditional :linux:man7:`pipe` pairs when SELinux is detected, to work around traditional :linux:man7:`pipe` pairs when SELinux is detected, to work around
a broken heuristic in common SELinux policies that prevents inheriting a broken heuristic in common SELinux policies that prevents inheriting
:linux:man7:`unix` sockets across privilege domains. :linux:man7:`unix` sockets across privilege domains.
* `#467 <httpe://github.com/dw/mitogen/issues/467>`_: an incompatibility * :gh:issue:`467`: an incompatibility running Mitogen under `Molecule
running Mitogen under `Molecule
<https://molecule.readthedocs.io/en/stable/>`_ was resolved. <https://molecule.readthedocs.io/en/stable/>`_ was resolved.
* :gh:issue:`547`, :gh:issue:`598`: fix a deadlock during initialization of * :gh:issue:`547`, :gh:issue:`598`: fix a deadlock during initialization of
@ -130,7 +146,7 @@ Mitogen for Ansible
encoding. encoding.
* :gh:issue:`602`: connection configuration is more accurately inferred for * :gh:issue:`602`: connection configuration is more accurately inferred for
:ans:mod:`meta: reset_connection <meta>` the :ans:mod:`synchronize`, and for :ans:mod:`meta: reset_connection <meta>`, the :ans:mod:`synchronize`, and for
any action plug-ins that establish additional connections. any action plug-ins that establish additional connections.
* :gh:issue:`598`, :gh:issue:`605`: fix a deadlock managing a shared counter * :gh:issue:`598`, :gh:issue:`605`: fix a deadlock managing a shared counter
@ -138,15 +154,15 @@ Mitogen for Ansible
* :gh:issue:`615`: streaming is implemented for the :ans:mod:`fetch` and other * :gh:issue:`615`: streaming is implemented for the :ans:mod:`fetch` and other
actions that transfer files from targets to the controller. Previously files actions that transfer files from targets to the controller. Previously files
delivered were sent in one message, requiring them to fit in RAM and be were sent in one message, requiring them to fit in RAM and be smaller than an
smaller than an internal message size sanity check. Transfers from controller internal message size sanity check. Transfers from controller to targets have
to targets have been streaming since 0.2.0. been streaming since 0.2.0.
* :gh:commit:`7ae926b3`: the :ans:mod:`lineinfile` leaks writable temporary * :gh:commit:`7ae926b3`: the :ans:mod:`lineinfile` leaked writable temporary
file descriptors since Ansible 2.7.0. When :ans:mod:`~lineinfile` created or file descriptors between Ansible 2.7.0 and 2.8.2. When :ans:mod:`~lineinfile`
modified a script, and that script was later executed, the execution could created or modified a script, and that script was later executed, the
fail with "*text file busy*". Temporary descriptors are now tracked and execution could fail with "*text file busy*". Temporary descriptors are now
cleaned up on exit for all modules. tracked and cleaned up on exit for all modules.
Core Library Core Library
@ -256,7 +272,7 @@ Core Library
unidirectional routing, where contexts may optionally only communicate with unidirectional routing, where contexts may optionally only communicate with
parents and never siblings (so that air-gapped networks cannot be parents and never siblings (so that air-gapped networks cannot be
unintentionally bridged) was not inherited when a child was initiated unintentionally bridged) was not inherited when a child was initiated
directly from an another child. This did not effect Ansible, since the directly from another child. This did not effect Ansible, since the
controller initiates any new child used for routing, only forked tasks are controller initiates any new child used for routing, only forked tasks are
initiated by children. initiated by children.
@ -266,7 +282,7 @@ Thanks!
Mitogen would not be possible without the support of users. A huge thanks for Mitogen would not be possible without the support of users. A huge thanks for
bug reports, testing, features and fixes in this release contributed by bug reports, testing, features and fixes in this release contributed by
`Andreas Hubert <https://github.com/peshay>`_. `Andreas Hubert <https://github.com/peshay>`_,
`Anton Markelov <https://github.com/strangeman>`_, `Anton Markelov <https://github.com/strangeman>`_,
`Dan <https://github.com/dsgnr>`_, `Dan <https://github.com/dsgnr>`_,
`Dave Cottlehuber <https://github.com/dch>`_, `Dave Cottlehuber <https://github.com/dch>`_,
@ -296,6 +312,13 @@ bug reports, testing, features and fixes in this release contributed by
`@tho86 <https://github.com/tho86>`_. `@tho86 <https://github.com/tho86>`_.
.. rubric:: Footnotes
.. [#i615] Peak RSS of controller and target as measured with ``/usr/bin/time
-v ansible-playbook -c local`` using the reproduction supplied in
:gh:issue:`615`.
v0.2.7 (2019-05-19) v0.2.7 (2019-05-19)
------------------- -------------------

@ -334,6 +334,14 @@ These signals are used internally by Mitogen.
- Fired when :class:`mitogen.parent.Reaper` detects subprocess has fully - Fired when :class:`mitogen.parent.Reaper` detects subprocess has fully
exitted. exitted.
* - :py:class:`mitogen.core.Broker`
- ``shutdown``
- Fired after Broker.shutdown() is called, but before ``shutdown`` event
fires. This can be used to trigger any behaviour that relies on the
process remaining intact, as processing of ``shutdown`` races with any
parent sending the child a signal because it is not shutting down in
reasonable time.
* - :py:class:`mitogen.core.Broker` * - :py:class:`mitogen.core.Broker`
- ``shutdown`` - ``shutdown``
- Fired after Broker.shutdown() is called. - Fired after Broker.shutdown() is called.

@ -35,7 +35,7 @@ be expected. On the slave, it is built dynamically during startup.
#: Library version as a tuple. #: Library version as a tuple.
__version__ = (0, 2, 8) __version__ = (0, 2, 9)
#: This is :data:`False` in slave contexts. Previously it was used to prevent #: This is :data:`False` in slave contexts. Previously it was used to prevent

@ -503,7 +503,7 @@ def set_cloexec(fd):
:func:`mitogen.fork.on_fork`. :func:`mitogen.fork.on_fork`.
""" """
flags = fcntl.fcntl(fd, fcntl.F_GETFD) flags = fcntl.fcntl(fd, fcntl.F_GETFD)
assert fd > 2 assert fd > 2, 'fd %r <= 2' % (fd,)
fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC) fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
@ -808,7 +808,7 @@ class Message(object):
self.src_id = mitogen.context_id self.src_id = mitogen.context_id
self.auth_id = mitogen.context_id self.auth_id = mitogen.context_id
vars(self).update(kwargs) vars(self).update(kwargs)
assert isinstance(self.data, BytesType) assert isinstance(self.data, BytesType), 'Message data is not Bytes'
def pack(self): def pack(self):
return ( return (
@ -1834,7 +1834,8 @@ class DelimitedProtocol(Protocol):
if cont: if cont:
self.on_partial_line_received(self._trailer) self.on_partial_line_received(self._trailer)
else: else:
assert stream.protocol is not self assert stream.protocol is not self, \
'stream protocol is no longer %r' % (self,)
stream.protocol.on_receive(broker, self._trailer) stream.protocol.on_receive(broker, self._trailer)
def on_line_received(self, line): def on_line_received(self, line):
@ -2046,6 +2047,10 @@ class MitogenProtocol(Protocol):
#: :data:`mitogen.parent_ids`. #: :data:`mitogen.parent_ids`.
is_privileged = False is_privileged = False
#: Invoked as `on_message(stream, msg)` each message received from the
#: peer.
on_message = None
def __init__(self, router, remote_id, auth_id=None, def __init__(self, router, remote_id, auth_id=None,
local_id=None, parent_ids=None): local_id=None, parent_ids=None):
self._router = router self._router = router
@ -2245,12 +2250,12 @@ class Context(object):
return receiver return receiver
def call_service_async(self, service_name, method_name, **kwargs): def call_service_async(self, service_name, method_name, **kwargs):
_v and LOG.debug('calling service %s.%s of %r, args: %r',
service_name, method_name, self, kwargs)
if isinstance(service_name, BytesType): if isinstance(service_name, BytesType):
service_name = service_name.encode('utf-8') service_name = service_name.encode('utf-8')
elif not isinstance(service_name, UnicodeType): elif not isinstance(service_name, UnicodeType):
service_name = service_name.name() # Service.name() service_name = service_name.name() # Service.name()
_v and LOG.debug('calling service %s.%s of %r, args: %r',
service_name, method_name, self, kwargs)
tup = (service_name, to_text(method_name), Kwargs(kwargs)) tup = (service_name, to_text(method_name), Kwargs(kwargs))
msg = Message.pickled(tup, handle=CALL_SERVICE) msg = Message.pickled(tup, handle=CALL_SERVICE)
return self.send_async(msg) return self.send_async(msg)
@ -2575,6 +2580,7 @@ class Latch(object):
return self._cls_idle_socketpairs.pop() # pop() must be atomic return self._cls_idle_socketpairs.pop() # pop() must be atomic
except IndexError: except IndexError:
rsock, wsock = socket.socketpair() rsock, wsock = socket.socketpair()
rsock.setblocking(False)
set_cloexec(rsock.fileno()) set_cloexec(rsock.fileno())
set_cloexec(wsock.fileno()) set_cloexec(wsock.fileno())
self._cls_all_sockets.extend((rsock, wsock)) self._cls_all_sockets.extend((rsock, wsock))
@ -2649,9 +2655,8 @@ class Latch(object):
) )
e = None e = None
woken = None
try: try:
woken = list(poller.poll(timeout)) list(poller.poll(timeout))
except Exception: except Exception:
e = sys.exc_info()[1] e = sys.exc_info()[1]
@ -2659,11 +2664,19 @@ class Latch(object):
try: try:
i = self._sleeping.index((wsock, cookie)) i = self._sleeping.index((wsock, cookie))
del self._sleeping[i] del self._sleeping[i]
if not woken:
raise e or TimeoutError()
try:
got_cookie = rsock.recv(self.COOKIE_SIZE) got_cookie = rsock.recv(self.COOKIE_SIZE)
except socket.error:
e2 = sys.exc_info()[1]
if e2.args[0] == errno.EAGAIN:
e = TimeoutError()
else:
e = e2
self._cls_idle_socketpairs.append((rsock, wsock)) self._cls_idle_socketpairs.append((rsock, wsock))
if e:
raise e
assert cookie == got_cookie, ( assert cookie == got_cookie, (
"Cookie incorrect; got %r, expected %r" \ "Cookie incorrect; got %r, expected %r" \
@ -2744,8 +2757,7 @@ class Waker(Protocol):
def __init__(self, broker): def __init__(self, broker):
self._broker = broker self._broker = broker
self._lock = threading.Lock() self._deferred = collections.deque()
self._deferred = []
def __repr__(self): def __repr__(self):
return 'Waker(fd=%r/%r)' % ( return 'Waker(fd=%r/%r)' % (
@ -2758,11 +2770,7 @@ class Waker(Protocol):
""" """
Prevent immediate Broker shutdown while deferred functions remain. Prevent immediate Broker shutdown while deferred functions remain.
""" """
self._lock.acquire()
try:
return len(self._deferred) return len(self._deferred)
finally:
self._lock.release()
def on_receive(self, broker, buf): def on_receive(self, broker, buf):
""" """
@ -2771,14 +2779,12 @@ class Waker(Protocol):
ensure only one byte needs to be pending regardless of queue length. ensure only one byte needs to be pending regardless of queue length.
""" """
_vv and IOLOG.debug('%r.on_receive()', self) _vv and IOLOG.debug('%r.on_receive()', self)
self._lock.acquire() while True:
try: try:
deferred = self._deferred func, args, kwargs = self._deferred.popleft()
self._deferred = [] except IndexError:
finally: return
self._lock.release()
for func, args, kwargs in deferred:
try: try:
func(*args, **kwargs) func(*args, **kwargs)
except Exception: except Exception:
@ -2795,7 +2801,7 @@ class Waker(Protocol):
self.stream.transmit_side.write(b(' ')) self.stream.transmit_side.write(b(' '))
except OSError: except OSError:
e = sys.exc_info()[1] e = sys.exc_info()[1]
if e.args[0] != errno.EBADF: if e.args[0] in (errno.EBADF, errno.EWOULDBLOCK):
raise raise
broker_shutdown_msg = ( broker_shutdown_msg = (
@ -2821,14 +2827,7 @@ class Waker(Protocol):
_vv and IOLOG.debug('%r.defer() [fd=%r]', self, _vv and IOLOG.debug('%r.defer() [fd=%r]', self,
self.stream.transmit_side.fd) self.stream.transmit_side.fd)
self._lock.acquire()
try:
should_wake = not self._deferred
self._deferred.append((func, args, kwargs)) self._deferred.append((func, args, kwargs))
finally:
self._lock.release()
if should_wake:
self._wake() self._wake()
@ -3299,6 +3298,8 @@ class Router(object):
# the parent. # the parent.
if in_stream.protocol.auth_id is not None: if in_stream.protocol.auth_id is not None:
msg.auth_id = in_stream.protocol.auth_id msg.auth_id = in_stream.protocol.auth_id
if in_stream.protocol.on_message is not None:
in_stream.protocol.on_message(in_stream, msg)
# Record the IDs the source ever communicated with. # Record the IDs the source ever communicated with.
in_stream.protocol.egress_ids.add(msg.dst_id) in_stream.protocol.egress_ids.add(msg.dst_id)
@ -3548,6 +3549,7 @@ class Broker(object):
while self._alive: while self._alive:
self._loop_once() self._loop_once()
fire(self, 'before_shutdown')
fire(self, 'shutdown') fire(self, 'shutdown')
self._broker_shutdown() self._broker_shutdown()
except Exception: except Exception:
@ -3625,7 +3627,13 @@ class Dispatcher(object):
policy=has_parent_authority, policy=has_parent_authority,
) )
self._service_recv.notify = self._on_call_service self._service_recv.notify = self._on_call_service
listen(econtext.broker, 'shutdown', self.recv.close) listen(econtext.broker, 'shutdown', self._on_broker_shutdown)
def _on_broker_shutdown(self):
if self._service_recv.notify == self._on_call_service:
self._service_recv.notify = None
self.recv.close()
@classmethod @classmethod
@takes_econtext @takes_econtext
@ -3987,4 +3995,3 @@ class ExternalContext(object):
raise raise
finally: finally:
self.broker.shutdown() self.broker.shutdown()
self.broker.join()

@ -79,16 +79,8 @@ def reset_logging_framework():
logging._lock = threading.RLock() logging._lock = threading.RLock()
# The root logger does not appear in the loggerDict. # The root logger does not appear in the loggerDict.
for name in [None] + list(logging.Logger.manager.loggerDict): logging.Logger.manager.loggerDict = {}
for handler in logging.getLogger(name).handlers: logging.getLogger().handlers = []
handler.createLock()
root = logging.getLogger()
root.handlers = [
handler
for handler in root.handlers
if not isinstance(handler, mitogen.core.LogHandler)
]
def on_fork(): def on_fork():
@ -245,6 +237,8 @@ class Connection(mitogen.parent.Connection):
if childfp.fileno() not in (0, 1, 100): if childfp.fileno() not in (0, 1, 100):
childfp.close() childfp.close()
mitogen.core.IOLOG.setLevel(logging.INFO)
try: try:
try: try:
mitogen.core.ExternalContext(self.get_econtext_config()).main() mitogen.core.ExternalContext(self.get_econtext_config()).main()

@ -1680,6 +1680,7 @@ class Connection(object):
try: try:
self.proc = self.start_child() self.proc = self.start_child()
except Exception: except Exception:
LOG.debug('failed to start child', exc_info=True)
self._fail_connection(sys.exc_info()[1]) self._fail_connection(sys.exc_info()[1])
return return
@ -2230,7 +2231,7 @@ class RouteMonitor(object):
target_name = target_name.decode() target_name = target_name.decode()
target_id = int(target_id_s) target_id = int(target_id_s)
self.router.context_by_id(target_id).name = target_name self.router.context_by_id(target_id).name = target_name
stream = self.router.stream_by_id(msg.auth_id) stream = self.router.stream_by_id(msg.src_id)
current = self.router.stream_by_id(target_id) current = self.router.stream_by_id(target_id)
if current and current.protocol.remote_id != mitogen.parent_id: if current and current.protocol.remote_id != mitogen.parent_id:
self._log.error('Cannot add duplicate route to %r via %r, ' self._log.error('Cannot add duplicate route to %r via %r, '
@ -2258,7 +2259,7 @@ class RouteMonitor(object):
if registered_stream is None: if registered_stream is None:
return return
stream = self.router.stream_by_id(msg.auth_id) stream = self.router.stream_by_id(msg.src_id)
if registered_stream != stream: if registered_stream != stream:
self._log.error('received DEL_ROUTE for %d from %r, expected %r', self._log.error('received DEL_ROUTE for %d from %r, expected %r',
target_id, stream, registered_stream) target_id, stream, registered_stream)

@ -324,13 +324,13 @@ class Select(object):
if not self._receivers: if not self._receivers:
raise Error(self.empty_msg) raise Error(self.empty_msg)
event = Event()
while True: while True:
recv = self._latch.get(timeout=timeout, block=block) recv = self._latch.get(timeout=timeout, block=block)
try: try:
if isinstance(recv, Select): if isinstance(recv, Select):
event = recv.get_event(block=False) event = recv.get_event(block=False)
else: else:
event = Event()
event.source = recv event.source = recv
event.data = recv.get(block=False) event.data = recv.get(block=False)
if self._oneshot: if self._oneshot:

@ -1,3 +1,4 @@
lib/modules/custom_binary_producing_junk lib/modules/custom_binary_producing_junk
lib/modules/custom_binary_producing_json lib/modules/custom_binary_producing_json
hosts/*.local hosts/*.local
gcloud

@ -2,6 +2,7 @@
inventory = hosts inventory = hosts
gathering = explicit gathering = explicit
strategy_plugins = ../../ansible_mitogen/plugins/strategy strategy_plugins = ../../ansible_mitogen/plugins/strategy
inventory_plugins = lib/inventory
action_plugins = lib/action action_plugins = lib/action
callback_plugins = lib/callback callback_plugins = lib/callback
stdout_callback = nice_stdout stdout_callback = nice_stdout

@ -1,2 +0,0 @@
terraform.tfstate*
.terraform

@ -1,3 +0,0 @@
default:
terraform fmt

@ -1,6 +0,0 @@
# Command line.
````
time LANG=C LC_ALL=C ANSIBLE_STRATEGY=mitogen MITOGEN_GCLOUD_GROUP=debops_all_hosts debops common
```

@ -1,8 +0,0 @@
[defaults]
strategy_plugins = ../../../ansible_mitogen/plugins/strategy
strategy = mitogen
inventory = hosts
retry_files_enabled = False
host_key_checking = False
callback_plugins = ../lib/callback
stdout_callback = nice_stdout

@ -1,159 +0,0 @@
- hosts: all
become: true
tasks:
- apt: name={{item}} state=installed
with_items:
- openvpn
- tcpdump
- python-pip
- python-virtualenv
- strace
- libldap2-dev
- linux-perf
- libsasl2-dev
- build-essential
- git
- rsync
- file:
path: /etc/openvpn
state: directory
- copy:
dest: /etc/openvpn/secret
mode: '0600'
content: |
-----BEGIN OpenVPN Static key V1-----
f94005e4206828e281eb397aefd69b37
ebe6cd39057d5641c5d8dd539cd07651
557d94d0077852bd8f92b68bef927169
c5f0e42ac962a2cbbed35e107ffa0e71
1a2607c6bcd919ec5846917b20eb6684
c7505152815d6ed7b4420714777a3d4a
8edb27ca81971cba7a1e88fe3936e13b
85e9be6706a30cd1334836ed0f08e899
78942329a330392dff42e4570731ac24
9330358aaa6828c07ecb41fb9c498a89
1e0435c5a45bfed390cd2104073634ef
b00f9fae1d3c49ef5de51854103edac9
5ff39c9dfc66ae270510b2ffa74d87d2
9d4b3844b1e1473237bc6dc78fb03e2e
643ce58e667a532efceec7177367fb37
a16379a51e0a8c8e3ec00a59952b79d4
-----END OpenVPN Static key V1-----
- copy:
dest: /etc/openvpn/k3.conf
content: |
remote k3.botanicus.net
dev tun
ifconfig 10.18.0.1 10.18.0.2
secret secret
- shell: systemctl enable openvpn@k3.service
- shell: systemctl start openvpn@k3.service
- lineinfile:
line: "{{item}}"
path: /etc/sysctl.conf
register: sysctl_conf
with_items:
- "net.ipv4.ip_forward=1"
- "kernel.perf_event_paranoid=-1"
- shell: /sbin/sysctl -p
when: sysctl_conf.changed
- copy:
dest: /etc/rc.local
mode: "0744"
content: |
#!/bin/bash
iptables -t nat -F;
iptables -t nat -X;
iptables -t nat -A POSTROUTING -j MASQUERADE;
- shell: systemctl daemon-reload
- shell: systemctl enable rc-local
- shell: systemctl start rc-local
- hosts: all
vars:
git_username: '{{ lookup("pipe", "git config --global user.name") }}'
git_email: '{{ lookup("pipe", "git config --global user.email") }}'
tasks:
- copy:
src: ~/.ssh/id_gitlab
dest: ~/.ssh/id_gitlab
mode: 0600
- template:
dest: ~/.ssh/config
src: ssh_config.j2
- shell: "rsync -a ~/.ssh {{inventory_hostname}}:"
connection: local
- shell: |
git config --global user.email "{{git_username}}"
git config --global user.name "{{git_email}}"
name: set_git_config
- git:
dest: ~/mitogen
repo: https://github.com/dw/mitogen.git
version: dmw
- git:
dest: ~/ansible
repo: https://github.com/ansible/ansible.git
#version: dmw
- pip:
virtualenv: ~/venv
requirements: ~/mitogen/dev_requirements.txt
- pip:
virtualenv: ~/venv
editable: true
name: ~/mitogen
- pip:
virtualenv: ~/venv
editable: true
name: ~/ansible
- pip:
virtualenv: ~/venv
name: debops
- lineinfile:
line: "source $HOME/venv/bin/activate"
path: ~/.profile
- name: debops-init
shell: ~/venv/bin/debops-init ~/prj
args:
creates: ~/prj
- name: grpvars
copy:
dest: "{{ansible_user_dir}}/prj/ansible/inventory/group_vars/all/dhparam.yml"
content: |
---
dhparam__bits: [ '256' ]
- blockinfile:
path: ~/prj/.debops.cfg
insertafter: '\[ansible defaults\]'
block: |
strategy_plugins = {{ansible_user_dir}}/mitogen/ansible_mitogen/plugins/strategy
forks = 50
host_key_checking = False
- file:
path: ~/prj/ansible/inventory/gcloud.py
state: link
src: ~/mitogen/tests/ansible/lib/inventory/gcloud.py

@ -1,2 +0,0 @@
[controller]
c

@ -1,149 +0,0 @@
variable "node-count" {
default = 0
}
variable "preemptible" {
default = true
}
variable "big" {
default = false
}
provider "google" {
project = "mitogen-load-testing"
region = "europe-west1"
zone = "europe-west1-d"
}
resource "google_compute_instance" "controller" {
name = "ansible-controller"
machine_type = "${var.big ? "n1-highcpu-32" : "custom-1-1024"}"
allow_stopping_for_update = true
can_ip_forward = true
boot_disk {
initialize_params {
image = "debian-cloud/debian-9"
}
}
scheduling {
preemptible = true
automatic_restart = false
}
network_interface {
subnetwork = "${google_compute_subnetwork.loadtest-subnet.self_link}"
access_config = {}
}
provisioner "local-exec" {
command = <<-EOF
ip=${google_compute_instance.controller.network_interface.0.access_config.0.nat_ip};
ssh-keygen -R $ip;
ssh-keyscan $ip >> ~/.ssh/known_hosts;
sed -ri -e "s/.*CONTROLLER_IP_HERE.*/ Hostname $ip/" ~/.ssh/config;
ansible-playbook -i $ip, controller.yml
EOF
}
}
resource "google_compute_network" "loadtest" {
name = "loadtest"
auto_create_subnetworks = false
}
resource "google_compute_subnetwork" "loadtest-subnet" {
name = "loadtest-subnet"
ip_cidr_range = "10.19.0.0/16"
network = "${google_compute_network.loadtest.id}"
}
resource "google_compute_firewall" "allow-all-in" {
name = "allow-all-in"
network = "${google_compute_network.loadtest.name}"
direction = "INGRESS"
allow {
protocol = "all"
}
}
resource "google_compute_firewall" "allow-all-out" {
name = "allow-all-out"
network = "${google_compute_network.loadtest.name}"
direction = "EGRESS"
allow {
protocol = "all"
}
}
resource "google_compute_route" "route-nodes-via-controller" {
name = "route-nodes-via-controller"
dest_range = "0.0.0.0/0"
network = "${google_compute_network.loadtest.name}"
next_hop_instance = "${google_compute_instance.controller.self_link}"
next_hop_instance_zone = "${google_compute_instance.controller.zone}"
priority = 800
tags = ["node"]
}
resource "google_compute_instance_template" "node" {
name = "node"
tags = ["node"]
machine_type = "custom-1-1024"
scheduling {
preemptible = "${var.preemptible}"
automatic_restart = false
}
disk {
source_image = "debian-cloud/debian-9"
auto_delete = true
boot = true
}
network_interface {
subnetwork = "${google_compute_subnetwork.loadtest-subnet.self_link}"
}
}
#
# Compute Engine tops out at 1000 VMs per group
#
resource "google_compute_instance_group_manager" "nodes-a" {
name = "nodes-a"
base_instance_name = "node"
instance_template = "${google_compute_instance_template.node.self_link}"
target_size = "${var.node-count / 4}"
}
resource "google_compute_instance_group_manager" "nodes-b" {
name = "nodes-b"
base_instance_name = "node"
instance_template = "${google_compute_instance_template.node.self_link}"
target_size = "${var.node-count / 4}"
}
resource "google_compute_instance_group_manager" "nodes-c" {
name = "nodes-c"
base_instance_name = "node"
instance_template = "${google_compute_instance_template.node.self_link}"
target_size = "${var.node-count / 4}"
}
resource "google_compute_instance_group_manager" "nodes-d" {
name = "nodes-d"
base_instance_name = "node"
instance_template = "${google_compute_instance_template.node.self_link}"
target_size = "${var.node-count / 4}"
}

@ -1 +0,0 @@
google-api-python-client==1.6.5

@ -1,19 +0,0 @@
[defaults]
inventory = hosts,~/mitogen/tests/ansible/lib/inventory
gathering = explicit
strategy_plugins = ~/mitogen/ansible_mitogen/plugins/strategy
action_plugins = ~/mitogen/tests/ansible/lib/action
callback_plugins = ~/mitogen/tests/ansible/lib/callback
stdout_callback = nice_stdout
vars_plugins = ~/mitogen/tests/ansible/lib/vars
library = ~/mitogen/tests/ansible/lib/modules
retry_files_enabled = False
forks = 50
strategy = mitogen_linear
host_key_checking = False
[ssh_connection]
ssh_args = -o ForwardAgent=yes -o ControlMaster=auto -o ControlPersist=60s
pipelining = True

@ -1,6 +0,0 @@
Host localhost-*
Hostname localhost
Host gitlab.com
IdentityFile ~/.ssh/id_gitlab

@ -8,3 +8,4 @@
- include: put_large_file.yml - include: put_large_file.yml
- include: put_small_file.yml - include: put_small_file.yml
- include: reset.yml - include: reset.yml
- include: reset_become.yml

@ -0,0 +1,48 @@
# issue #633: Connection.reset() should ignore "become", and apply to the login
# account.
- hosts: test-targets
become: true
gather_facts: false
tasks:
- debug: msg="reset_become.yml skipped on Ansible<2.5.6"
when: ansible_version.full < '2.5.6'
- meta: end_play
when: ansible_version.full < '2.5.6'
- name: save pid of the become acct
custom_python_detect_environment:
register: become_acct
- name: save pid of the login acct
become: false
custom_python_detect_environment:
register: login_acct
- name: ensure login != become
assert:
that:
- become_acct.pid != login_acct.pid
- name: reset the connection
meta: reset_connection
- name: save new pid of the become acct
custom_python_detect_environment:
register: new_become_acct
- name: ensure become_acct != new_become_acct
assert:
that:
- become_acct.pid != new_become_acct.pid
- name: save new pid of login acct
become: false
custom_python_detect_environment:
register: new_login_acct
- name: ensure login_acct != new_login_acct
assert:
that:
- login_acct.pid != new_login_acct.pid

@ -1,6 +1,6 @@
- name: integration/runner/missing_module.yml - name: integration/runner/missing_module.yml
hosts: test-targets hosts: test-targets[0]
connection: local connection: local
tasks: tasks:
- connection: local - connection: local

@ -1,6 +1,7 @@
from __future__ import unicode_literals from __future__ import unicode_literals
import os
import io import io
import os
import sys
from ansible import constants as C from ansible import constants as C
from ansible.module_utils import six from ansible.module_utils import six
@ -15,6 +16,27 @@ try:
except KeyError: except KeyError:
pprint = None pprint = None
DefaultModule = callback_loader.get('default', class_only=True)
DOCUMENTATION = '''
callback: nice_stdout
type: stdout
options:
check_mode_markers:
name: Show markers when running in check mode
description:
- "Toggle to control displaying markers when running in check mode. The markers are C(DRY RUN)
at the beggining and ending of playbook execution (when calling C(ansible-playbook --check))
and C(CHECK MODE) as a suffix at every play and task that is run in check mode."
type: bool
default: no
version_added: 2.9
env:
- name: ANSIBLE_CHECK_MODE_MARKERS
ini:
- key: check_mode_markers
section: defaults
'''
def printi(tio, obj, key=None, indent=0): def printi(tio, obj, key=None, indent=0):
def write(s, *args): def write(s, *args):
@ -51,8 +73,6 @@ def printi(tio, obj, key=None, indent=0):
write('%r', obj) write('%r', obj)
DefaultModule = callback_loader.get('default', class_only=True)
class CallbackModule(DefaultModule): class CallbackModule(DefaultModule):
def _dump_results(self, result, *args, **kwargs): def _dump_results(self, result, *args, **kwargs):
try: try:

@ -1,49 +0,0 @@
#!/usr/bin/env python
import json
import os
import sys
if (not os.environ.get('MITOGEN_GCLOUD_GROUP')) or any('--host' in s for s in sys.argv):
sys.stdout.write('{}')
sys.exit(0)
import googleapiclient.discovery
def main():
project = 'mitogen-load-testing'
zone = 'europe-west1-d'
prefix = 'node-'
client = googleapiclient.discovery.build('compute', 'v1')
resp = client.instances().list(project=project, zone=zone).execute()
ips = []
for inst in resp['items']:
if inst['status'] == 'RUNNING' and inst['name'].startswith(prefix):
ips.extend(
#bytes(config['natIP'])
bytes(interface['networkIP'])
for interface in inst['networkInterfaces']
#for config in interface['accessConfigs']
)
sys.stderr.write('Addresses: %s\n' % (ips,))
gname = os.environ['MITOGEN_GCLOUD_GROUP']
groups = {
gname: {
'hosts': ips
}
}
for i in 1, 10, 20, 50, 100:
groups['%s-%s' % (gname, i)] = {
'hosts': ips[:i]
}
sys.stdout.write(json.dumps(groups, indent=4))
if __name__ == '__main__':
main()

@ -6,6 +6,8 @@
any_errors_fatal: true any_errors_fatal: true
gather_facts: true gather_facts: true
tasks: tasks:
- meta: end_play
when: ansible_version.full < '2.6'
# Copy the naughty 'ansible' into place. # Copy the naughty 'ansible' into place.
- copy: - copy:
@ -13,7 +15,7 @@
src: ansible.py src: ansible.py
# Restart the connection. # Restart the connection.
- mitogen_shutdown_all: - meta: reset_connection
- custom_python_detect_environment: - custom_python_detect_environment:
register: env register: env

@ -46,7 +46,13 @@ class ConnectionMixin(MuxProcessMixin):
def make_connection(self): def make_connection(self):
play_context = ansible.playbook.play_context.PlayContext() play_context = ansible.playbook.play_context.PlayContext()
return self.klass(play_context, new_stdin=False) conn = self.klass(play_context, new_stdin=False)
conn.on_action_run(
task_vars={},
delegate_to_hostname=None,
loader_basedir=None,
)
return conn
def wait_for_completion(self): def wait_for_completion(self):
# put_data() is asynchronous, must wait for operation to happen. Do # put_data() is asynchronous, must wait for operation to happen. Do

@ -1 +0,0 @@
*.tar.bz2 filter=lfs diff=lfs merge=lfs -text

@ -1,3 +0,0 @@
version https://git-lfs.github.com/spec/v1
oid sha256:123ddbd9055745d37e8f14bf1c8352541ff4d500e6daa4aa3165e604fb7e8b6a
size 6176131

@ -193,7 +193,7 @@ class CrashTest(testlib.BrokerMixin, testlib.TestCase):
sem = mitogen.core.Latch() sem = mitogen.core.Latch()
router.add_handler(sem.put) router.add_handler(sem.put)
log = testlib.LogCapturer('mitogen') log = testlib.LogCapturer()
log.start() log.start()
# Force a crash and ensure it wakes up. # Force a crash and ensure it wakes up.

@ -54,6 +54,15 @@ if faulthandler is not None:
faulthandler.enable() faulthandler.enable()
#
# Temporary hack: Operon changed logging somewhat, and this broke LogCapturer /
# log_handler_test.
#
mitogen.core.LOG.propagate = True
def get_fd_count(): def get_fd_count():
""" """
Return the number of FDs open by this process. Return the number of FDs open by this process.

Loading…
Cancel
Save