Merge branch 'master' into eye-of-the-token-its-the-thrill-of-the-light

pull/178/head
Alex Willmer 6 years ago
commit dc3f5730a2

8
.gitignore vendored

@ -1,7 +1,13 @@
.coverage
.tox
.venv
**/.DS_Store
*.pyc
*.pyd
*.pyo
MANIFEST
build/
dist/
docs/_build
htmlcov/
*.egg-info
__pycache__/

@ -4,15 +4,22 @@ notifications:
email: false
language: python
cache: pip
python:
- "2.7"
env:
- MODE=mitogen
- MODE=debops
- MODE=ansible ANSIBLE_VERSION=2.4.3.0
- MODE=ansible ANSIBLE_VERSION=2.5.0
install:
- pip install -r dev_requirements.txt
script:
- PYTHONPATH=. ${TRAVIS_BUILD_DIR}/test.sh
- ${TRAVIS_BUILD_DIR}/.travis/${MODE}_tests.sh
services:
- docker

@ -0,0 +1,45 @@
#!/bin/bash -ex
# Run tests/ansible/integration/all.yml under Ansible and Ansible-Mitogen
TRAVIS_BUILD_DIR="${TRAVIS_BUILD_DIR:-`pwd`}"
TMPDIR="/tmp/ansible-tests-$$"
ANSIBLE_VERSION="${ANSIBLE_VERSION:-2.4.3.0}"
function on_exit()
{
rm -rf "$TMPDIR"
docker kill target || true
}
trap on_exit EXIT
mkdir "$TMPDIR"
echo travis_fold:start:docker_setup
docker run --rm --detach --name=target d2mw/mitogen-test /bin/sleep 86400
echo travis_fold:end:docker_setup
echo travis_fold:start:job_setup
pip install -U ansible=="${ANSIBLE_VERSION}"
cd ${TRAVIS_BUILD_DIR}/tests/ansible
cat >> ${TMPDIR}/hosts <<-EOF
localhost
target ansible_connection=docker ansible_python_interpreter=/usr/bin/python2.7
EOF
echo travis_fold:end:job_setup
echo travis_fold:start:mitogen_linear
ANSIBLE_STRATEGY=mitogen_linear /usr/bin/time ansible-playbook \
integration/all.yml \
-i "${TMPDIR}/hosts"
echo travis_fold:end:mitogen_linear
echo travis_fold:start:vanilla_ansible
/usr/bin/time ansible-playbook \
integration/all.yml \
-i "${TMPDIR}/hosts"
echo travis_fold:end:vanilla_ansible

@ -0,0 +1,77 @@
#!/bin/bash -ex
# Run some invocations of DebOps.
TMPDIR="/tmp/debops-$$"
TRAVIS_BUILD_DIR="${TRAVIS_BUILD_DIR:-`pwd`}"
TARGET_COUNT="${TARGET_COUNT:-4}"
function on_exit()
{
echo travis_fold:start:cleanup
[ "$KEEP" ] || {
rm -rf "$TMPDIR" || true
for i in $(seq $TARGET_COUNT)
do
docker kill target$i || true
done
}
echo travis_fold:end:cleanup
}
trap on_exit EXIT
mkdir "$TMPDIR"
echo travis_fold:start:job_setup
pip install -qqqU debops==0.7.2 ansible==2.4.3.0
debops-init "$TMPDIR/project"
cd "$TMPDIR/project"
cat > .debops.cfg <<-EOF
[ansible defaults]
strategy_plugins = ${TRAVIS_BUILD_DIR}/ansible_mitogen/plugins/strategy
strategy = mitogen_linear
EOF
cat > ansible/inventory/group_vars/debops_all_hosts.yml <<-EOF
ansible_python_interpreter: /usr/bin/python2.7
ansible_user: has-sudo-pubkey
ansible_become_pass: y
ansible_ssh_private_key_file: ${TRAVIS_BUILD_DIR}/tests/data/docker/has-sudo-pubkey.key
# Speed up slow DH generation.
dhparam__bits: ["128", "64"]
EOF
DOCKER_HOSTNAME="$(python ${TRAVIS_BUILD_DIR}/tests/show_docker_hostname.py)"
for i in $(seq $TARGET_COUNT)
do
port=$((2200 + $i))
docker run \
--rm \
--detach \
--publish 0.0.0.0:$port:22/tcp \
--name=target$i \
d2mw/mitogen-test
echo \
target$i \
ansible_host=$DOCKER_HOSTNAME \
ansible_port=$port \
>> ansible/inventory/hosts
done
echo travis_fold:end:job_setup
echo travis_fold:start:first_run
/usr/bin/time debops common
echo travis_fold:end:first_run
echo travis_fold:start:second_run
/usr/bin/time debops common
echo travis_fold:end:second_run

@ -0,0 +1,4 @@
#!/bin/bash -ex
# Run the Mitogen tests.
MITOGEN_LOG_LEVEL=debug PYTHONPATH=. ${TRAVIS_BUILD_DIR}/run_tests

@ -37,9 +37,9 @@ import ansible.errors
import ansible.plugins.connection
import mitogen.unix
from mitogen.utils import cast
import mitogen.utils
import ansible_mitogen.helpers
import ansible_mitogen.target
import ansible_mitogen.process
from ansible_mitogen.services import ContextService
@ -82,6 +82,9 @@ class Connection(ansible.plugins.connection.ConnectionBase):
#: Set to 'mitogen_ssh_discriminator' by on_action_run()
mitogen_ssh_discriminator = None
#: Set after connection to the target context's home directory.
_homedir = None
def __init__(self, play_context, new_stdin, original_transport, **kwargs):
assert ansible_mitogen.process.MuxProcess.unix_listener_path, (
'The "mitogen" connection plug-in may only be instantiated '
@ -125,63 +128,72 @@ class Connection(ansible.plugins.connection.ConnectionBase):
'sudo'
)
@property
def homedir(self):
self._connect()
return self._homedir
@property
def connected(self):
return self.broker is not None
def _wrap_connect(self, args):
dct = mitogen.service.call(
context=self.parent,
handle=ContextService.handle,
obj=mitogen.utils.cast(args),
)
if dct['msg']:
raise ansible.errors.AnsibleConnectionFailure(dct['msg'])
return dct['context'], dct['home_dir']
def _connect_local(self):
"""
Fetch a reference to the local() Context from ContextService in the
master process.
"""
return mitogen.service.call(self.parent, ContextService.handle, cast({
return self._wrap_connect({
'method': 'local',
'python_path': self.python_path,
}))
})
def _connect_ssh(self):
"""
Fetch a reference to an SSH Context matching the play context from
ContextService in the master process.
"""
return mitogen.service.call(
self.parent,
ContextService.handle,
cast({
'method': 'ssh',
'check_host_keys': False, # TODO
'hostname': self._play_context.remote_addr,
'discriminator': self.mitogen_ssh_discriminator,
'username': self._play_context.remote_user,
'password': self._play_context.password,
'port': self._play_context.port,
'python_path': self.python_path,
'identity_file': self._play_context.private_key_file,
'ssh_path': self._play_context.ssh_executable,
'connect_timeout': self.ansible_ssh_timeout,
'ssh_args': [
term
for s in (
getattr(self._play_context, 'ssh_args', ''),
getattr(self._play_context, 'ssh_common_args', ''),
getattr(self._play_context, 'ssh_extra_args', '')
)
for term in shlex.split(s or '')
]
})
)
return self._wrap_connect({
'method': 'ssh',
'check_host_keys': False, # TODO
'hostname': self._play_context.remote_addr,
'discriminator': self.mitogen_ssh_discriminator,
'username': self._play_context.remote_user,
'password': self._play_context.password,
'port': self._play_context.port,
'python_path': self.python_path,
'identity_file': self._play_context.private_key_file,
'ssh_path': self._play_context.ssh_executable,
'connect_timeout': self.ansible_ssh_timeout,
'ssh_args': [
term
for s in (
getattr(self._play_context, 'ssh_args', ''),
getattr(self._play_context, 'ssh_common_args', ''),
getattr(self._play_context, 'ssh_extra_args', '')
)
for term in shlex.split(s or '')
]
})
def _connect_docker(self):
return mitogen.service.call(
self.parent,
ContextService.handle,
cast({
'method': 'docker',
'container': self._play_context.remote_addr,
'python_path': self.python_path,
'connect_timeout': self._play_context.timeout,
})
)
return self._wrap_connect({
'method': 'docker',
'container': self._play_context.remote_addr,
'python_path': self.python_path,
'connect_timeout': self._play_context.timeout,
})
def _connect_sudo(self, via=None, python_path=None):
"""
@ -192,23 +204,19 @@ class Connection(ansible.plugins.connection.ConnectionBase):
Parent Context of the sudo Context. For Ansible, this should always
be a Context returned by _connect_ssh().
"""
return mitogen.service.call(
self.parent,
ContextService.handle,
cast({
'method': 'sudo',
'username': self._play_context.become_user,
'password': self._play_context.password,
'python_path': python_path or self.python_path,
'sudo_path': self.sudo_path,
'connect_timeout': self._play_context.timeout,
'via': via,
'sudo_args': shlex.split(
self._play_context.sudo_flags or
self._play_context.become_flags or ''
),
})
)
return self._wrap_connect({
'method': 'sudo',
'username': self._play_context.become_user,
'password': self._play_context.become_pass,
'python_path': python_path or self.python_path,
'sudo_path': self.sudo_path,
'connect_timeout': self._play_context.timeout,
'via': via,
'sudo_args': shlex.split(
self._play_context.sudo_flags or
self._play_context.become_flags or ''
),
})
def _connect(self):
"""
@ -232,21 +240,31 @@ class Connection(ansible.plugins.connection.ConnectionBase):
if self.original_transport == 'local':
if self._play_context.become:
self.context = self._connect_sudo(python_path=sys.executable)
self.context, self._homedir = self._connect_sudo(
python_path=sys.executable
)
else:
self.context = self._connect_local()
self.context, self._homedir = self._connect_local()
return
if self.original_transport == 'docker':
self.host = self._connect_docker()
self.host, self._homedir = self._connect_docker()
elif self.original_transport == 'ssh':
self.host = self._connect_ssh()
self.host, self._homedir = self._connect_ssh()
if self._play_context.become:
self.context = self._connect_sudo(via=self.host)
self.context, self._homedir = self._connect_sudo(via=self.host)
else:
self.context = self.host
def get_context_name(self):
"""
Return the name of the target context we issue commands against, i.e. a
unique string useful as a key for related data, such as a list of
modules uploaded to the target.
"""
return self.context.name
def close(self):
"""
Arrange for the mitogen.master.Router running in the worker to
@ -285,10 +303,10 @@ class Connection(ansible.plugins.connection.ConnectionBase):
LOG.debug('Call %s%r took %d ms', func.func_name, args,
1000 * (time.time() - t0))
def exec_command(self, cmd, in_data='', sudoable=True):
def exec_command(self, cmd, in_data='', sudoable=True, mitogen_chdir=None):
"""
Implement exec_command() by calling the corresponding
ansible_mitogen.helpers function in the target.
ansible_mitogen.target function in the target.
:param str cmd:
Shell command to execute.
@ -297,44 +315,57 @@ class Connection(ansible.plugins.connection.ConnectionBase):
:returns:
(return code, stdout bytes, stderr bytes)
"""
return self.call(ansible_mitogen.helpers.exec_command,
cast(cmd), cast(in_data))
emulate_tty = (not in_data and sudoable)
rc, stdout, stderr = self.call(
ansible_mitogen.target.exec_command,
cmd=mitogen.utils.cast(cmd),
in_data=mitogen.utils.cast(in_data),
chdir=mitogen_chdir,
emulate_tty=emulate_tty,
)
stderr += 'Shared connection to %s closed.%s' % (
self._play_context.remote_addr,
('\r\n' if emulate_tty else '\n'),
)
return rc, stdout, stderr
def fetch_file(self, in_path, out_path):
"""
Implement fetch_file() by calling the corresponding
ansible_mitogen.helpers function in the target.
ansible_mitogen.target function in the target.
:param str in_path:
Remote filesystem path to read.
:param str out_path:
Local filesystem path to write.
"""
output = self.call(ansible_mitogen.helpers.read_path,
cast(in_path))
ansible_mitogen.helpers.write_path(out_path, output)
output = self.call(ansible_mitogen.target.read_path,
mitogen.utils.cast(in_path))
ansible_mitogen.target.write_path(out_path, output)
def put_data(self, out_path, data):
"""
Implement put_file() by caling the corresponding
ansible_mitogen.helpers function in the target.
ansible_mitogen.target function in the target.
:param str in_path:
Local filesystem path to read.
:param str out_path:
Remote filesystem path to write.
"""
self.call(ansible_mitogen.helpers.write_path,
cast(out_path), cast(data))
self.call(ansible_mitogen.target.write_path,
mitogen.utils.cast(out_path),
mitogen.utils.cast(data))
def put_file(self, in_path, out_path):
"""
Implement put_file() by caling the corresponding
ansible_mitogen.helpers function in the target.
ansible_mitogen.target function in the target.
:param str in_path:
Local filesystem path to read.
:param str out_path:
Remote filesystem path to write.
"""
self.put_data(out_path, ansible_mitogen.helpers.read_path(in_path))
self.put_data(out_path, ansible_mitogen.target.read_path(in_path))

@ -1,320 +0,0 @@
# Copyright 2017, David Wilson
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import json
import operator
import os
import pwd
import random
import re
import stat
import subprocess
import threading
import mitogen.core
# Prevent accidental import of an Ansible module from hanging on stdin read.
import ansible.module_utils.basic
ansible.module_utils.basic._ANSIBLE_ARGS = '{}'
#: Mapping of job_id<->result dict
_result_by_job_id = {}
#: Mapping of job_id<->threading.Thread
_thread_by_job_id = {}
class Exit(Exception):
"""
Raised when a module exits with success.
"""
def __init__(self, dct):
self.dct = dct
class ModuleError(Exception):
"""
Raised when a module voluntarily indicates failure via .fail_json().
"""
def __init__(self, msg, dct):
Exception.__init__(self, msg)
self.dct = dct
def monkey_exit_json(self, **kwargs):
"""
Replace AnsibleModule.exit_json() with something that doesn't try to kill
the process or JSON-encode the result dictionary. Instead, cause Exit to be
raised, with a `dct` attribute containing the successful result dictionary.
"""
self.add_path_info(kwargs)
kwargs.setdefault('changed', False)
kwargs.setdefault('invocation', {
'module_args': self.params
})
kwargs = ansible.module_utils.basic.remove_values(
kwargs,
self.no_log_values
)
self.do_cleanup_files()
raise Exit(kwargs)
def monkey_fail_json(self, **kwargs):
"""
Replace AnsibleModule.fail_json() with something that raises ModuleError,
which includes a `dct` attribute.
"""
self.add_path_info(kwargs)
kwargs.setdefault('failed', True)
kwargs.setdefault('invocation', {
'module_args': self.params
})
kwargs = ansible.module_utils.basic.remove_values(
kwargs,
self.no_log_values
)
self.do_cleanup_files()
raise ModuleError(kwargs.get('msg'), kwargs)
def module_fixups(mod):
"""
Apply fixups for known problems with mainline Ansible modules.
"""
if mod.__name__ == 'ansible.modules.packaging.os.yum_repository':
# https://github.com/dw/mitogen/issues/154
mod.YumRepo.repofile = mod.configparser.RawConfigParser()
class TemporaryEnvironment(object):
def __init__(self, env=None):
self.original = os.environ.copy()
self.env = env or {}
os.environ.update((k, str(v)) for k, v in self.env.iteritems())
def revert(self):
os.environ.clear()
os.environ.update(self.original)
def run_module(module, raw_params=None, args=None, env=None):
"""
Set up the process environment in preparation for running an Ansible
module. This monkey-patches the Ansible libraries in various places to
prevent it from trying to kill the process on completion, and to prevent it
from reading sys.stdin.
"""
if args is None:
args = {}
if raw_params is not None:
args['_raw_params'] = raw_params
ansible.module_utils.basic.AnsibleModule.exit_json = monkey_exit_json
ansible.module_utils.basic.AnsibleModule.fail_json = monkey_fail_json
ansible.module_utils.basic._ANSIBLE_ARGS = json.dumps({
'ANSIBLE_MODULE_ARGS': args
})
temp_env = TemporaryEnvironment(env)
try:
try:
mod = __import__(module, {}, {}, [''])
module_fixups(mod)
# Ansible modules begin execution on import. Thus the above __import__
# will cause either Exit or ModuleError to be raised. If we reach the
# line below, the module did not execute and must already have been
# imported for a previous invocation, so we need to invoke main
# explicitly.
mod.main()
except (Exit, ModuleError), e:
result = json.dumps(e.dct)
finally:
temp_env.revert()
return result
def _async_main(job_id, module, raw_params, args, env):
"""
Implementation for the thread that implements asynchronous module
execution.
"""
try:
rc = run_module(module, raw_params, args, env)
except Exception, e:
rc = mitogen.core.CallError(e)
_result_by_job_id[job_id] = rc
def run_module_async(module, raw_params=None, args=None):
"""
Arrange for an Ansible module to be executed in a thread of the current
process, with results available via :py:func:`get_async_result`.
"""
job_id = '%08x' % random.randint(0, 2**32-1)
_result_by_job_id[job_id] = None
_thread_by_job_id[job_id] = threading.Thread(
target=_async_main,
kwargs={
'job_id': job_id,
'module': module,
'raw_params': raw_params,
'args': args,
}
)
_thread_by_job_id[job_id].start()
return json.dumps({
'ansible_job_id': job_id,
'changed': True
})
def get_async_result(job_id):
"""
Poll for the result of an asynchronous task.
:param str job_id:
Job ID to poll for.
:returns:
``None`` if job is still running, JSON-encoded result dictionary if
execution completed normally, or :py:class:`mitogen.core.CallError` if
an exception was thrown.
"""
if not _thread_by_job_id[job_id].isAlive():
return _result_by_job_id[job_id]
def get_user_shell():
"""
For commands executed directly via an SSH command-line, SSH looks up the
user's shell via getpwuid() and only defaults to /bin/sh if that field is
missing or empty.
"""
try:
pw_shell = pwd.getpwuid(os.geteuid()).pw_shell
except KeyError:
pw_shell = None
return pw_shell or '/bin/sh'
def exec_command(cmd, in_data='', chdir=None, shell=None):
"""
Run a command in a subprocess, emulating the argument handling behaviour of
SSH.
:param bytes cmd:
String command line, passed to user's shell.
:param bytes in_data:
Optional standard input for the command.
:return:
(return code, stdout bytes, stderr bytes)
"""
assert isinstance(cmd, basestring)
proc = subprocess.Popen(
args=[get_user_shell(), '-c', cmd],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE,
cwd=chdir,
)
stdout, stderr = proc.communicate(in_data)
return proc.returncode, stdout, stderr
def read_path(path):
"""
Fetch the contents of a filesystem `path` as bytes.
"""
return open(path, 'rb').read()
def write_path(path, s):
"""
Writes bytes `s` to a filesystem `path`.
"""
open(path, 'wb').write(s)
CHMOD_CLAUSE_PAT = re.compile(r'([uoga]*)([+\-=])([ugo]|[rwx]*)')
CHMOD_MASKS = {
'u': stat.S_IRWXU,
'g': stat.S_IRWXG,
'o': stat.S_IRWXO,
'a': (stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO),
}
CHMOD_BITS = {
'u': {'r': stat.S_IRUSR, 'w': stat.S_IWUSR, 'x': stat.S_IXUSR},
'g': {'r': stat.S_IRGRP, 'w': stat.S_IWGRP, 'x': stat.S_IXGRP},
'o': {'r': stat.S_IROTH, 'w': stat.S_IWOTH, 'x': stat.S_IXOTH},
'a': {
'r': (stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH),
'w': (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH),
'x': (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
}
}
def apply_mode_spec(spec, mode):
"""
Given a symbolic file mode change specification in the style of chmod(1)
`spec`, apply changes in the specification to the numeric file mode `mode`.
"""
for clause in spec.split(','):
match = CHMOD_CLAUSE_PAT.match(clause)
who, op, perms = match.groups()
for ch in who or 'a':
mask = CHMOD_MASKS[ch]
bits = CHMOD_BITS[ch]
cur_perm_bits = mode & mask
new_perm_bits = reduce(operator.or_, (bits[p] for p in perms), 0)
mode &= ~mask
if op == '=':
mode |= new_perm_bits
elif op == '+':
mode |= new_perm_bits | cur_perm_bits
else:
mode |= cur_perm_bits & ~new_perm_bits
return mode
def set_file_mode(path, spec):
"""
Update the permissions of a file using the same syntax as chmod(1).
"""
mode = os.stat(path).st_mode
if spec.isdigit():
new_mode = int(spec, 8)
else:
new_mode = apply_mode_spec(spec, mode)
os.chmod(path, new_mode)

@ -69,13 +69,14 @@ def setup():
"""
display = find_display()
logging.getLogger('ansible_mitogen').handlers = [Handler(display.v)]
logging.getLogger('ansible_mitogen').setLevel(logging.DEBUG)
logging.getLogger('ansible_mitogen').handlers = [Handler(display.vvv)]
mitogen.core.LOG.handlers = [Handler(display.vvv)]
mitogen.core.IOLOG.handlers = [Handler(display.vvvv)]
mitogen.core.IOLOG.propagate = False
mitogen.core.LOG.handlers = [Handler(display.v)]
mitogen.core.LOG.setLevel(logging.DEBUG)
if display.verbosity > 2:
logging.getLogger('ansible_mitogen').setLevel(logging.DEBUG)
mitogen.core.LOG.setLevel(logging.DEBUG)
mitogen.core.IOLOG.handlers = [Handler(display.vvvv)]
if display.verbosity > 3:
mitogen.core.IOLOG.setLevel(logging.DEBUG)
mitogen.core.IOLOG.propagate = False

@ -39,6 +39,7 @@ from ansible.module_utils._text import to_bytes
from ansible.parsing.utils.jsonify import jsonify
import ansible
import ansible.constants
import ansible.plugins
import ansible.plugins.action
@ -52,29 +53,14 @@ import mitogen.master
from mitogen.utils import cast
import ansible_mitogen.connection
import ansible_mitogen.helpers
import ansible_mitogen.planner
import ansible_mitogen.target
from ansible.module_utils._text import to_text
LOG = logging.getLogger(__name__)
def get_command_module_name(module_name):
"""
Given the name of an Ansible command module, return its canonical module
path within the ansible.
:param module_name:
"shell"
:return:
"ansible.modules.commands.shell"
"""
path = module_loader.find_plugin(module_name, '')
relpath = os.path.relpath(path, os.path.dirname(ansible.__file__))
root, _ = os.path.splitext(relpath)
return 'ansible.' + root.replace('/', '.')
class ActionModuleMixin(ansible.plugins.action.ActionBase):
"""
The Mitogen-patched PluginLoader dynamically mixes this into every action
@ -131,7 +117,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
"""
Arrange for a Python function to be called in the target context, which
should be some function from the standard library or
ansible_mitogen.helpers module. This junction point exists mainly as a
ansible_mitogen.target module. This junction point exists mainly as a
nice place to insert print statements during debugging.
"""
return self._connection.call(func, *args, **kwargs)
@ -176,7 +162,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
target user account.
"""
LOG.debug('_remote_file_exists(%r)', path)
return self.call(os.path.exists, path)
return self.call(os.path.exists, mitogen.utils.cast(path))
def _configure_module(self, module_name, module_args, task_vars=None):
"""
@ -192,15 +178,34 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
"""
assert False, "_is_pipelining_enabled() should never be called."
def _get_remote_tmp(self):
"""
Mitogen-only: return the 'remote_tmp' setting.
"""
try:
s = self._connection._shell.get_option('remote_tmp')
except AttributeError:
s = ansible.constants.DEFAULT_REMOTE_TMP # <=2.4.x
return self._remote_expand_user(s)
def _make_tmp_path(self, remote_user=None):
"""
Replace the base implementation's use of shell to implement mkdtemp()
with an actual call to mkdtemp().
"""
LOG.debug('_make_tmp_path(remote_user=%r)', remote_user)
path = self.call(tempfile.mkdtemp, prefix='ansible-mitogen-tmp-')
# _make_tmp_path() is basically a global stashed away as Shell.tmpdir.
# The copy action plugin violates layering and grabs this attribute
# directly.
self._connection._shell.tmpdir = self.call(
ansible_mitogen.target.make_temp_directory,
base_dir=self._get_remote_tmp(),
)
LOG.debug('Temporary directory: %r', self._connection._shell.tmpdir)
self._cleanup_remote_tmp = True
return path
return self._connection._shell.tmpdir
def _remove_tmp_path(self, tmp_path):
"""
@ -208,8 +213,11 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
shutil.rmtree().
"""
LOG.debug('_remove_tmp_path(%r)', tmp_path)
if tmp_path is None:
tmp_path = self._connection._shell.tmpdir
if self._should_remove_tmp_path(tmp_path):
return self.call(shutil.rmtree, tmp_path)
self.call(shutil.rmtree, tmp_path)
self._connection._shell.tmpdir = None
def _transfer_data(self, remote_path, data):
"""
@ -247,7 +255,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
paths, mode, sudoable)
return self.fake_shell(lambda: mitogen.master.Select.all(
self._connection.call_async(
ansible_mitogen.helpers.set_file_mode, path, mode
ansible_mitogen.target.set_file_mode, path, mode
)
for path in paths
))
@ -272,48 +280,69 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
Replace the base implementation's attempt to emulate
os.path.expanduser() with an actual call to os.path.expanduser().
"""
LOG.debug('_remove_expand_user(%r, sudoable=%r)', path, sudoable)
LOG.debug('_remote_expand_user(%r, sudoable=%r)', path, sudoable)
if not path.startswith('~'):
# /home/foo -> /home/foo
return path
if path == '~':
# ~ -> /home/dmw
return self._connection.homedir
if path.startswith('~/'):
# ~/.ansible -> /home/dmw/.ansible
return os.path.join(self._connection.homedir, path[2:])
if path.startswith('~'):
path = self.call(os.path.expanduser, path)
return path
# ~root/.ansible -> /root/.ansible
return self.call(os.path.expanduser, path)
def _execute_module(self, module_name=None, module_args=None, tmp=None,
task_vars=None, persist_files=False,
delete_remote_tmp=True, wrap_async=False):
"""
Collect up a module's execution environment then use it to invoke
helpers.run_module() or helpers.run_module_async() in the target
target.run_module() or helpers.run_module_async() in the target
context.
"""
if task_vars is None:
task_vars = {}
if module_name is None:
module_name = self._task.action
if module_args is None:
module_args = self._task.args
if task_vars is None:
task_vars = {}
self._update_module_args(module_name, module_args, task_vars)
if wrap_async:
helper = ansible_mitogen.helpers.run_module_async
else:
helper = ansible_mitogen.helpers.run_module
env = {}
self._compute_environment_string(env)
js = self.call(
helper,
get_command_module_name(module_name),
args=cast(module_args),
env=cast(env),
return ansible_mitogen.planner.invoke(
ansible_mitogen.planner.Invocation(
action=self,
connection=self._connection,
module_name=mitogen.utils.cast(module_name),
module_args=mitogen.utils.cast(module_args),
remote_tmp=mitogen.utils.cast(self._get_remote_tmp()),
task_vars=task_vars,
templar=self._templar,
env=mitogen.utils.cast(env),
wrap_async=wrap_async,
)
)
data = self._parse_returned_data({
'rc': 0,
'stdout': js,
'stdout_lines': [js],
'stderr': ''
})
def _postprocess_response(self, result):
"""
Apply fixups mimicking ActionBase._execute_module(); this is copied
verbatim from action/__init__.py, the guts of _parse_returned_data are
garbage and should be removed or reimplemented once tests exist.
:param dict result:
Dictionary with format::
{
"rc": int,
"stdout": "stdout data",
"stderr": "stderr data"
}
"""
data = self._parse_returned_data(result)
# Cutpasted from the base implementation.
if 'stdout' in data and 'stdout_lines' not in data:
@ -328,8 +357,8 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
encoding_errors='surrogate_then_replace',
chdir=None):
"""
Replace the mad rat's nest of logic in the base implementation by
simply calling helpers.exec_command() in the target context.
Override the base implementation by simply calling
target.exec_command() in the target context.
"""
LOG.debug('_low_level_execute_command(%r, in_data=%r, exe=%r, dir=%r)',
cmd, type(in_data), executable, chdir)
@ -338,11 +367,11 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
if executable:
cmd = executable + ' -c ' + commands.mkarg(cmd)
rc, stdout, stderr = self.call(
ansible_mitogen.helpers.exec_command,
cast(cmd),
cast(in_data),
chdir=cast(chdir),
rc, stdout, stderr = self._connection.exec_command(
cmd=cmd,
in_data=in_data,
sudoable=sudoable,
mitogen_chdir=chdir,
)
stdout_text = to_text(stdout, errors=encoding_errors)

@ -0,0 +1,349 @@
# Copyright 2017, David Wilson
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
Classes to detect each case from [0] and prepare arguments necessary for the
corresponding Runner class within the target, including preloading requisite
files/modules known missing.
[0] "Ansible Module Architecture", developing_program_flow_modules.html
"""
from __future__ import absolute_import
import logging
import os
from ansible.executor import module_common
import ansible.errors
try:
from ansible.plugins.loader import module_loader
except ImportError: # Ansible <2.4
from ansible.plugins import module_loader
import mitogen
import mitogen.service
import ansible_mitogen.target
import ansible_mitogen.services
LOG = logging.getLogger(__name__)
NO_METHOD_MSG = 'Mitogen: no invocation method found for: '
CRASHED_MSG = 'Mitogen: internal error: '
NO_INTERPRETER_MSG = 'module (%s) is missing interpreter line'
def parse_script_interpreter(source):
"""
Extract the script interpreter and its sole argument from the module
source code.
:returns:
Tuple of `(interpreter, arg)`, where `intepreter` is the script
interpreter and `arg` is its sole argument if present, otherwise
:py:data:`None`.
"""
# Linux requires first 2 bytes with no whitespace, pretty sure it's the
# same everywhere. See binfmt_script.c.
if not source.startswith('#!'):
return None, None
# Find terminating newline. Assume last byte of binprm_buf if absent.
nl = source.find('\n', 0, 128)
if nl == -1:
nl = min(128, len(source))
# Split once on the first run of whitespace. If no whitespace exists,
# bits just contains the interpreter filename.
bits = source[2:nl].strip().split(None, 1)
if len(bits) == 1:
return bits[0], None
return bits[0], bits[1]
class Invocation(object):
"""
Collect up a module's execution environment then use it to invoke
target.run_module() or helpers.run_module_async() in the target context.
"""
def __init__(self, action, connection, module_name, module_args,
remote_tmp, task_vars, templar, env, wrap_async):
#: ActionBase instance invoking the module. Required to access some
#: output postprocessing methods that don't belong in ActionBase at
#: all.
self.action = action
#: Ansible connection to use to contact the target. Must be an
#: ansible_mitogen connection.
self.connection = connection
#: Name of the module ('command', 'shell', etc.) to execute.
self.module_name = module_name
#: Final module arguments.
self.module_args = module_args
#: Value of 'remote_tmp' parameter, to allow target to create temporary
#: files in correct location.
self.remote_tmp = remote_tmp
#: Task variables, needed to extract ansible_*_interpreter.
self.task_vars = task_vars
#: Templar, needed to extract ansible_*_interpreter.
self.templar = templar
#: Final module environment.
self.env = env
#: Boolean, if :py:data:`True`, launch the module asynchronously.
self.wrap_async = wrap_async
#: Initially ``None``, but set by :func:`invoke`. The path on the
#: master to the module's implementation file.
self.module_path = None
#: Initially ``None``, but set by :func:`invoke`. The raw source or
#: binary contents of the module.
self.module_source = None
def __repr__(self):
return 'Invocation(module_name=%s)' % (self.module_name,)
class Planner(object):
"""
A Planner receives a module name and the contents of its implementation
file, indicates whether or not it understands how to run the module, and
exports a method to run the module.
"""
def detect(self, invocation):
"""
Return true if the supplied `invocation` matches the module type
implemented by this planner.
"""
raise NotImplementedError()
def plan(self, invocation):
"""
If :meth:`detect` returned :data:`True`, plan for the module's
execution, including granting access to or delivering any files to it
that are known to be absent, and finally return a dict::
{
# Name of the class from runners.py that implements the
# target-side execution of this module type.
"runner_name": "...",
# Remaining keys are passed to the constructor of the class
# named by `runner_name`.
}
"""
raise NotImplementedError()
class BinaryPlanner(Planner):
"""
Binary modules take their arguments and will return data to Ansible in the
same way as want JSON modules.
"""
runner_name = 'BinaryRunner'
def detect(self, invocation):
return module_common._is_binary(invocation.module_source)
def plan(self, invocation):
invocation.connection._connect()
mitogen.service.call(
invocation.connection.parent,
ansible_mitogen.services.FileService.handle,
('register', invocation.module_path)
)
return {
'runner_name': self.runner_name,
'module': invocation.module_name,
'service_context': invocation.connection.parent,
'path': invocation.module_path,
'args': invocation.module_args,
'env': invocation.env,
'remote_tmp': invocation.remote_tmp,
}
class ScriptPlanner(BinaryPlanner):
"""
Common functionality for script module planners -- handle interpreter
detection and rewrite.
"""
def _rewrite_interpreter(self, invocation, interpreter):
key = u'ansible_%s_interpreter' % os.path.basename(interpreter).strip()
try:
template = invocation.task_vars[key].strip()
return invocation.templar.template(template)
except KeyError:
return interpreter
def plan(self, invocation):
kwargs = super(ScriptPlanner, self).plan(invocation)
interpreter, arg = parse_script_interpreter(invocation.module_source)
if interpreter is None:
raise ansible.errors.AnsibleError(NO_INTERPRETER_MSG % (
invocation.module_name,
))
return dict(kwargs,
interpreter_arg=arg,
interpreter=self._rewrite_interpreter(
interpreter=interpreter,
invocation=invocation
)
)
class ReplacerPlanner(BinaryPlanner):
"""
The Module Replacer framework is the original framework implementing
new-style modules. It is essentially a preprocessor (like the C
Preprocessor for those familiar with that programming language). It does
straight substitutions of specific substring patterns in the module file.
There are two types of substitutions.
* Replacements that only happen in the module file. These are public
replacement strings that modules can utilize to get helpful boilerplate
or access to arguments.
"from ansible.module_utils.MOD_LIB_NAME import *" is replaced with the
contents of the ansible/module_utils/MOD_LIB_NAME.py. These should only
be used with new-style Python modules.
"#<<INCLUDE_ANSIBLE_MODULE_COMMON>>" is equivalent to
"from ansible.module_utils.basic import *" and should also only apply to
new-style Python modules.
"# POWERSHELL_COMMON" substitutes the contents of
"ansible/module_utils/powershell.ps1". It should only be used with
new-style Powershell modules.
"""
runner_name = 'ReplacerRunner'
def detect(self, invocation):
return module_common.REPLACER in invocation.module_source
class JsonArgsPlanner(ScriptPlanner):
"""
Script that has its interpreter directive and the task arguments
substituted into its source as a JSON string.
"""
runner_name = 'JsonArgsRunner'
def detect(self, invocation):
return module_common.REPLACER_JSONARGS in invocation.module_source
class WantJsonPlanner(ScriptPlanner):
"""
If a module has the string WANT_JSON in it anywhere, Ansible treats it as a
non-native module that accepts a filename as its only command line
parameter. The filename is for a temporary file containing a JSON string
containing the module's parameters. The module needs to open the file, read
and parse the parameters, operate on the data, and print its return data as
a JSON encoded dictionary to stdout before exiting.
These types of modules are self-contained entities. As of Ansible 2.1,
Ansible only modifies them to change a shebang line if present.
"""
runner_name = 'WantJsonRunner'
def detect(self, invocation):
return 'WANT_JSON' in invocation.module_source
class NewStylePlanner(ScriptPlanner):
"""
The Ansiballz framework differs from module replacer in that it uses real
Python imports of things in ansible/module_utils instead of merely
preprocessing the module.
"""
runner_name = 'NewStyleRunner'
def detect(self, invocation):
return 'from ansible.module_utils.' in invocation.module_source
class ReplacerPlanner(NewStylePlanner):
runner_name = 'ReplacerRunner'
def detect(self, invocation):
return module_common.REPLACER in invocation.module_source
class OldStylePlanner(ScriptPlanner):
runner_name = 'OldStyleRunner'
def detect(self, invocation):
# Everything else.
return True
_planners = [
BinaryPlanner,
# ReplacerPlanner,
NewStylePlanner,
JsonArgsPlanner,
WantJsonPlanner,
OldStylePlanner,
]
def get_module_data(name):
path = module_loader.find_plugin(name, '')
with open(path, 'rb') as fp:
source = fp.read()
return path, source
def invoke(invocation):
"""
Find a suitable Planner that knows how to run `invocation`.
"""
(invocation.module_path,
invocation.module_source) = get_module_data(invocation.module_name)
for klass in _planners:
planner = klass()
if planner.detect(invocation):
break
else:
raise ansible.errors.AnsibleError(NO_METHOD_MSG + repr(invocation))
kwargs = planner.plan(invocation)
if invocation.wrap_async:
helper = ansible_mitogen.target.run_module_async
else:
helper = ansible_mitogen.target.run_module
try:
js = invocation.connection.call(helper, kwargs)
except mitogen.core.CallError as e:
LOG.exception('invocation crashed: %r', invocation)
summary = str(e).splitlines()[0]
raise ansible.errors.AnsibleInternalError(CRASHED_MSG + summary)
return invocation.action._postprocess_response(js)

@ -28,7 +28,7 @@
import ansible.plugins.action
import mitogen.core
import ansible_mitogen.helpers
import ansible_mitogen.target
from mitogen.utils import cast
@ -37,7 +37,7 @@ class ActionModule(ansible.plugins.action.ActionBase):
job_id = self._task.args['jid']
try:
result = self._connection.call(
ansible_mitogen.helpers.get_async_result,
ansible_mitogen.target.get_async_result,
cast(job_id),
)
except mitogen.core.CallError, e:

@ -51,6 +51,10 @@ except ImportError:
sys.path.insert(0, os.path.abspath(os.path.join(base_dir, '../../..')))
del base_dir
from ansible_mitogen.strategy import StrategyModule
del os
del sys
import ansible_mitogen.strategy
import ansible.plugins.strategy.linear
class StrategyModule(ansible_mitogen.strategy.StrategyMixin,
ansible.plugins.strategy.linear.StrategyModule):
pass

@ -0,0 +1,60 @@
# Copyright 2017, David Wilson
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import os.path
import sys
#
# This is not the real Strategy implementation module, it simply exists as a
# proxy to the real module, which is loaded using Python's regular import
# mechanism, to prevent Ansible's PluginLoader from making up a fake name that
# results in ansible_mitogen plugin modules being loaded twice: once by
# PluginLoader with a name like "ansible.plugins.strategy.mitogen", which is
# stuffed into sys.modules even though attempting to import it will trigger an
# ImportError, and once under its canonical name, "ansible_mitogen.strategy".
#
# Therefore we have a proxy module that imports it under the real name, and
# sets up the duff PluginLoader-imported module to just contain objects from
# the real module, so duplicate types don't exist in memory, and things like
# debuggers and isinstance() work predictably.
#
try:
import ansible_mitogen
except ImportError:
base_dir = os.path.dirname(__file__)
sys.path.insert(0, os.path.abspath(os.path.join(base_dir, '../../..')))
del base_dir
import ansible_mitogen.strategy
import ansible.plugins.strategy.free
class StrategyModule(ansible_mitogen.strategy.StrategyMixin,
ansible.plugins.strategy.free.StrategyModule):
pass

@ -0,0 +1,60 @@
# Copyright 2017, David Wilson
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import os.path
import sys
#
# This is not the real Strategy implementation module, it simply exists as a
# proxy to the real module, which is loaded using Python's regular import
# mechanism, to prevent Ansible's PluginLoader from making up a fake name that
# results in ansible_mitogen plugin modules being loaded twice: once by
# PluginLoader with a name like "ansible.plugins.strategy.mitogen", which is
# stuffed into sys.modules even though attempting to import it will trigger an
# ImportError, and once under its canonical name, "ansible_mitogen.strategy".
#
# Therefore we have a proxy module that imports it under the real name, and
# sets up the duff PluginLoader-imported module to just contain objects from
# the real module, so duplicate types don't exist in memory, and things like
# debuggers and isinstance() work predictably.
#
try:
import ansible_mitogen
except ImportError:
base_dir = os.path.dirname(__file__)
sys.path.insert(0, os.path.abspath(os.path.join(base_dir, '../../..')))
del base_dir
import ansible_mitogen.strategy
import ansible.plugins.strategy.linear
class StrategyModule(ansible_mitogen.strategy.StrategyMixin,
ansible.plugins.strategy.linear.StrategyModule):
pass

@ -134,7 +134,7 @@ class MuxProcess(object):
"""
Construct a Router, Broker, and mitogen.unix listener
"""
self.router = mitogen.master.Router()
self.router = mitogen.master.Router(max_message_size=4096*1048576)
self.router.responder.whitelist_prefix('ansible')
self.router.responder.whitelist_prefix('ansible_mitogen')
mitogen.core.listen(self.router.broker, 'shutdown', self.on_broker_shutdown)
@ -153,7 +153,8 @@ class MuxProcess(object):
self.pool = mitogen.service.Pool(
router=self.router,
services=[
ansible_mitogen.services.ContextService(self.router)
ansible_mitogen.services.ContextService(self.router),
ansible_mitogen.services.FileService(self.router),
],
size=int(os.environ.get('MITOGEN_POOL_SIZE', '16')),
)

@ -0,0 +1,418 @@
# Copyright 2017, David Wilson
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
These classes implement execution for each style of Ansible module. They are
instantiated in the target context by way of target.py::run_module().
Each class in here has a corresponding Planner class in planners.py that knows
how to build arguments for it, preseed related data, etc.
"""
from __future__ import absolute_import
import cStringIO
import json
import logging
import os
import shutil
import sys
import tempfile
import types
import ansible_mitogen.target # TODO: circular import
try:
from shlex import quote as shlex_quote
except ImportError:
from pipes import quote as shlex_quote
# Prevent accidental import of an Ansible module from hanging on stdin read.
import ansible.module_utils.basic
ansible.module_utils.basic._ANSIBLE_ARGS = '{}'
LOG = logging.getLogger(__name__)
def reopen_readonly(fp):
"""
Replace the file descriptor belonging to the file object `fp` with one
open on the same file (`fp.name`), but opened with :py:data:`os.O_RDONLY`.
This enables temporary files to be executed on Linux, which usually theows
``ETXTBUSY`` if any writeable handle exists pointing to a file passed to
`execve()`.
"""
fd = os.open(fp.name, os.O_RDONLY)
os.dup2(fd, fp.fileno())
os.close(fd)
class Runner(object):
"""
Ansible module runner. After instantiation (with kwargs supplied by the
corresponding Planner), `.run()` is invoked, upon which `setup()`,
`_run()`, and `revert()` are invoked, with the return value of `_run()`
returned by `run()`.
Subclasses may override `_run`()` and extend `setup()` and `revert()`.
"""
def __init__(self, module, remote_tmp, raw_params=None, args=None, env=None):
if args is None:
args = {}
if raw_params is not None:
args['_raw_params'] = raw_params
self.module = module
self.remote_tmp = os.path.expanduser(remote_tmp)
self.raw_params = raw_params
self.args = args
self.env = env
self._temp_dir = None
def get_temp_dir(self):
if not self._temp_dir:
self._temp_dir = ansible_mitogen.target.make_temp_directory(
self.remote_tmp,
)
return self._temp_dir
def setup(self):
"""
Prepare the current process for running a module. The base
implementation simply prepares the environment.
"""
self._env = TemporaryEnvironment(self.env)
def revert(self):
"""
Revert any changes made to the process after running a module. The base
implementation simply restores the original environment.
"""
self._env.revert()
if self._temp_dir:
shutil.rmtree(self._temp_dir)
def _run(self):
"""
The _run() method is expected to return a dictionary in the form of
ActionBase._low_level_execute_command() output, i.e. having::
{
"rc": int,
"stdout": "stdout data",
"stderr": "stderr data"
}
"""
raise NotImplementedError()
def run(self):
"""
Set up the process environment in preparation for running an Ansible
module. This monkey-patches the Ansible libraries in various places to
prevent it from trying to kill the process on completion, and to
prevent it from reading sys.stdin.
:returns:
Module result dictionary.
"""
self.setup()
try:
return self._run()
finally:
self.revert()
class TemporaryEnvironment(object):
def __init__(self, env=None):
self.original = os.environ.copy()
self.env = env or {}
os.environ.update((k, str(v)) for k, v in self.env.iteritems())
def revert(self):
os.environ.clear()
os.environ.update(self.original)
class TemporaryArgv(object):
def __init__(self, argv):
self.original = sys.argv[:]
sys.argv[:] = argv
def revert(self):
sys.argv[:] = self.original
class NewStyleStdio(object):
"""
Patch ansible.module_utils.basic argument globals.
"""
def __init__(self, args):
self.original_stdout = sys.stdout
self.original_stderr = sys.stderr
self.original_stdin = sys.stdin
sys.stdout = cStringIO.StringIO()
sys.stderr = cStringIO.StringIO()
ansible.module_utils.basic._ANSIBLE_ARGS = json.dumps({
'ANSIBLE_MODULE_ARGS': args
})
sys.stdin = cStringIO.StringIO(
ansible.module_utils.basic._ANSIBLE_ARGS
)
def revert(self):
sys.stdout = self.original_stdout
sys.stderr = self.original_stderr
sys.stdin = self.original_stdin
ansible.module_utils.basic._ANSIBLE_ARGS = '{}'
class ProgramRunner(Runner):
def __init__(self, path, service_context, **kwargs):
super(ProgramRunner, self).__init__(**kwargs)
self.path = path
self.service_context = service_context
def setup(self):
super(ProgramRunner, self).setup()
self._setup_program()
def _setup_program(self):
"""
Create a temporary file containing the program code. The code is
fetched via :meth:`_get_program`.
"""
self.program_fp = open(
os.path.join(self.get_temp_dir(), self.module),
'wb'
)
self.program_fp.write(self._get_program())
self.program_fp.flush()
os.chmod(self.program_fp.name, int('0700', 8))
reopen_readonly(self.program_fp)
def _get_program(self):
"""
Fetch the module binary from the master if necessary.
"""
return ansible_mitogen.target.get_file(
context=self.service_context,
path=self.path,
)
def _get_program_args(self):
return [
self.args['_ansible_shell_executable'],
'-c',
self.program_fp.name
]
def revert(self):
"""
Delete the temporary program file.
"""
self.program_fp.close()
super(ProgramRunner, self).revert()
def _run(self):
try:
rc, stdout, stderr = ansible_mitogen.target.exec_args(
args=self._get_program_args(),
emulate_tty=True,
)
except Exception, e:
LOG.exception('While running %s', self._get_program_args())
return {
'rc': 1,
'stdout': '',
'stderr': '%s: %s' % (type(e), e),
}
return {
'rc': rc,
'stdout': stdout,
'stderr': stderr
}
class ArgsFileRunner(Runner):
def setup(self):
super(ArgsFileRunner, self).setup()
self._setup_args()
def _setup_args(self):
"""
Create a temporary file containing the module's arguments. The
arguments are formatted via :meth:`_get_args`.
"""
self.args_fp = tempfile.NamedTemporaryFile(
prefix='ansible_mitogen',
suffix='-args',
dir=self.get_temp_dir(),
)
self.args_fp.write(self._get_args_contents())
self.args_fp.flush()
reopen_readonly(self.program_fp)
def _get_args_contents(self):
"""
Return the module arguments formatted as JSON.
"""
return json.dumps(self.args)
def _get_program_args(self):
return [
self.args['_ansible_shell_executable'],
'-c',
"%s %s" % (self.program_fp.name, self.args_fp.name),
]
def revert(self):
"""
Delete the temporary argument file.
"""
self.args_fp.close()
super(ArgsFileRunner, self).revert()
class BinaryRunner(ArgsFileRunner, ProgramRunner):
pass
class ScriptRunner(ProgramRunner):
def __init__(self, interpreter, interpreter_arg, **kwargs):
super(ScriptRunner, self).__init__(**kwargs)
self.interpreter = interpreter
self.interpreter_arg = interpreter_arg
b_ENCODING_STRING = b'# -*- coding: utf-8 -*-'
def _get_program(self):
return self._rewrite_source(
super(ScriptRunner, self)._get_program()
)
def _rewrite_source(self, s):
"""
Mutate the source according to the per-task parameters.
"""
# Couldn't find shebang, so let shell run it, because shell assumes
# executables like this are just shell scripts.
if not self.interpreter:
return s
shebang = '#!' + self.interpreter
if self.interpreter_arg:
shebang += ' ' + self.interpreter_arg
new = [shebang]
if os.path.basename(self.interpreter).startswith('python'):
new.append(self.b_ENCODING_STRING)
_, _, rest = s.partition('\n')
new.append(rest)
return '\n'.join(new)
class NewStyleRunner(ScriptRunner):
"""
Execute a new-style Ansible module, where Module Replacer-related tricks
aren't required.
"""
#: path => new-style module bytecode.
_code_by_path = {}
def setup(self):
super(NewStyleRunner, self).setup()
self._stdio = NewStyleStdio(self.args)
self._argv = TemporaryArgv([self.path])
def revert(self):
self._argv.revert()
self._stdio.revert()
super(NewStyleRunner, self).revert()
def _get_code(self):
try:
return self._code_by_path[self.path]
except KeyError:
return self._code_by_path.setdefault(self.path, compile(
source=ansible_mitogen.target.get_file(
context=self.service_context,
path=self.path,
),
filename=self.path,
mode='exec',
dont_inherit=True,
))
def _run(self):
code = self._get_code()
mod = types.ModuleType('__main__')
d = vars(mod)
e = None
try:
exec code in d, d
except SystemExit, e:
pass
return {
'rc': e[0] if e else 2,
'stdout': sys.stdout.getvalue(),
'stderr': sys.stderr.getvalue(),
}
class JsonArgsRunner(ScriptRunner):
JSON_ARGS = '<<INCLUDE_ANSIBLE_MODULE_JSON_ARGS>>'
def _get_args_contents(self):
return json.dumps(self.args)
def _rewrite_source(self, s):
return (
super(JsonArgsRunner, self)._rewrite_source(s)
.replace(self.JSON_ARGS, self._get_args_contents())
)
class WantJsonRunner(ArgsFileRunner, ScriptRunner):
pass
class OldStyleRunner(ArgsFileRunner, ScriptRunner):
def _get_args_contents(self):
"""
Mimic the argument formatting behaviour of
ActionBase._execute_module().
"""
return ' '.join(
'%s=%s' % (key, shlex_quote(str(self.args[key])))
for key in self.args
) + ' ' # Bug-for-bug :(

@ -27,9 +27,17 @@
# POSSIBILITY OF SUCH DAMAGE.
from __future__ import absolute_import
import logging
import os.path
import zlib
import mitogen
import mitogen.service
LOG = logging.getLogger(__name__)
class ContextService(mitogen.service.DeduplicatingService):
"""
Used by worker processes connecting back into the top-level process to
@ -40,7 +48,7 @@ class ContextService(mitogen.service.DeduplicatingService):
https://mitogen.readthedocs.io/en/latest/api.html#context-factories
This concentrates all SSH connections in the top-level process, which may
become a bottleneck. There are multiple ways to fix that:
become a bottleneck. There are multiple ways to fix that:
* creating one .local() child context per CPU and sharding connections
between them, using the master process to route messages, or
* as above, but having each child create a unique UNIX listener and
@ -55,7 +63,13 @@ class ContextService(mitogen.service.DeduplicatingService):
existing connection, but popped from the list of arguments passed to
the connection method.
:returns mitogen.master.Context:
:returns tuple:
Tuple of `(context, home_dir)`, where:
* `context` is the mitogen.master.Context referring to the target
context.
* `home_dir` is a cached copy of the remote directory.
mitogen.master.Context:
Corresponding Context instance.
"""
handle = 500
@ -67,4 +81,86 @@ class ContextService(mitogen.service.DeduplicatingService):
def get_response(self, args):
args.pop('discriminator', None)
method = getattr(self.router, args.pop('method'))
return method(**args)
try:
context = method(**args)
except mitogen.core.StreamError as e:
return {
'context': None,
'home_dir': None,
'msg': str(e),
}
home_dir = context.call(os.path.expanduser, '~')
return {
'context': context,
'home_dir': home_dir,
'msg': None,
}
class FileService(mitogen.service.Service):
"""
Primitive latency-inducing file server for old-style incantations of the
module runner. This is to be replaced later with a scheme that forwards
files known to be missing without the target having to ask for them,
avoiding a corresponding roundtrip per file.
Paths must be explicitly added to the service by a trusted context before
they will be served to an untrusted context.
:param tuple args:
Tuple of `(cmd, path)`, where:
- cmd: one of "register", "fetch", where:
- register: register a file that may be fetched
- fetch: fetch a file that was previously registered
- path: key of the file to fetch or register
:returns:
Returns ``None` for "register", or the file data for "fetch".
:raises mitogen.core.CallError:
Security violation occurred, either path not registered, or attempt to
register path from unprivileged context.
"""
handle = 501
max_message_size = 1000
policies = (
mitogen.service.AllowAny(),
)
unprivileged_msg = 'Cannot register from unprivileged context.'
unregistered_msg = 'Path is not registered with FileService.'
def __init__(self, router):
super(FileService, self).__init__(router)
self._paths = {}
def validate_args(self, args):
return (
isinstance(args, tuple) and
len(args) == 2 and
args[0] in ('register', 'fetch') and
isinstance(args[1], basestring)
)
def dispatch(self, args, msg):
cmd, path = args
return getattr(self, cmd)(path, msg)
def register(self, path, msg):
if not mitogen.core.has_parent_authority(msg):
raise mitogen.core.CallError(self.unprivileged_msg)
if path in self._paths:
return
LOG.info('%r: registering %r', self, path)
with open(path, 'rb') as fp:
self._paths[path] = zlib.compress(fp.read())
def fetch(self, path, msg):
if path not in self._paths:
raise mitogen.core.CallError(self.unregistered_msg)
LOG.debug('Serving %r to context %r', path, msg.src_id)
return self._paths[path]

@ -30,7 +30,6 @@ from __future__ import absolute_import
import os
import ansible.errors
import ansible.plugins.strategy.linear
import ansible_mitogen.mixins
import ansible_mitogen.process
@ -83,12 +82,12 @@ def wrap_connection_loader__get(name, play_context, new_stdin, **kwargs):
return connection_loader__get(name, play_context, new_stdin, **kwargs)
class StrategyModule(ansible.plugins.strategy.linear.StrategyModule):
class StrategyMixin(object):
"""
This strategy enhances the default "linear" strategy by arranging for
various Mitogen services to be initialized in the Ansible top-level
process, and for worker processes to grow support for using those top-level
services to communicate with and execute modules on remote hosts.
This mix-in enhances any built-in strategy by arranging for various Mitogen
services to be initialized in the Ansible top-level process, and for worker
processes to grow support for using those top-level services to communicate
with and execute modules on remote hosts.
Mitogen:
@ -132,12 +131,12 @@ class StrategyModule(ansible.plugins.strategy.linear.StrategyModule):
For action plug-ins, the original class is looked up as usual, but a
new subclass is created dynamically in order to mix-in
ansible_mitogen.helpers.ActionModuleMixin, which overrides many of the
ansible_mitogen.target.ActionModuleMixin, which overrides many of the
methods usually inherited from ActionBase in order to replace them with
pure-Python equivalents that avoid the use of shell.
In particular, _execute_module() is overridden with an implementation
that uses ansible_mitogen.helpers.run_module() executed in the target
that uses ansible_mitogen.target.run_module() executed in the target
Context. run_module() implements module execution by importing the
module as if it were a normal Python module, and capturing its output
in the remote process. Since the Mitogen module loader is active in the
@ -182,6 +181,6 @@ class StrategyModule(ansible.plugins.strategy.linear.StrategyModule):
self._add_connection_plugin_path()
self._install_wrappers()
try:
return super(StrategyModule, self).run(iterator, play_context)
return super(StrategyMixin, self).run(iterator, play_context)
finally:
self._remove_wrappers()

@ -1,5 +1,6 @@
-r docs/docs-requirements.txt
ansible==2.3.1.0
coverage==4.5.1
Django==1.6.11; python_version < '2.7'
Django==1.11.5; python_version >= '2.7' # for module_finder_test
https://github.com/docker/docker-py/archive/1.10.6.tar.gz; python_version < '2.7'
@ -9,6 +10,7 @@ pytest-catchlog==1.2.2
pytest==3.1.2
PyYAML==3.11; python_version < '2.7'
PyYAML==3.12; python_version >= '2.7'
timeoutcontext==1.2.0
unittest2==1.1.0
# Fix InsecurePlatformWarning while creating py26 tox environment
# https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings

1
docs/.gitignore vendored

@ -0,0 +1 @@
build

@ -8,7 +8,7 @@ default:
SPHINXOPTS =
SPHINXBUILD = sphinx-build
PAPER =
BUILDDIR = _build
BUILDDIR = build
# User-friendly check for sphinx-build
ifeq ($(shell which $(SPHINXBUILD) >/dev/null 2>&1; echo $$?), 1)

@ -57,6 +57,19 @@ quickly as possible.
relating to those files in cross-account scenarios are entirely avoided.
Demo
----
This demonstrates Ansible running a subset of the Mitogen integration tests
concurrent to an equivalent run using the extension.
.. raw:: html
<video width="720" height="439" controls>
<source src="http://k3.botanicus.net/tmp/ansible_mitogen.mp4" type="video/mp4">
</video>
Testimonials
------------
@ -97,10 +110,12 @@ Installation
[defaults]
strategy_plugins = /path/to/mitogen-master/ansible_mitogen/plugins/strategy
strategy = mitogen
strategy = mitogen_linear
The ``strategy`` key is optional. If omitted, you can set the
``ANSIBLE_STRATEGY=mitogen`` environment variable on a per-run basis.
``ANSIBLE_STRATEGY=mitogen_linear`` environment variable on a per-run basis.
Like ``mitogen_linear``, the ``mitogen_free`` strategy also exists to mimic
the built-in ``free`` strategy.
4. Cross your fingers and try it.
@ -113,9 +128,6 @@ This is a proof of concept: issues below are exclusively due to code immaturity.
High Risk
~~~~~~~~~
* For now only **built-in Python command modules work**, however almost all
modules shipped with Ansible are Python-based.
* Transfer of large (i.e. GB-sized) files using certain Ansible-internal APIs,
such as triggered via the ``copy`` module, will cause corresponding temporary
memory and CPU spikes on both host and target machine, due to delivering the
@ -129,11 +141,16 @@ High Risk
* `Asynchronous Actions And Polling
<https://docs.ansible.com/ansible/latest/playbooks_async.html>`_ has received
minimal testing.
minimal testing. Jobs execute in a thread of the target Python interpreter.
This will fixed shortly.
* No mechanism exists yet to bound the number of interpreters created during a
run. For some playbooks that parameterize ``become_user`` over a large number
of user accounts, resource exhaustion may be triggered on the target machine.
* Only Ansible 2.4 is being used for development, with occasional tests under
2.3 and 2.2. It should be more than possible to fully support at least 2.3,
if not also 2.2.
2.5, 2.3 and 2.2. It should be more than possible to fully support at least
2.3, if not also 2.2.
Low Risk
@ -144,31 +161,13 @@ Low Risk
* Only the ``sudo`` become method is available, however adding new methods is
straightforward, and eventually at least ``su`` will be included.
* The only supported strategy is ``linear``, which is Ansible's default.
* In some cases ``remote_tmp`` may not be respected.
* The extension's performance benefits do not scale perfectly linearly with the
number of targets. This is a subject of ongoing investigation and
improvements will appear in time.
* Ansible defaults to requiring pseudo TTYs for most SSH invocations, in order
to allow it to handle ``sudo`` with ``requiretty`` enabled, however it
disables pseudo TTYs for certain commands where standard input is required or
``sudo`` is not in use. Mitogen does not require this, as it can simply call
:py:func:`pty.openpty` from the SSH user account during ``sudo`` setup.
A major downside to Ansible's default is that stdout and stderr of any
resulting executed command are merged, with additional carriage return
characters synthesized in the output by the TTY layer. Neither of these
problems are apparent using the Mitogen extension, which may break some
playbooks.
A future version will emulate Ansible's behaviour, once it is clear precisely
what that behaviour is supposed to be. See `Ansible#14377`_ for related
discussion.
.. _Ansible#14377: https://github.com/ansible/ansible/issues/14377
* "Module Replacer" style modules are not yet supported. These rarely appear in
practice, and light Github code searches failed to reveal many examples of
them.
Behavioural Differences
@ -195,13 +194,6 @@ Behavioural Differences
captured and returned to the host machine, where it can be viewed as desired
with ``-vvv``.
* Ansible with SSH multiplexing enabled causes a string like ``Shared
connection to host closed`` to appear in ``stderr`` output of every executed
command. This never manifests with the Mitogen extension.
* Asynchronous support is very primitive, and jobs execute in a thread of the
target Python interpreter. This will fixed shortly.
* Local commands are executed in a reuseable Python interpreter created
identically to interpreters used on remote hosts. At present only one such
interpreter per ``become_user`` exists, and so only one action may be
@ -211,8 +203,57 @@ Behavioural Differences
release.
Demo
----
How Modules Execute
-------------------
Ansible usually modifies, recompresses and reuploads modules every time they
run on a target, work that must be repeated by the controller for every
playbook step.
With the extension any modifications are done on the target, allowing pristine
copies of modules to be cached, reducing the necessity to re-transfer modules
for each invocation. Unmodified modules are uploaded once on first use and
cached in RAM for the remainder of the run.
**Binary**
* Native executables detected using a complex heuristic.
* Arguments are supplied as a JSON file whose path is the sole script
parameter.
**Module Replacer**
* Python scripts detected by the presence of
``#<<INCLUDE_ANSIBLE_MODULE_COMMON>>`` appearing in their source.
* Not yet supported.
**New-Style**
* Python scripts detected by the presence of ``from ansible.module_utils.``
appearing in their source.
* Arguments are supplied as JSON written to ``sys.stdin`` of the target
interpreter.
**JSON_ARGS**
* Detected by the presence of ``INCLUDE_ANSIBLE_MODULE_JSON_ARGS``
appearing in the script source.
* The interpreter directive (``#!interpreter``) is adjusted to match the
corresponding value of ``{{ansible_*_interpreter}}`` if one is set.
* Arguments are supplied as JSON mixed into the script as a replacement for
``INCLUDE_ANSIBLE_MODULE_JSON_ARGS``.
**WANT_JSON**
* Detected by the presence of ``WANT_JSON`` appearing in the script source.
* The interpreter directive is adjusted as above.
* Arguments are supplied as a JSON file whose path is the sole script
parameter.
**Old Style**
* Files not matching any of the above tests.
* The interpreter directive is adjusted as above.
* Arguments are supplied as a file whose path is the sole script parameter.
The format of the file is ``"key=repr(value)[ key2=repr(value2)[ ..]] "``.
Sample Profiles
---------------
Local VM connection
~~~~~~~~~~~~~~~~~~~
@ -343,6 +384,30 @@ plug-ins are unlikely to attempt similar patches, so the risk to an established
configuration should be minimal.
Standard IO
~~~~~~~~~~~
Ansible uses pseudo TTYs for most invocations, to allow it to handle typing
passwords interactively, however it disables pseudo TTYs for certain commands
where standard input is required or ``sudo`` is not in use. Additionally when
SSH multiplexing is enabled, a string like ``Shared connection to localhost
closed\r\n`` appears in ``stderr`` of every invocation.
Mitogen does not naturally require either of these, as command output is
embedded within the SSH stream, and it can simply call :py:func:`pty.openpty`
in every location an interactive password must be typed.
A major downside to Ansible's behaviour is that ``stdout`` and ``stderr`` are
merged together into a single ``stdout`` variable, with carriage returns
inserted in the output by the TTY layer. However ugly, the extension emulates
all of this behaviour precisely, to avoid breaking playbooks that expect
certain text to appear in certain variables with certain linefeed characters.
See `Ansible#14377`_ for related discussion.
.. _Ansible#14377: https://github.com/ansible/ansible/issues/14377
Flag Emulation
~~~~~~~~~~~~~~

@ -412,7 +412,7 @@ Router Class
receive side to the I/O multiplexer. This This method remains public
for now while hte design has not yet settled.
.. method:: add_handler (fn, handle=None, persist=True, respondent=None)
.. method:: add_handler (fn, handle=None, persist=True, respondent=None, policy=None)
Invoke `fn(msg)` for each Message sent to `handle` from this context.
Unregister after one invocation if `persist` is ``False``. If `handle`
@ -435,6 +435,28 @@ Router Class
In future `respondent` will likely also be used to prevent other
contexts from sending messages to the handle.
:param function policy:
Function invoked as `policy(msg, stream)` where `msg` is a
:py:class:`mitogen.core.Message` about to be delivered, and
`stream` is the :py:class:`mitogen.core.Stream` on which it was
received. The function must return :py:data:`True`, otherwise an
error is logged and delivery is refused.
Two built-in policy functions exist:
* :py:func:`mitogen.core.has_parent_authority`: requires the
message arrived from a parent context, or a context acting with a
parent context's authority (``auth_id``).
* :py:func:`mitogen.parent.is_immediate_child`: requires the
message arrived from an immediately connected child, for use in
messaging patterns where either something becomes buggy or
insecure by permitting indirect upstream communication.
In case of refusal, and the message's ``reply_to`` field is
nonzero, a :py:class:`mitogen.core.CallError` is delivered to the
sender indicating refusal occurred.
:return:
`handle`, or if `handle` was ``None``, the newly allocated handle.
@ -932,6 +954,23 @@ Receiver Class
Used by :py:class:`mitogen.master.Select` to implement waiting on
multiple receivers.
.. py:method:: to_sender ()
Return a :py:class:`mitogen.core.Sender` configured to deliver messages
to this receiver. Since a Sender can be serialized, this makes it
convenient to pass `(context_id, handle)` pairs around::
def deliver_monthly_report(sender):
for line in open('monthly_report.txt'):
sender.send(line)
sender.close()
remote = router.ssh(hostname='mainframe')
recv = mitogen.core.Receiver(router)
remote.call(deliver_monthly_report, recv.to_sender())
for msg in recv:
print(msg)
.. py:method:: empty ()
Return ``True`` if calling :py:meth:`get` would block.
@ -997,6 +1036,9 @@ Sender Class
Senders are used to send pickled messages to a handle in another context,
it is the inverse of :py:class:`mitogen.core.Sender`.
Senders may be serialized, making them convenient to wire up data flows.
See :py:meth:`mitogen.core.Receiver.to_sender` for more information.
:param mitogen.core.Context context:
Context to send messages to.
:param int dst_handle:
@ -1007,7 +1049,7 @@ Sender Class
Send :py:data:`_DEAD` to the remote end, causing
:py:meth:`ChannelError` to be raised in any waiting thread.
.. py:method:: put (data)
.. py:method:: send (data)
Send `data` to the remote end.
@ -1062,11 +1104,11 @@ Broker Class
Mark the :py:attr:`receive_side <Stream.receive_side>` on `stream` as
not ready for reading. Safe to call from any thread.
.. method:: start_transmit (stream)
.. method:: _start_transmit (stream)
Mark the :py:attr:`transmit_side <Stream.transmit_side>` on `stream` as
ready for writing. Safe to call from any thread. When the associated
file descriptor becomes ready for writing,
ready for writing. Must only be called from the Broker thread. When the
associated file descriptor becomes ready for writing,
:py:meth:`BasicStream.on_transmit` will be called.
.. method:: stop_receive (stream)

@ -247,6 +247,24 @@ without the need for writing asynchronous code::
print 'Reply from %s: %s' % (recv.context, data)
Running Code That May Hang
--------------------------
When executing code that may hang due to, for example, talking to network peers
that may become unavailable, it is desirable to be able to recover control in
the case a remote call has hung.
By specifying the `timeout` parameter to :meth:`Receiver.get` on the receiver
returned by `Context.call_async`, it becomes possible to wait for a function to
complete, but time out if its result does not become available.
When a context has become hung like this, it is still possible to gracefully
terminate it using the :meth:`Context.shutdown` method. This method sends a
shutdown message to the target process, where its IO multiplexer thread can
still process it independently of the hung function running on on the target's
main thread.
Recovering Mitogen Object References In Children
------------------------------------------------
@ -262,7 +280,6 @@ Recovering Mitogen Object References In Children
...
Recursion
---------
@ -292,6 +309,7 @@ User-defined types may not be used, except for:
* :py:class:`mitogen.core.CallError`
* :py:class:`mitogen.core.Context`
* :py:class:`mitogen.core.Sender`
* :py:class:`mitogen.core._DEAD`
Subclasses of built-in types must be undecorated using

@ -122,8 +122,8 @@ send(msg)</y:MethodLabel>
</y:NodeLabel>
<y:UML clipContent="true" constraint="" omitDetails="false" stereotype="" use3DEffect="true">
<y:AttributeLabel/>
<y:MethodLabel>start_transmit(strm)
stop_transmit(strm)</y:MethodLabel>
<y:MethodLabel>_start_transmit(strm)
_stop_transmit(strm)</y:MethodLabel>
</y:UML>
</y:UMLClassNode>
</data>

@ -188,12 +188,12 @@ Stream Classes
.. method:: on_transmit (broker)
Called by :py:class:`Broker` when the stream's :py:attr:`transmit_side`
has been marked writeable using :py:meth:`Broker.start_transmit` and
has been marked writeable using :py:meth:`Broker._start_transmit` and
the broker has detected the associated file descriptor is ready for
writing.
Subclasses must implement this method if
:py:meth:`Broker.start_transmit` is ever called on them.
:py:meth:`Broker._start_transmit` is ever called on them.
.. method:: on_shutdown (broker)

@ -1,3 +1,15 @@
"""
mitop.py is a version of the UNIX top command that knows how to display process
lists from multiple machines in a single listing.
This is a basic, initial version showing overall program layout. A future
version will extend it to:
* Only notify the master of changed processes, rather than all processes.
* Runtime-reconfigurable filters and aggregations handled on the remote
machines rather than forcing a bottleneck in the master.
"""
import curses
import subprocess
@ -10,15 +22,28 @@ import mitogen.utils
class Host(object):
"""
A target host from the perspective of the master process.
"""
#: String hostname.
name = None
#: mitogen.parent.Context used to call functions on the host.
context = None
#: mitogen.core.Receiver the target delivers state updates to.
recv = None
def __init__(self):
self.procs = {} #: pid -> Process()
#: Mapping of pid -> Process() for each process described
#: in the host's previous status update.
self.procs = {}
class Process(object):
"""
A single process running on a target host.
"""
host = None
user = None
pid = None
@ -30,14 +55,20 @@ class Process(object):
rss = None
@mitogen.core.takes_router
def remote_main(context_id, handle, delay, router):
context = mitogen.core.Context(router, context_id)
sender = mitogen.core.Sender(context, handle)
def child_main(sender, delay):
"""
Executed on the main thread of the Python interpreter running on each
target machine, Context.call() from the master. It simply sends the output
of the UNIX 'ps' command at regular intervals toward a Receiver on master.
:param mitogen.core.Sender sender:
The Sender to use for delivering our result. This could target
anywhere, but the sender supplied by the master simply causes results
to be delivered to the master's associated per-host Receiver.
"""
args = ['ps', '-axwwo', 'user,pid,ppid,pgid,%cpu,rss,command']
while True:
sender.put(subprocess.check_output(args))
sender.send(subprocess.check_output(args))
time.sleep(delay)
@ -71,6 +102,9 @@ def parse_output(host, s):
class Painter(object):
"""
This is ncurses (screen drawing) magic, you can ignore it. :)
"""
def __init__(self, hosts):
self.stdscr = curses.initscr()
curses.start_color()
@ -124,7 +158,12 @@ class Painter(object):
self.stdscr.refresh()
def local_main(painter, router, select, delay):
def master_main(painter, router, select, delay):
"""
Loop until CTRL+C is pressed, waiting for the next result delivered by the
Select. Use parse_output() to turn that result ('ps' command output) into
rich data, and finally repaint the screen if the repaint delay has passed.
"""
next_paint = 0
while True:
msg = select.get()
@ -134,8 +173,13 @@ def local_main(painter, router, select, delay):
painter.paint()
def main(router, argv):
mitogen.utils.log_to_file()
@mitogen.main()
def main(router):
"""
Main program entry point. @mitogen.main() is just a helper to handle
reliable setup/destruction of Broker, Router and the logging package.
"""
argv = sys.argv[1:]
if not len(argv):
print 'mitop: Need a list of SSH hosts to connect to.'
sys.exit(1)
@ -144,36 +188,60 @@ def main(router, argv):
select = mitogen.master.Select(oneshot=False)
hosts = []
# For each hostname on the command line, create a Host instance, a Mitogen
# connection, a Receiver to accept messages from the host, and finally
# start child_main() on the host to pump messages into the receiver.
for hostname in argv:
print 'Starting on', hostname
host = Host()
host.name = hostname
if host.name == 'localhost':
host.context = router.local()
else:
host.context = router.ssh(hostname=host.name)
# A receiver wires up a handle (via Router.add_handler()) to an
# internal thread-safe queue object, which can be drained through calls
# to recv.get().
host.recv = mitogen.core.Receiver(router)
host.recv.host = host
select.add(host.recv)
call_recv = host.context.call_async(remote_main,
mitogen.context_id, host.recv.handle, delay)
# But we don't want to receive data from just one receiver, we want to
# receive data from many. In this case we can use a Select(). It knows
# how to efficiently sleep while waiting for the first message sent to
# many receivers.
select.add(host.recv)
# Adding call_recv to the select will cause CallError to be thrown by
# .get() if startup in the context fails, halt local_main() and cause
# the exception to be printed.
# The inverse of a Receiver is a Sender. Unlike receivers, senders are
# serializable, so we can call the .to_sender() helper method to create
# one equivalent to our host's receiver, and pass it directly to the
# host as a function parameter.
sender = host.recv.to_sender()
# Finally invoke the function in the remote target. Since child_main()
# is an infinite loop, using .call() would block the parent, since
# child_main() never returns. Instead use .call_async(), which returns
# another Receiver. We also want to wait for results from it --
# although child_main() never returns, if it crashes the exception will
# be delivered instead.
call_recv = host.context.call_async(child_main, sender, delay)
call_recv.host = host
# Adding call_recv to the select will cause mitogen.core.CallError to
# be thrown by .get() if startup of any context fails, causing halt of
# master_main(), and the exception to be printed.
select.add(call_recv)
hosts.append(host)
# Painter just wraps up all the prehistory ncurses code and keeps it out of
# master_main().
painter = Painter(hosts)
try:
try:
local_main(painter, router, select, delay)
master_main(painter, router, select, delay)
except KeyboardInterrupt:
# Shut down gracefully when the user presses CTRL+C.
pass
finally:
painter.close()
if __name__ == '__main__':
mitogen.utils.run_with_router(main, sys.argv[1:])

@ -1,6 +0,0 @@
---
- hosts: all
gather_facts: false
tasks:
- bin_bash_module:

@ -1,8 +0,0 @@
---
- hosts: all
gather_facts: false
tasks:
- name: "Run hostname"
command: hostname
with_sequence: start=1 end=100

@ -26,7 +26,6 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import Queue
import cPickle
import cStringIO
import collections
@ -41,6 +40,7 @@ import signal
import socket
import struct
import sys
import thread
import threading
import time
import traceback
@ -95,13 +95,18 @@ class LatchError(Error):
class CallError(Error):
def __init__(self, e):
s = '%s.%s: %s' % (type(e).__module__, type(e).__name__, e)
tb = sys.exc_info()[2]
if tb:
s += '\n'
s += ''.join(traceback.format_tb(tb))
Error.__init__(self, s)
def __init__(self, fmt=None, *args):
if not isinstance(fmt, Exception):
Error.__init__(self, fmt, *args)
else:
e = fmt
fmt = '%s.%s: %s' % (type(e).__module__, type(e).__name__, e)
args = ()
tb = sys.exc_info()[2]
if tb:
fmt += '\n'
fmt += ''.join(traceback.format_tb(tb))
Error.__init__(self, fmt)
def __reduce__(self):
return (_unpickle_call_error, (self[0],))
@ -152,6 +157,11 @@ def _unpickle_dead():
_DEAD = Dead()
def has_parent_authority(msg, _stream=None):
return (msg.auth_id == mitogen.context_id or
msg.auth_id in mitogen.parent_ids)
def listen(obj, name, func):
signals = vars(obj).setdefault('_signals', {})
signals.setdefault(name, []).append(func)
@ -205,11 +215,11 @@ def io_op(func, *args):
while True:
try:
return func(*args), False
except OSError, e:
except (select.error, OSError), e:
_vv and IOLOG.debug('io_op(%r) -> OSError: %s', func, e)
if e.errno == errno.EINTR:
if e[0] == errno.EINTR:
continue
if e.errno in (errno.EIO, errno.ECONNRESET, errno.EPIPE):
if e[0] in (errno.EIO, errno.ECONNRESET, errno.EPIPE):
return None, True
raise
@ -294,6 +304,9 @@ class Message(object):
def _unpickle_context(self, context_id, name):
return _unpickle_context(self.router, context_id, name)
def _unpickle_sender(self, context_id, dst_handle):
return _unpickle_sender(self.router, context_id, dst_handle)
def _find_global(self, module, func):
"""Return the class implementing `module_name.class_name` or raise
`StreamError` if the module is not whitelisted."""
@ -302,6 +315,8 @@ class Message(object):
return _unpickle_call_error
elif func == '_unpickle_dead':
return _unpickle_dead
elif func == '_unpickle_sender':
return self._unpickle_sender
elif func == '_unpickle_context':
return self._unpickle_context
@ -361,6 +376,9 @@ class Sender(object):
def __repr__(self):
return 'Sender(%r, %r)' % (self.context, self.dst_handle)
def __reduce__(self):
return _unpickle_sender, (self.context.context_id, self.dst_handle)
def close(self):
"""Indicate this channel is closed to the remote side."""
_vv and IOLOG.debug('%r.close()', self)
@ -371,9 +389,9 @@ class Sender(object):
)
)
def put(self, data):
def send(self, data):
"""Send `data` to the remote."""
_vv and IOLOG.debug('%r.put(%r..)', self, data[:100])
_vv and IOLOG.debug('%r.send(%r..)', self, repr(data)[:100])
self.context.send(
Message.pickled(
data,
@ -382,20 +400,38 @@ class Sender(object):
)
def _unpickle_sender(router, context_id, dst_handle):
if not (isinstance(router, Router) and
isinstance(context_id, (int, long)) and context_id >= 0 and
isinstance(dst_handle, (int, long)) and dst_handle > 0):
raise TypeError('cannot unpickle Sender: bad input')
return Sender(Context(router, context_id), dst_handle)
class Receiver(object):
notify = None
raise_channelerror = True
def __init__(self, router, handle=None, persist=True, respondent=None):
def __init__(self, router, handle=None, persist=True,
respondent=None, policy=None):
self.router = router
self.handle = handle # Avoid __repr__ crash in add_handler()
self.handle = router.add_handler(self._on_receive, handle,
persist, respondent)
self.handle = router.add_handler(
fn=self._on_receive,
handle=handle,
policy=policy,
persist=persist,
respondent=respondent,
)
self._latch = Latch()
def __repr__(self):
return 'Receiver(%r, %r)' % (self.router, self.handle)
def to_sender(self):
context = Context(self.router, mitogen.context_id)
return Sender(context, self.handle)
def _on_receive(self, msg):
"""Callback from the Stream; appends data to the internal queue."""
_vv and IOLOG.debug('%r._on_receive(%r)', self, msg)
@ -416,13 +452,14 @@ class Receiver(object):
if msg == _DEAD:
raise ChannelError(ChannelError.local_msg)
msg.unpickle() # Cause .remote_msg to be thrown.
return msg
def __iter__(self):
while True:
try:
yield self.get()
msg = self.get()
msg.unpickle() # Cause .remote_msg to be thrown.
yield msg
except ChannelError:
return
@ -455,6 +492,7 @@ class Importer(object):
'fork',
'master',
'parent',
'service',
'ssh',
'sudo',
'utils',
@ -471,7 +509,11 @@ class Importer(object):
# Presence of an entry in this map indicates in-flight GET_MODULE.
self._callbacks = {}
router.add_handler(self._on_load_module, LOAD_MODULE)
router.add_handler(
fn=self._on_load_module,
handle=LOAD_MODULE,
policy=has_parent_authority,
)
self._cache = {}
if core_src:
self._cache['mitogen.core'] = (
@ -718,7 +760,7 @@ class BasicStream(object):
def on_disconnect(self, broker):
LOG.debug('%r.on_disconnect()', self)
broker.stop_receive(self)
broker.stop_transmit(self)
broker._stop_transmit(self)
if self.receive_side:
self.receive_side.close()
if self.transmit_side:
@ -786,6 +828,12 @@ class Stream(BasicStream):
self._input_buf[0][:self.HEADER_LEN],
)
if msg_len > self._router.max_message_size:
LOG.error('Maximum message size exceeded (got %d, max %d)',
msg_len, self._router.max_message_size)
self.on_disconnect(broker)
return False
total_len = msg_len + self.HEADER_LEN
if self._input_buf_len < total_len:
_vv and IOLOG.debug(
@ -829,15 +877,17 @@ class Stream(BasicStream):
_vv and IOLOG.debug('%r.on_transmit() -> len %d', self, written)
if not self._output_buf:
broker.stop_transmit(self)
broker._stop_transmit(self)
def _send(self, msg):
_vv and IOLOG.debug('%r._send(%r)', self, msg)
pkt = struct.pack(self.HEADER_FMT, msg.dst_id, msg.src_id,
msg.auth_id, msg.handle, msg.reply_to or 0,
len(msg.data)) + msg.data
was_transmitting = len(self._output_buf)
self._output_buf.append(pkt)
self._router.broker.start_transmit(self)
if not was_transmitting:
self._router.broker._start_transmit(self)
def send(self, msg):
"""Send `data` to `handle`, and tell the broker we have output. May
@ -907,8 +957,10 @@ class Context(object):
def _unpickle_context(router, context_id, name):
if not (isinstance(router, Router) and
isinstance(context_id, (int, long)) and context_id > 0 and
isinstance(name, basestring) and len(name) < 100):
isinstance(context_id, (int, long)) and context_id >= 0 and (
(name is None) or
(isinstance(name, basestring) and len(name) < 100))
):
raise TypeError('cannot unpickle Context: bad input')
return router.context_class(router, context_id, name)
@ -1034,14 +1086,19 @@ class Latch(object):
class Waker(BasicStream):
"""
:py:class:`BasicStream` subclass implementing the
`UNIX self-pipe trick`_. Used internally to wake the IO multiplexer when
some of its state has been changed by another thread.
:py:class:`BasicStream` subclass implementing the `UNIX self-pipe trick`_.
Used to wake the multiplexer when another thread needs to modify its state
(via a cross-thread function call).
.. _UNIX self-pipe trick: https://cr.yp.to/docs/selfpipe.html
"""
broker_ident = None
def __init__(self, broker):
self._broker = broker
self._lock = threading.Lock()
self._deferred = []
rfd, wfd = os.pipe()
self.receive_side = Side(self, rfd)
self.transmit_side = Side(self, wfd)
@ -1053,24 +1110,63 @@ class Waker(BasicStream):
self.transmit_side.fd,
)
def on_receive(self, broker):
@property
def keep_alive(self):
"""
Read a byte from the self-pipe.
Prevent immediate Broker shutdown while deferred functions remain.
"""
self.receive_side.read(256)
self._lock.acquire()
try:
return len(self._deferred)
finally:
self._lock.release()
def wake(self):
def on_receive(self, broker):
"""
Write a byte to the self-pipe, causing the IO multiplexer to wake up.
Nothing is written if the current thread is the IO multiplexer thread.
Drain the pipe and fire callbacks. Reading multiple bytes is safe since
new bytes corresponding to future .defer() calls are written only after
.defer() takes _lock: either a byte we read corresponds to something
already on the queue by the time we take _lock, or a byte remains
buffered, causing another wake up, because it was written after we
released _lock.
"""
_vv and IOLOG.debug('%r.wake() [fd=%r]', self, self.transmit_side.fd)
if threading.currentThread() != self._broker._thread:
_vv and IOLOG.debug('%r.on_receive()', self)
self.receive_side.read(128)
self._lock.acquire()
try:
deferred = self._deferred
self._deferred = []
finally:
self._lock.release()
for func, args, kwargs in deferred:
try:
self.transmit_side.write(' ')
except OSError, e:
if e[0] != errno.EBADF:
raise
func(*args, **kwargs)
except Exception:
LOG.exception('defer() crashed: %r(*%r, **%r)',
func, args, kwargs)
self._broker.shutdown()
def defer(self, func, *args, **kwargs):
if thread.get_ident() == self.broker_ident:
_vv and IOLOG.debug('%r.defer() [immediate]', self)
return func(*args, **kwargs)
_vv and IOLOG.debug('%r.defer() [fd=%r]', self, self.transmit_side.fd)
self._lock.acquire()
try:
self._deferred.append((func, args, kwargs))
finally:
self._lock.release()
# Wake the multiplexer by writing a byte. If the broker is in the midst
# of tearing itself down, the waker fd may already have been closed, so
# ignore EBADF here.
try:
self.transmit_side.write(' ')
except OSError, e:
if e[0] != errno.EBADF:
raise
class IoLogger(BasicStream):
@ -1119,9 +1215,11 @@ class IoLogger(BasicStream):
class Router(object):
context_class = Context
max_message_size = 128 * 1048576
def __init__(self, broker):
self.broker = broker
listen(broker, 'crash', self._cleanup_handlers)
listen(broker, 'shutdown', self.on_broker_shutdown)
# Here seems as good a place as any.
@ -1151,7 +1249,11 @@ class Router(object):
for context in self._context_by_id.itervalues():
context.on_shutdown(self.broker)
for _, func in self._handle_map.itervalues():
self._cleanup_handlers()
def _cleanup_handlers(self):
while self._handle_map:
_, (_, func, _) = self._handle_map.popitem()
func(_DEAD)
def register(self, context, stream):
@ -1161,18 +1263,22 @@ class Router(object):
self.broker.start_receive(stream)
listen(stream, 'disconnect', lambda: self.on_stream_disconnect(stream))
def add_handler(self, fn, handle=None, persist=True, respondent=None):
def add_handler(self, fn, handle=None, persist=True,
policy=None, respondent=None):
handle = handle or self._last_handle.next()
_vv and IOLOG.debug('%r.add_handler(%r, %r, %r)', self, fn, handle, persist)
self._handle_map[handle] = persist, fn
if respondent:
assert policy is None
def policy(msg, _stream):
return msg.src_id == respondent.context_id
def on_disconnect():
if handle in self._handle_map:
fn(_DEAD)
del self._handle_map[handle]
listen(respondent, 'disconnect', on_disconnect)
self._handle_map[handle] = persist, fn, policy
return handle
def on_shutdown(self, broker):
@ -1184,14 +1290,26 @@ class Router(object):
_v and LOG.debug('%r.on_shutdown(): killing %r: %r', self, handle, fn)
fn(_DEAD)
def _invoke(self, msg):
refused_msg = 'Refused by policy.'
def _invoke(self, msg, stream):
#IOLOG.debug('%r._invoke(%r)', self, msg)
try:
persist, fn = self._handle_map[msg.handle]
persist, fn, policy = self._handle_map[msg.handle]
except KeyError:
LOG.error('%r: invalid handle: %r', self, msg)
return
if policy and not policy(msg, stream):
LOG.error('%r: policy refused message: %r', self, msg)
if msg.reply_to:
self.route(Message.pickled(
CallError(self.refused_msg),
dst_id=msg.src_id,
handle=msg.reply_to
))
return
if not persist:
del self._handle_map[msg.handle]
@ -1202,18 +1320,32 @@ class Router(object):
def _async_route(self, msg, stream=None):
_vv and IOLOG.debug('%r._async_route(%r, %r)', self, msg, stream)
if len(msg.data) > self.max_message_size:
LOG.error('message too large (max %d bytes): %r',
self.max_message_size, msg)
return
# Perform source verification.
if stream is not None:
expected_stream = self._stream_by_id.get(msg.auth_id,
self._stream_by_id.get(mitogen.parent_id))
if stream != expected_stream:
LOG.error('%r: bad source: got auth ID %r from %r, should be from %r',
self, msg, stream, expected_stream)
if stream:
parent = self._stream_by_id.get(mitogen.parent_id)
expect = self._stream_by_id.get(msg.auth_id, parent)
if stream != expect:
LOG.error('%r: bad auth_id: got %r via %r, not %r: %r',
self, msg.auth_id, stream, expect, msg)
return
if msg.src_id != msg.auth_id:
expect = self._stream_by_id.get(msg.src_id, parent)
if stream != expect:
LOG.error('%r: bad src_id: got %r via %r, not %r: %r',
self, msg.src_id, stream, expect, msg)
return
if stream.auth_id is not None:
msg.auth_id = stream.auth_id
if msg.dst_id == mitogen.context_id:
return self._invoke(msg)
return self._invoke(msg, stream)
stream = self._stream_by_id.get(msg.dst_id)
if stream is None:
@ -1224,7 +1356,7 @@ class Router(object):
self, msg, mitogen.context_id)
return
stream.send(msg)
stream._send(msg)
def route(self, msg):
self.broker.defer(self._async_route, msg)
@ -1237,24 +1369,17 @@ class Broker(object):
def __init__(self):
self._alive = True
self._queue = Queue.Queue()
self._readers = []
self._writers = []
self._waker = Waker(self)
self.start_receive(self._waker)
self.defer = self._waker.defer
self._readers = [self._waker.receive_side]
self._writers = []
self._thread = threading.Thread(
target=_profile_hook,
args=('broker', self._broker_main),
name='mitogen-broker'
)
self._thread.start()
def defer(self, func, *args, **kwargs):
if threading.currentThread() == self._thread:
func(*args, **kwargs)
else:
self._queue.put((func, args, kwargs))
self._waker.wake()
self._waker.broker_ident = self._thread.ident
def _list_discard(self, lst, value):
try:
@ -1275,14 +1400,14 @@ class Broker(object):
IOLOG.debug('%r.stop_receive(%r)', self, stream)
self.defer(self._list_discard, self._readers, stream.receive_side)
def start_transmit(self, stream):
IOLOG.debug('%r.start_transmit(%r)', self, stream)
def _start_transmit(self, stream):
IOLOG.debug('%r._start_transmit(%r)', self, stream)
assert stream.transmit_side and stream.transmit_side.fd is not None
self.defer(self._list_add, self._writers, stream.transmit_side)
self._list_add(self._writers, stream.transmit_side)
def stop_transmit(self, stream):
IOLOG.debug('%r.stop_transmit(%r)', self, stream)
self.defer(self._list_discard, self._writers, stream.transmit_side)
def _stop_transmit(self, stream):
IOLOG.debug('%r._stop_transmit(%r)', self, stream)
self._list_discard(self._writers, stream.transmit_side)
def _call(self, stream, func):
try:
@ -1291,19 +1416,8 @@ class Broker(object):
LOG.exception('%r crashed', stream)
stream.on_disconnect(self)
def _run_defer(self):
while not self._queue.empty():
func, args, kwargs = self._queue.get()
try:
func(*args, **kwargs)
except Exception:
LOG.exception('defer() crashed: %r(*%r, **%r)',
func, args, kwargs)
self.shutdown()
def _loop_once(self, timeout=None):
_vv and IOLOG.debug('%r._loop_once(%r)', self, timeout)
self._run_defer()
#IOLOG.debug('readers = %r', self._readers)
#IOLOG.debug('writers = %r', self._writers)
@ -1322,15 +1436,13 @@ class Broker(object):
self._call(side.stream, side.stream.on_transmit)
def keep_alive(self):
return (sum((side.keep_alive for side in self._readers), 0) +
(not self._queue.empty()))
return sum((side.keep_alive for side in self._readers), 0)
def _broker_main(self):
try:
while self._alive:
self._loop_once()
self._run_defer()
fire(self, 'shutdown')
for side in set(self._readers).union(self._writers):
@ -1351,13 +1463,15 @@ class Broker(object):
side.stream.on_disconnect(self)
except Exception:
LOG.exception('_broker_main() crashed')
fire(self, 'crash')
fire(self, 'exit')
def shutdown(self):
_v and LOG.debug('%r.shutdown()', self)
self._alive = False
self._waker.wake()
def _shutdown():
self._alive = False
self.defer(_shutdown)
def join(self):
self._thread.join()
@ -1376,29 +1490,35 @@ class ExternalContext(object):
def _on_shutdown_msg(self, msg):
_v and LOG.debug('_on_shutdown_msg(%r)', msg)
if msg != _DEAD and msg.auth_id not in mitogen.parent_ids:
LOG.warning('Ignoring SHUTDOWN from non-parent: %r', msg)
return
self.broker.shutdown()
if msg != _DEAD:
self.broker.shutdown()
def _on_parent_disconnect(self):
_v and LOG.debug('%r: parent stream is gone, dying.', self)
self.broker.shutdown()
def _setup_master(self, profiling, parent_id, context_id, in_fd, out_fd):
def _setup_master(self, max_message_size, profiling, parent_id,
context_id, in_fd, out_fd):
Router.max_message_size = max_message_size
self.profiling = profiling
if profiling:
enable_profiling()
self.broker = Broker()
self.router = Router(self.broker)
self.router.add_handler(self._on_shutdown_msg, SHUTDOWN)
self.router.add_handler(
fn=self._on_shutdown_msg,
handle=SHUTDOWN,
policy=has_parent_authority,
)
self.master = Context(self.router, 0, 'master')
if parent_id == 0:
self.parent = self.master
else:
self.parent = Context(self.router, parent_id, 'parent')
self.channel = Receiver(self.router, CALL_FUNCTION)
self.channel = Receiver(router=self.router,
handle=CALL_FUNCTION,
policy=has_parent_authority)
self.stream = Stream(self.router, parent_id)
self.stream.name = 'parent'
self.stream.accept(in_fd, out_fd)
@ -1494,8 +1614,6 @@ class ExternalContext(object):
def _dispatch_one(self, msg):
data = msg.unpickle(throw=False)
_v and LOG.debug('_dispatch_calls(%r)', data)
if msg.auth_id not in mitogen.parent_ids:
LOG.warning('CALL_FUNCTION from non-parent %r', msg.auth_id)
modname, klass, func, args, kwargs = data
obj = __import__(modname, {}, {}, [''])
@ -1518,9 +1636,11 @@ class ExternalContext(object):
self.dispatch_stopped = True
def main(self, parent_ids, context_id, debug, profiling, log_level,
in_fd=100, out_fd=1, core_src_fd=101, setup_stdio=True,
setup_package=True, importer=None, whitelist=(), blacklist=()):
self._setup_master(profiling, parent_ids[0], context_id, in_fd, out_fd)
max_message_size, in_fd=100, out_fd=1, core_src_fd=101,
setup_stdio=True, setup_package=True, importer=None,
whitelist=(), blacklist=()):
self._setup_master(max_message_size, profiling, parent_ids[0],
context_id, in_fd, out_fd)
try:
try:
self._setup_logging(debug, log_level)

@ -62,14 +62,14 @@ class IoPump(mitogen.core.BasicStream):
def write(self, s):
self._output_buf += s
self._broker.start_transmit(self)
self._broker._start_transmit(self)
def close(self):
self._closed = True
# If local process hasn't exitted yet, ensure its write buffer is
# drained before lazily triggering disconnect in on_transmit.
if self.transmit_side.fd is not None:
self._broker.start_transmit(self)
self._broker._start_transmit(self)
def on_shutdown(self, broker):
self.close()
@ -83,7 +83,7 @@ class IoPump(mitogen.core.BasicStream):
self._output_buf = self._output_buf[written:]
if not self._output_buf:
broker.stop_transmit(self)
broker._stop_transmit(self)
if self._closed:
self.on_disconnect(broker)
@ -343,14 +343,15 @@ def run(dest, router, args, deadline=None, econtext=None):
fp.write(inspect.getsource(mitogen.core))
fp.write('\n')
fp.write('ExternalContext().main(**%r)\n' % ({
'parent_ids': parent_ids,
'context_id': context_id,
'core_src_fd': None,
'debug': getattr(router, 'debug', False),
'profiling': getattr(router, 'profiling', False),
'log_level': mitogen.parent.get_log_level(),
'in_fd': sock2.fileno(),
'log_level': mitogen.parent.get_log_level(),
'max_message_size': router.max_message_size,
'out_fd': sock2.fileno(),
'core_src_fd': None,
'parent_ids': parent_ids,
'profiling': getattr(router, 'profiling', False),
'setup_stdio': False,
},))
finally:

@ -31,6 +31,7 @@ import os
import random
import sys
import threading
import traceback
import mitogen.core
import mitogen.parent
@ -65,6 +66,20 @@ def break_logging_locks():
handler.createLock()
def handle_child_crash():
"""
Respond to _child_main() crashing by ensuring the relevant exception is
logged to /dev/tty.
"""
tty = open('/dev/tty', 'wb')
tty.write('\n\nFORKED CHILD PID %d CRASHED\n%s\n\n' % (
os.getpid(),
traceback.format_exc(),
))
tty.close()
os._exit(1)
class Stream(mitogen.parent.Stream):
#: Reference to the importer, if any, recovered from the parent.
importer = None
@ -72,9 +87,11 @@ class Stream(mitogen.parent.Stream):
#: User-supplied function for cleaning up child process state.
on_fork = None
def construct(self, old_router, on_fork=None, debug=False, profiling=False):
def construct(self, old_router, max_message_size, on_fork=None,
debug=False, profiling=False):
# fork method only supports a tiny subset of options.
super(Stream, self).construct(debug=debug, profiling=profiling)
super(Stream, self).construct(max_message_size=max_message_size,
debug=debug, profiling=profiling)
self.on_fork = on_fork
responder = getattr(old_router, 'responder', None)
@ -94,7 +111,13 @@ class Stream(mitogen.parent.Stream):
return self.pid, fd
else:
parentfp.close()
self._wrap_child_main(childfp)
def _wrap_child_main(self, childfp):
try:
self._child_main(childfp)
except BaseException, e:
handle_child_crash()
def _child_main(self, childfp):
mitogen.core.Latch._on_fork()
@ -113,17 +136,24 @@ class Stream(mitogen.parent.Stream):
# avoid ExternalContext.main() accidentally allocating new files over
# the standard handles.
os.dup2(childfp.fileno(), 0)
os.dup2(childfp.fileno(), 2)
# Avoid corrupting the stream on fork crash by dupping /dev/null over
# stderr. Instead, handle_child_crash() uses /dev/tty to log errors.
devnull = os.open('/dev/null', os.O_WRONLY)
if devnull != 2:
os.dup2(devnull, 2)
os.close(devnull)
childfp.close()
kwargs = self.get_main_kwargs()
kwargs['core_src_fd'] = None
kwargs['importer'] = self.importer
kwargs['setup_package'] = False
mitogen.core.ExternalContext().main(**kwargs)
# Don't trigger atexit handlers, they were copied from the parent.
os._exit(0)
try:
mitogen.core.ExternalContext().main(**kwargs)
finally:
# Don't trigger atexit handlers, they were copied from the parent.
os._exit(0)
def _connect_bootstrap(self):
# None required.

@ -288,7 +288,10 @@ class LogForwarder(object):
def __init__(self, router):
self._router = router
self._cache = {}
router.add_handler(self._on_forward_log, mitogen.core.FORWARD_LOG)
router.add_handler(
fn=self._on_forward_log,
handle=mitogen.core.FORWARD_LOG,
)
def _on_forward_log(self, msg):
if msg == mitogen.core._DEAD:
@ -524,7 +527,10 @@ class ModuleResponder(object):
self._cache = {} # fullname -> pickled
self.blacklist = []
self.whitelist = ['']
router.add_handler(self._on_get_module, mitogen.core.GET_MODULE)
router.add_handler(
fn=self._on_get_module,
handle=mitogen.core.GET_MODULE,
)
def __repr__(self):
return 'ModuleResponder(%r)' % (self._router,)
@ -646,9 +652,11 @@ class Router(mitogen.parent.Router):
debug = False
profiling = False
def __init__(self, broker=None):
def __init__(self, broker=None, max_message_size=None):
if broker is None:
broker = self.broker_class()
if max_message_size:
self.max_message_size = max_message_size
super(Router, self).__init__(broker)
self.upgrade()
@ -682,7 +690,10 @@ class IdAllocator(object):
self.router = router
self.next_id = 1
self.lock = threading.Lock()
router.add_handler(self.on_allocate_id, mitogen.core.ALLOCATE_ID)
router.add_handler(
fn=self.on_allocate_id,
handle=mitogen.core.ALLOCATE_ID,
)
def __repr__(self):
return 'IdAllocator(%r)' % (self.router,)

@ -76,6 +76,14 @@ def get_log_level():
return (LOG.level or logging.getLogger().level or logging.INFO)
def is_immediate_child(msg, stream):
"""
Handler policy that requires messages to arrive only from immediately
connected children.
"""
return msg.src_id == stream.remote_id
def minimize_source(source):
"""Remove most comments and docstrings from Python source code.
"""
@ -383,12 +391,24 @@ METHOD_NAMES = {
@mitogen.core.takes_econtext
def _proxy_connect(name, method_name, kwargs, econtext):
mitogen.parent.upgrade_router(econtext)
context = econtext.router._connect(
klass=METHOD_NAMES[method_name](),
name=name,
**kwargs
)
return context.context_id, context.name
try:
context = econtext.router._connect(
klass=METHOD_NAMES[method_name](),
name=name,
**kwargs
)
except mitogen.core.StreamError, e:
return {
'id': None,
'name': None,
'msg': str(e),
}
return {
'id': context.context_id,
'name': context.name,
'msg': None,
}
class Stream(mitogen.core.Stream):
@ -414,6 +434,10 @@ class Stream(mitogen.core.Stream):
#: Set to the child's PID by connect().
pid = None
#: Passed via Router wrapper methods, must eventually be passed to
#: ExternalContext.main().
max_message_size = None
def __init__(self, *args, **kwargs):
super(Stream, self).__init__(*args, **kwargs)
self.sent_modules = set(['mitogen', 'mitogen.core'])
@ -421,12 +445,13 @@ class Stream(mitogen.core.Stream):
#: during disconnection.
self.routes = set([self.remote_id])
def construct(self, remote_name=None, python_path=None, debug=False,
connect_timeout=None, profiling=False,
def construct(self, max_message_size, remote_name=None, python_path=None,
debug=False, connect_timeout=None, profiling=False,
old_router=None, **kwargs):
"""Get the named context running on the local machine, creating it if
it does not exist."""
super(Stream, self).construct(**kwargs)
self.max_message_size = max_message_size
if python_path:
self.python_path = python_path
if sys.platform == 'darwin' and self.python_path == '/usr/bin/python':
@ -444,6 +469,7 @@ class Stream(mitogen.core.Stream):
self.remote_name = remote_name
self.debug = debug
self.profiling = profiling
self.max_message_size = max_message_size
self.connect_deadline = time.time() + self.connect_timeout
def on_shutdown(self, broker):
@ -518,6 +544,7 @@ class Stream(mitogen.core.Stream):
]
def get_main_kwargs(self):
assert self.max_message_size is not None
parent_ids = mitogen.parent_ids[:]
parent_ids.insert(0, mitogen.context_id)
return {
@ -528,6 +555,7 @@ class Stream(mitogen.core.Stream):
'log_level': get_log_level(),
'whitelist': self._router.get_module_whitelist(),
'blacklist': self._router.get_module_blacklist(),
'max_message_size': self.max_message_size,
}
def get_preamble(self):
@ -623,11 +651,13 @@ class RouteMonitor(object):
fn=self._on_add_route,
handle=mitogen.core.ADD_ROUTE,
persist=True,
policy=is_immediate_child,
)
self.router.add_handler(
fn=self._on_del_route,
handle=mitogen.core.DEL_ROUTE,
persist=True,
policy=is_immediate_child,
)
def propagate(self, handle, target_id, name=None):
@ -780,7 +810,9 @@ class Router(mitogen.core.Router):
def _connect(self, klass, name=None, **kwargs):
context_id = self.allocate_id()
context = self.context_class(self, context_id)
stream = klass(self, context_id, old_router=self, **kwargs)
kwargs['old_router'] = self
kwargs['max_message_size'] = self.max_message_size
stream = klass(self, context_id, **kwargs)
if name is not None:
stream.name = name
stream.connect()
@ -800,14 +832,16 @@ class Router(mitogen.core.Router):
return self._connect(klass, name=name, **kwargs)
def proxy_connect(self, via_context, method_name, name=None, **kwargs):
context_id, name = via_context.call(_proxy_connect,
resp = via_context.call(_proxy_connect,
name=name,
method_name=method_name,
kwargs=kwargs
)
name = '%s.%s' % (via_context.name, name)
if resp['msg'] is not None:
raise mitogen.core.StreamError(resp['msg'])
context = self.context_class(self, context_id, name=name)
name = '%s.%s' % (via_context.name, resp['name'])
context = self.context_class(self, resp['id'], name=name)
context.via = via_context
self._context_by_id[context.context_id] = context
return context
@ -862,7 +896,12 @@ class ModuleForwarder(object):
self.router = router
self.parent_context = parent_context
self.importer = importer
router.add_handler(self._on_get_module, mitogen.core.GET_MODULE)
router.add_handler(
fn=self._on_get_module,
handle=mitogen.core.GET_MODULE,
persist=True,
policy=is_immediate_child,
)
def __repr__(self):
return 'ModuleForwarder(%r)' % (self.router,)

@ -82,6 +82,12 @@ class Service(object):
self.handle = self.recv.handle
self.running = True
def __repr__(self):
return '%s.%s()' % (
self.__class__.__module__,
self.__class__.__name__,
)
def validate_args(self, args):
return (
isinstance(args, dict) and
@ -108,6 +114,7 @@ class Service(object):
isinstance(args, mitogen.core.CallError) or
not self.validate_args(args)):
LOG.warning('Received junk message: %r', args)
msg.reply(mitogen.core.CallError('Received junk message'))
return
try:

@ -39,11 +39,11 @@ import mitogen.parent
LOG = logging.getLogger('mitogen')
PASSWORD_PROMPT = 'password'
PASSWORD_PROMPT = 'password:'
PERMDENIED_PROMPT = 'permission denied'
class PasswordError(mitogen.core.Error):
class PasswordError(mitogen.core.StreamError):
pass
@ -61,7 +61,8 @@ class Stream(mitogen.parent.Stream):
def construct(self, hostname, username=None, ssh_path=None, port=None,
check_host_keys=True, password=None, identity_file=None,
compression=True, ssh_args=None, **kwargs):
compression=True, ssh_args=None, keepalive_enabled=True,
keepalive_count=3, keepalive_interval=15, **kwargs):
super(Stream, self).construct(**kwargs)
self.hostname = hostname
self.username = username
@ -70,6 +71,9 @@ class Stream(mitogen.parent.Stream):
self.password = password
self.identity_file = identity_file
self.compression = compression
self.keepalive_enabled = keepalive_enabled
self.keepalive_count = keepalive_count
self.keepalive_interval = keepalive_interval
if ssh_path:
self.ssh_path = ssh_path
if ssh_args:
@ -89,6 +93,11 @@ class Stream(mitogen.parent.Stream):
bits += ['-i', self.identity_file]
if self.compression:
bits += ['-o', 'Compression yes']
if self.keepalive_enabled:
bits += [
'-o', 'ServerAliveInterval %s' % (self.keepalive_interval,),
'-o', 'ServerAliveCountMax %s' % (self.keepalive_count,),
]
if not self.check_host_keys:
bits += [
'-o', 'StrictHostKeyChecking no',

@ -98,7 +98,7 @@ def parse_sudo_flags(args):
return opts
class PasswordError(mitogen.core.Error):
class PasswordError(mitogen.core.StreamError):
pass

@ -14,7 +14,7 @@ import mitogen.sudo
router = mitogen.master.Router()
context = mitogen.parent.Context(router, 0)
stream = mitogen.ssh.Stream(router, 0, hostname='foo')
stream = mitogen.ssh.Stream(router, 0, max_message_size=0, hostname='foo')
print 'SSH command size: %s' % (len(' '.join(stream.get_boot_command())),)
print 'Preamble size: %s (%.2fKiB)' % (

@ -0,0 +1,14 @@
#/bin/sh
set -o errexit
set -o nounset
set -o pipefail
UNIT2="$(which unit2)"
coverage erase
coverage run "${UNIT2}" discover \
--start-directory "tests" \
--pattern '*_test.py' \
"$@"
coverage html
echo coverage report is at "file://$(pwd)/htmlcov/index.html"

@ -1,3 +1,10 @@
[coverage:run]
branch = true
source =
mitogen
omit =
mitogen/compat/*
[flake8]
ignore = E402,E128,W503
exclude = mitogen/compat

@ -1,63 +0,0 @@
#!/bin/bash
timeout()
{
python -c '
import subprocess
import sys
import time
deadline = time.time() + float(sys.argv[1])
proc = subprocess.Popen(sys.argv[2:])
while time.time() < deadline and proc.poll() is None:
time.sleep(1.0)
if proc.poll() is not None:
sys.exit(proc.returncode)
proc.terminate()
print
print >> sys.stderr, "Timeout! Command was:", sys.argv[2:]
print
sys.exit(1)
' "$@"
}
trap 'sigint' INT
sigint()
{
echo "SIGINT received, stopping.."
exit 1
}
run_test()
{
echo "Running $1.."
timeout 10 python $1 || fail=$?
}
run_test tests/ansible_helpers_test.py
run_test tests/call_function_test.py
run_test tests/channel_test.py
run_test tests/fakessh_test.py
run_test tests/first_stage_test.py
run_test tests/fork_test.py
run_test tests/id_allocation_test.py
run_test tests/importer_test.py
run_test tests/latch_test.py
run_test tests/local_test.py
run_test tests/master_test.py
run_test tests/minimize_source_test.py
run_test tests/module_finder_test.py
run_test tests/nested_test.py
run_test tests/parent_test.py
run_test tests/responder_test.py
run_test tests/router_test.py
run_test tests/select_test.py
run_test tests/ssh_test.py
run_test tests/testlib.py
run_test tests/utils_test.py
if [ "$fail" ]; then
echo "AT LEAST ONE TEST FAILED" >&2
exit 1
fi

@ -27,4 +27,4 @@ and run the tests there.
1. Build the virtual environment ``virtualenv ../venv``
1. Enable the virtual environment we just built ``source ../venv/bin/activate``
1. Install Mitogen in pip editable mode ``pip install -e .``
1. Run ``test.sh``
1. Run ``test``

@ -0,0 +1,2 @@
lib/modules/custom_binary_producing_junk
lib/modules/custom_binary_producing_json

@ -0,0 +1,4 @@
all: \
lib/modules/custom_binary_producing_junk \
lib/modules/custom_binary_producing_json

@ -0,0 +1,22 @@
# ``tests/ansible`` Directory
This is an an organically growing collection of integration and regression
tests used for development and end-user bug reports.
It will be tidied up over time, meanwhile, the playbooks here are a useful
demonstrator for what does and doesn't work.
## ``run_ansible_playbook.sh``
This is necessary to set some environment variables used by future tests, as
there appears to be no better way to inject them into the top-level process
environment before the Mitogen connection process forks.
## Running Everything
```
ANSIBLE_STRATEGY=mitogen_linear ./run_ansible_playbook.sh all.yml
```

@ -0,0 +1,3 @@
- import_playbook: regression/all.yml
- import_playbook: integration/all.yml

@ -1,11 +1,15 @@
[defaults]
inventory = hosts
gathering = explicit
strategy_plugins = ../../ansible_mitogen/plugins/strategy
strategy = mitogen
library = modules
action_plugins = lib/action
library = lib/modules
retry_files_enabled = False
forks = 50
# Required by integration/runner__remote_tmp.yml
remote_tmp = ~/.ansible/mitogen-tests/
[ssh_connection]
ssh_args = -o ForwardAgent=yes -o ControlMaster=auto -o ControlPersist=60s
pipelining = True

@ -0,0 +1,62 @@
#!/usr/bin/env python
import difflib
import logging
import re
import subprocess
import tempfile
LOG = logging.getLogger(__name__)
suffixes = [
'-m custom_bash_old_style_module',
'-m custom_bash_want_json_module',
'-m custom_binary_producing_json',
'-m custom_binary_producing_junk',
'-m custom_binary_single_null',
'-m custom_python_json_args_module',
'-m custom_python_new_style_module',
'-m custom_python_want_json_module',
'-m setup',
]
fixups = [
('Shared connection to localhost closed\\.(\r\n)?', ''), # TODO
]
def fixup(s):
for regex, to in fixups:
s = re.sub(regex, to, s, re.DOTALL|re.M)
return s
def run(s):
LOG.debug('running: %r', s)
with tempfile.NamedTemporaryFile() as fp:
# https://www.systutorials.com/docs/linux/man/1-ansible-playbook/#lbAG
returncode = subprocess.call(s, stdout=fp, stderr=fp, shell=True)
fp.write('\nReturn code: %s\n' % (returncode,))
fp.seek(0)
return fp.read()
logging.basicConfig(level=logging.DEBUG)
for suffix in suffixes:
ansible = run('ansible localhost %s' % (suffix,))
mitogen = run('ANSIBLE_STRATEGY=mitogen ansible localhost %s' % (suffix,))
diff = list(difflib.unified_diff(
a=fixup(ansible).splitlines(),
b=fixup(mitogen).splitlines(),
fromfile='ansible-output.txt',
tofile='mitogen-output.txt',
))
if diff:
print '++ differ! suffix: %r' % (suffix,)
for line in diff:
print line
print
print

@ -0,0 +1,4 @@
- import_playbook: remote_file_exists.yml
- import_playbook: low_level_execute_command.yml
- import_playbook: make_tmp_path.yml
- import_playbook: transfer_data.yml

@ -1,10 +1,11 @@
---
# Verify the behaviour of _low_level_execute_command().
- hosts: all
gather_facts: false
any_errors_fatal: true
tasks:
- name: integration/action__low_level_execute_command.yml
assert:
that: true
# "echo -en" to test we actually hit bash shell too.
- name: Run raw module without sudo
@ -27,6 +28,7 @@
- debug: msg={{raw}}
- name: Verify raw module output.
assert:
that:
- 'raw.rc == 0'
- 'raw.stdout_lines == ["root"]'
that: |
raw.rc == 0 and
raw.stdout == "root\r\n" and
raw.stdout_lines == ["root"]

@ -0,0 +1,25 @@
- hosts: all
any_errors_fatal: true
tasks:
- name: integration/action/make_tmp_path.yml
assert:
that: true
- action_passthrough:
method: _make_tmp_path
register: out
- assert:
that: out.result.startswith(ansible_remote_tmp|expanduser)
- stat:
path: "{{out.result}}"
register: st
- assert:
that: st.stat.exists and st.stat.isdir and st.stat.mode == "0700"
- file:
path: "{{out.result}}"
state: absent

@ -0,0 +1,38 @@
- hosts: all
any_errors_fatal: true
tasks:
- name: integration/action/remote_file_exists.yml
assert:
that: true
- file:
path: /tmp/does-not-exist
state: absent
- action_passthrough:
method: _remote_file_exists
args: ['/tmp/does-not-exist']
register: out
- assert:
that: out.result == False
# ---
- copy:
dest: /tmp/does-exist
content: "I think, therefore I am"
- action_passthrough:
method: _remote_file_exists
args: ['/tmp/does-exist']
register: out
- assert:
that: out.result == True
- file:
path: /tmp/does-exist
state: absent

@ -0,0 +1,47 @@
- hosts: all
any_errors_fatal: true
tasks:
- name: integration/action/transfer_data.yml
file:
path: /tmp/transfer-data
state: absent
# Ensure it JSON-encodes dicts.
- action_passthrough:
method: _transfer_data
kwargs:
remote_path: /tmp/transfer-data
data: {
"I am JSON": true
}
- slurp:
src: /tmp/transfer-data
register: out
- assert:
that: |
out.content.decode('base64') == '{"I am JSON": true}'
# Ensure it handles strings.
- action_passthrough:
method: _transfer_data
kwargs:
remote_path: /tmp/transfer-data
data: "I am text."
- slurp:
src: /tmp/transfer-data
register: out
- debug: msg={{out}}
- assert:
that:
out.content.decode('base64') == 'I am text.'
- file:
path: /tmp/transfer-data
state: absent

@ -0,0 +1,9 @@
#
# This playbook imports all tests that are known to work at present.
#
- import_playbook: action/all.yml
- import_playbook: connection_loader/all.yml
- import_playbook: runner/all.yml
- import_playbook: playbook_semantics/all.yml

@ -1,9 +1,6 @@
---
- hosts: all
gather_facts: false
any_errors_fatal: true
tasks:
- name: simulate long running op (3 sec), wait for up to 5 sec, poll every 1 sec
command: /bin/sleep 2
async: 4

@ -0,0 +1,3 @@
- import_playbook: local_blemished.yml
- import_playbook: paramiko_unblemished.yml
- import_playbook: ssh_blemished.yml

@ -0,0 +1,14 @@
# Ensure 'local' connections are grabbed.
- hosts: all
any_errors_fatal: true
tasks:
- name: integration/connection_loader__local_blemished.yml
determine_strategy:
- custom_python_detect_environment:
connection: local
register: out
- assert:
that: out.mitogen_loaded or not is_mitogen

@ -0,0 +1,12 @@
# Ensure paramiko connections aren't grabbed.
- hosts: all
any_errors_fatal: true
tasks:
- name: integration/connection_loader__paramiko_unblemished.yml
custom_python_detect_environment:
connection: paramiko
register: out
- assert:
that: not out.mitogen_loaded

@ -0,0 +1,14 @@
# Ensure 'ssh' connections are grabbed.
- hosts: all
any_errors_fatal: true
tasks:
- name: integration/connection_loader__ssh_blemished.yml
determine_strategy:
- custom_python_detect_environment:
connection: ssh
register: out
- assert:
that: out.mitogen_loaded or not is_mitogen

@ -0,0 +1,3 @@
- import_playbook: become_flags.yml
- import_playbook: delegate_to.yml
- import_playbook: environment.yml

@ -0,0 +1,32 @@
#
# Test sudo_flags respects -E.
#
- hosts: all
any_errors_fatal: true
tasks:
- name: integration/playbook_semantics/become_flags.yml
assert:
that: true
- name: "without -E"
become: true
shell: "echo $I_WAS_PRESERVED"
register: out
- assert:
that: "out.stdout == ''"
- hosts: all
any_errors_fatal: true
become_flags: -E
tasks:
- name: "with -E"
become: true
shell: "echo $I_WAS_PRESERVED"
register: out2
environment:
I_WAS_PRESERVED: 2
- assert:
that: "out2.stdout == '2'"

@ -1,9 +1,6 @@
---
- hosts: all
gather_facts: false
any_errors_fatal: true
tasks:
#
# delegate_to, no sudo
#

@ -1,10 +1,8 @@
---
# Ensure environment: is preserved during call.
- hosts: all
gather_facts: false
any_errors_fatal: true
tasks:
- shell: echo $SOME_ENV
environment:
SOME_ENV: 123
@ -14,4 +12,3 @@
- assert:
that: "result.stdout == '123'"

@ -0,0 +1,12 @@
- import_playbook: builtin_command_module.yml
- import_playbook: custom_bash_old_style_module.yml
- import_playbook: custom_bash_want_json_module.yml
- import_playbook: custom_binary_producing_json.yml
- import_playbook: custom_binary_producing_junk.yml
- import_playbook: custom_binary_single_null.yml
- import_playbook: custom_perl_json_args_module.yml
- import_playbook: custom_perl_want_json_module.yml
- import_playbook: custom_python_json_args_module.yml
- import_playbook: custom_python_new_style_module.yml
- import_playbook: custom_python_want_json_module.yml
- import_playbook: remote_tmp.yml

@ -0,0 +1,17 @@
- hosts: all
any_errors_fatal: true
gather_facts: true
tasks:
- name: integration/runner__builtin_command_module.yml
command: hostname
with_sequence: start=1 end={{end|default(1)}}
register: out
- assert:
that: |
out.changed and
out.results[0].changed and
out.results[0].cmd == ['hostname'] and
out.results[0].item == '1' and
out.results[0].rc == 0 and
(out.results[0].stdout == ansible_nodename)

@ -0,0 +1,14 @@
- hosts: all
any_errors_fatal: true
tasks:
- name: integration/runner__custom_bash_old_style_module.yml
custom_bash_old_style_module:
foo: true
with_sequence: start=1 end={{end|default(1)}}
register: out
- assert:
that: |
(not out.changed) and
(not out.results[0].changed) and
out.results[0].msg == 'Here is my input'

@ -0,0 +1,14 @@
- hosts: all
any_errors_fatal: true
tasks:
- name: integration/runner__custom_bash_want_json_module.yml
custom_bash_want_json_module:
foo: true
with_sequence: start=1 end={{end|default(1)}}
register: out
- assert:
that: |
(not out.changed) and
(not out.results[0].changed) and
out.results[0].msg == 'Here is my input'

@ -0,0 +1,14 @@
- hosts: all
any_errors_fatal: true
tasks:
- name: integration/runner__custom_binary_producing_json.yml
custom_binary_producing_json:
foo: true
with_sequence: start=1 end={{end|default(1)}}
register: out
- assert:
that: |
out.changed and
out.results[0].changed and
out.results[0].msg == 'Hello, world.'

@ -0,0 +1,19 @@
- hosts: all
tasks:
- name: integration/runner__custom_binary_producing_junk.yml
custom_binary_producing_junk:
foo: true
with_sequence: start=1 end={{end|default(1)}}
ignore_errors: true
register: out
- hosts: all
any_errors_fatal: true
tasks:
- assert:
that: |
out.failed and
out.results[0].failed and
out.results[0].msg == 'MODULE FAILURE' and
out.results[0].rc == 0

@ -0,0 +1,24 @@
- hosts: all
tasks:
- name: integration/runner__custom_binary_single_null.yml
custom_binary_single_null:
foo: true
with_sequence: start=1 end={{end|default(1)}}
ignore_errors: true
register: out
- hosts: all
any_errors_fatal: true
tasks:
- assert:
that: |
out.failed and
out.results[0].failed and
out.results[0].msg == 'MODULE FAILURE' and
out.results[0].module_stdout.startswith('/bin/sh: ') and
out.results[0].module_stdout.endswith('/custom_binary_single_null: cannot execute binary file\r\n')
# Can't test this: Mitogen returns 126, 2.5.x returns 126, 2.4.x discarded the
# return value and always returned 0.
# out.results[0].rc == 126

@ -0,0 +1,15 @@
- hosts: all
any_errors_fatal: true
tasks:
- name: integration/runner__custom_perl_json_args_module.yml
custom_perl_json_args_module:
foo: true
with_sequence: start=1 end={{end|default(1)}}
register: out
- assert:
that: |
(not out.changed) and
(not out.results[0].changed) and
out.results[0].input[0].foo and
out.results[0].message == 'I am a perl script! Here is my input.'

@ -0,0 +1,15 @@
- hosts: all
any_errors_fatal: true
tasks:
- name: integration/runner__custom_perl_want_json_module.yml
custom_perl_want_json_module:
foo: true
with_sequence: start=1 end={{end|default(1)}}
register: out
- assert:
that: |
(not out.changed) and
(not out.results[0].changed) and
out.results[0].input[0].foo and
out.results[0].message == 'I am a want JSON perl script! Here is my input.'

@ -0,0 +1,15 @@
- hosts: all
any_errors_fatal: true
tasks:
- name: integration/runner__custom_python_json_args_module.yml
custom_python_json_args_module:
foo: true
with_sequence: start=1 end={{end|default(1)}}
register: out
- assert:
that: |
(not out.changed) and
(not out.results[0].changed) and
out.results[0].input[0].foo and
out.results[0].msg == 'Here is my input'

@ -0,0 +1,15 @@
- hosts: all
any_errors_fatal: true
tasks:
- name: integration/runner__custom_python_new_style_module.yml
custom_python_new_style_module:
foo: true
with_sequence: start=1 end={{end|default(1)}}
register: out
- assert:
that: |
(not out.changed) and
(not out.results[0].changed) and
out.results[0].input[0].ANSIBLE_MODULE_ARGS.foo and
out.results[0].msg == 'Here is my input'

@ -0,0 +1,15 @@
- hosts: all
any_errors_fatal: true
tasks:
- name: integration/runner__custom_python_want_json_module.yml
custom_python_want_json_module:
foo: true
with_sequence: start=1 end={{end|default(1)}}
register: out
- assert:
that: |
(not out.changed) and
(not out.results[0].changed) and
out.results[0].input[0].foo and
out.results[0].msg == 'Here is my input'

@ -0,0 +1,18 @@
#
# The ansible.cfg remote_tmp setting should be copied to the target and used
# when generating temporary paths created by the runner.py code executing
# remotely.
#
- hosts: all
any_errors_fatal: true
gather_facts: true
tasks:
- name: integration/runner__remote_tmp.yml
bash_return_paths:
register: output
- assert:
that: output.argv0.startswith('%s/.ansible/mitogen-tests/' % ansible_user_dir)
- assert:
that: output.argv1.startswith('%s/.ansible/mitogen-tests/' % ansible_user_dir)

@ -0,0 +1,28 @@
import traceback
import sys
from ansible.plugins.strategy import StrategyBase
from ansible.plugins.action import ActionBase
class ActionModule(ActionBase):
def run(self, tmp=None, task_vars=None):
try:
method = getattr(self, self._task.args['method'])
args = tuple(self._task.args.get('args', ()))
kwargs = self._task.args.get('kwargs', {})
return {
'changed': False,
'failed': False,
'result': method(*args, **kwargs)
}
except Exception as e:
traceback.print_exc()
return {
'changed': False,
'failed': True,
'msg': str(e),
'result': e,
}

@ -0,0 +1,25 @@
import sys
from ansible.plugins.strategy import StrategyBase
from ansible.plugins.action import ActionBase
class ActionModule(ActionBase):
def _get_strategy_name(self):
frame = sys._getframe()
while frame:
st = frame.f_locals.get('self')
if isinstance(st, StrategyBase):
return '%s.%s' % (type(st).__module__, type(st).__name__)
frame = frame.f_back
return ''
def run(self, tmp=None, task_vars=None):
return {
'changed': False,
'ansible_facts': {
'strategy': self._get_strategy_name(),
'is_mitogen': 'ansible_mitogen' in self._get_strategy_name(),
}
}

@ -0,0 +1,19 @@
#!/bin/bash
# I am an Ansible WANT_JSON module that returns the paths to its argv[0] and
# args file.
INPUT="$1"
[ ! -r "$INPUT" ] && {
echo "Usage: $0 <input_file.json>" >&2
exit 1
}
echo "{"
echo " \"changed\": false,"
echo " \"msg\": \"Here is my input\","
echo " \"input\": [$(< $INPUT)],"
echo " \"argv0\": \"$0\","
echo " \"argv1\": \"$1\""
echo "}"

@ -0,0 +1,16 @@
#!/bin/bash
# I am an Ansible old-style module.
INPUT=$1
[ ! -r "$INPUT" ] && {
echo "Usage: $0 <input_file>" >&2
exit 1
}
echo "{"
echo " \"changed\": false,"
echo " \"msg\": \"Here is my input\","
echo " \"filename\": \"$INPUT\","
echo " \"input\": [\"$(cat $INPUT | tr \" \' )\"]"
echo "}"

@ -0,0 +1,16 @@
#!/bin/bash
# I am an Ansible WANT_JSON module.
WANT_JSON=1
INPUT=$1
[ ! -r "$INPUT" ] && {
echo "Usage: $0 <input.json>" >&2
exit 1
}
echo "{"
echo " \"changed\": false,"
echo " \"msg\": \"Here is my input\","
echo " \"input\": [$(< $INPUT)]"
echo "}"

@ -0,0 +1,13 @@
#include <stdio.h>
int main(void)
{
fprintf(stderr, "binary_producing_json: oh noes\n");
printf("{"
"\"changed\": true, "
"\"failed\": false, "
"\"msg\": \"Hello, world.\""
"}\n");
return 0;
}

@ -0,0 +1,9 @@
#include <stdio.h>
int main(void)
{
fprintf(stderr, "binary_producing_junk: oh noes\n");
printf("Hello, world.\n");
return 0;
}

@ -0,0 +1,15 @@
#!/usr/bin/perl
binmode STDOUT, ":utf8";
use utf8;
use JSON;
my $json_args = <<'END_MESSAGE';
<<INCLUDE_ANSIBLE_MODULE_JSON_ARGS>>
END_MESSAGE
print encode_json({
message => "I am a perl script! Here is my input.",
input => [decode_json($json_args)]
});

@ -0,0 +1,21 @@
#!/usr/bin/perl
binmode STDOUT, ":utf8";
use utf8;
my $WANT_JSON = 1;
use JSON;
my $json;
{
local $/; #Enable 'slurp' mode
open my $fh, "<", $ARGV[0];
$json_args = <$fh>;
close $fh;
}
print encode_json({
message => "I am a want JSON perl script! Here is my input.",
input => [decode_json($json_args)]
});

@ -0,0 +1,23 @@
#!/usr/bin/python
# I am an Ansible new-style Python module. I return details about the Python
# interpreter I run within.
from ansible.module_utils.basic import AnsibleModule
import os
import pwd
import socket
import sys
def main():
module = AnsibleModule(argument_spec={})
module.exit_json(
sys_executable=sys.executable,
mitogen_loaded='mitogen.core' in sys.modules,
hostname=socket.gethostname(),
username=pwd.getpwuid(os.getuid()).pw_name,
)
if __name__ == '__main__':
main()

@ -0,0 +1,13 @@
#!/usr/bin/python
# I am an Ansible Python JSONARGS module. I should receive an encoding string.
import json
import sys
json_arguments = """<<INCLUDE_ANSIBLE_MODULE_JSON_ARGS>>"""
print "{"
print " \"changed\": false,"
print " \"msg\": \"Here is my input\","
print " \"input\": [%s]" % (json_arguments,)
print "}"

@ -0,0 +1,27 @@
#!/usr/bin/python
# I am an Ansible new-style Python module. I should receive an encoding string.
import json
import sys
# This is the magic marker Ansible looks for:
# from ansible.module_utils.
def usage():
sys.stderr.write('Usage: %s <input.json>\n' % (sys.argv[0],))
sys.exit(1)
# Also must slurp in our own source code, to verify the encoding string was
# added.
with open(sys.argv[0]) as fp:
me = fp.read()
input_json = sys.stdin.read()
print "{"
print " \"changed\": false,"
print " \"msg\": \"Here is my input\","
print " \"source\": [%s]," % (json.dumps(me),)
print " \"input\": [%s]" % (input_json,)
print "}"

@ -0,0 +1,33 @@
#!/usr/bin/python
# I am an Ansible Python WANT_JSON module. I should receive an encoding string.
import json
import sys
WANT_JSON = 1
def usage():
sys.stderr.write('Usage: %s <input.json>\n' % (sys.argv[0],))
sys.exit(1)
if len(sys.argv) < 2:
usage()
# Also must slurp in our own source code, to verify the encoding string was
# added.
with open(sys.argv[0]) as fp:
me = fp.read()
try:
with open(sys.argv[1]) as fp:
input_json = fp.read()
except IOError:
usage()
print "{"
print " \"changed\": false,"
print " \"msg\": \"Here is my input\","
print " \"source\": [%s]," % (json.dumps(me),)
print " \"input\": [%s]" % (input_json,)
print "}"

@ -0,0 +1,11 @@
- import_playbook: issue_109.yml
- import_playbook: issue_113.yml
- import_playbook: issue_118.yml
- import_playbook: issue_122.yml
- import_playbook: issue_131.yml
- import_playbook: issue_140.yml
- import_playbook: issue_152.yml
- import_playbook: issue_152b.yml
- import_playbook: issue_154.yml
- import_playbook: issue_174.yml
- import_playbook: issue_177.yml

@ -1,8 +1,5 @@
---
# Reproduction for issue #109.
- hosts: all
roles:
- issue_109
gather_facts: no

@ -1,7 +1,4 @@
---
- hosts: all
gather_facts: false
tasks:
- name: Get auth token

@ -1,5 +1,3 @@
---
# issue #118 repro: chmod +x not happening during script upload
#
- name: saytrue

@ -1,4 +1,3 @@
- hosts: all
tasks:
- script: scripts/print_env.sh

@ -1,13 +1,9 @@
---
# Hopeful reproduction for issue #131.
# Run lots of steps (rather than just one) so WorkerProcess and suchlike
# machinery is constantly recreated.
- hosts: all
gather_facts: no
tasks:
- shell: "true"
- shell: "true"
- shell: "true"
@ -58,4 +54,3 @@
- shell: "true"
- shell: "true"
- shell: "true"

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save