ansible: enable forking when requested and for async jobs.

Closes #105.
References #155.

mitogen/service.py:
    Refactor services to support individually exposed methods with
    different security policies for each method.

    - @mitogen.service.expose() to expose a method and set its policy
    - @mitogen.service.arg_spec() to validate input.
    - Require basic service message format to be a tuple of
      `(method, kwargs)`, where kwargs is always a dict.
    - Update DeduplicatingService to match the new scheme.

ansible_mitogen/connection.py:
    - Rename 'method' to 'method_name' to disambiguate it from the
      service.call()'s method= argument.

ansible_mitogen/planner.py:
    - Generate an ID for every job, sync or not, and fetch job results
      from JobResultService rather than via the initiating function
      call's return value.
    - Planner subclasses now get to select whether their Runner should
      run in a forked process. The base implementation requests this if
      the 'mitogen_isolation_mode=fork' task variable is present.

ansible_mitogen/runner.py:
    Teach runners to deliver their result via JobResultService executing
    in their indirect parent mux process.

ansible_mitogen/plugins/actions/mitogen_async_status.py:
    Split the implementation up into methods, and more compatibly
    emulate Ansible's existing output.

ansible_mitogen/process.py:
    Mux processes now host JobResultService.

ansible_mitogen/services.py:
    Update existing services to the new mitogen.service scheme, and
    implement JobResultService:

    * listen() method for synchronous jobs. planner.invoke() registers a
      Sender with the service prior to invoking the job, then sleeps
      waiting for the service to write the job result to the
      corresponding Receiver.

    * Non-blocking get() method for implementing mitogen_async_status
      action.

    * Child-accessible push() method for delivering task results.

ansible_mitogen/target.py:
    New helpers for spawning a virginal subprocess on startup, from
    which asynchronous and mitogen_task_isolation=fork jobs are forked.
    Necessary to avoid a task inheriting potentially
    polluted/monkey-patched parent environment, since remaining jobs
    continue to run in the original child process.

docs/ansible.rst:
    Add/merge/remove some behaviours/risks.

tests/ansible/integration:
    New tests for forking/async.
pull/193/head
David Wilson 7 years ago
parent 71057c78f9
commit 3613162bc0

@ -137,11 +137,12 @@ class Connection(ansible.plugins.connection.ConnectionBase):
def connected(self):
return self.broker is not None
def _wrap_connect(self, args):
def _wrap_connect(self, kwargs):
dct = mitogen.service.call(
context=self.parent,
handle=ContextService.handle,
obj=mitogen.utils.cast(args),
method='connect',
kwargs=mitogen.utils.cast(kwargs),
)
if dct['msg']:
@ -155,7 +156,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
master process.
"""
return self._wrap_connect({
'method': 'local',
'method_name': 'local',
'python_path': self.python_path,
})
@ -165,7 +166,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
ContextService in the master process.
"""
return self._wrap_connect({
'method': 'ssh',
'method_name': 'ssh',
'check_host_keys': False, # TODO
'hostname': self._play_context.remote_addr,
'discriminator': self.mitogen_ssh_discriminator,
@ -189,7 +190,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
def _connect_docker(self):
return self._wrap_connect({
'method': 'docker',
'method_name': 'docker',
'container': self._play_context.remote_addr,
'python_path': self.python_path,
'connect_timeout': self._play_context.timeout,
@ -205,7 +206,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
be a Context returned by _connect_ssh().
"""
return self._wrap_connect({
'method': 'sudo',
'method_name': 'sudo',
'username': self._play_context.become_user,
'password': self._play_context.become_pass,
'python_path': python_path or self.python_path,

@ -35,8 +35,10 @@ files/modules known missing.
"""
from __future__ import absolute_import
import json
import logging
import os
import random
from ansible.executor import module_common
import ansible.errors
@ -115,6 +117,8 @@ class Invocation(object):
self.env = env
#: Boolean, if :py:data:`True`, launch the module asynchronously.
self.wrap_async = wrap_async
#: String Job ID.
self.job_id = self._make_job_id()
#: Initially ``None``, but set by :func:`invoke`. The path on the
#: master to the module's implementation file.
@ -123,6 +127,9 @@ class Invocation(object):
#: binary contents of the module.
self.module_source = None
def _make_job_id(self):
return '%016x' % random.randint(0, 2**64)
def __repr__(self):
return 'Invocation(module_name=%s)' % (self.module_name,)
@ -140,7 +147,10 @@ class Planner(object):
"""
raise NotImplementedError()
def plan(self, invocation):
def get_should_fork(self, invocation):
return invocation.wrap_async
def plan(self, invocation, **kwargs):
"""
If :meth:`detect` returned :data:`True`, plan for the module's
execution, including granting access to or delivering any files to it
@ -155,7 +165,10 @@ class Planner(object):
# named by `runner_name`.
}
"""
raise NotImplementedError()
kwargs.setdefault('job_id', invocation.job_id)
kwargs.setdefault('service_context', invocation.connection.parent)
kwargs.setdefault('should_fork', self.get_should_fork(invocation))
return kwargs
class BinaryPlanner(Planner):
@ -168,22 +181,26 @@ class BinaryPlanner(Planner):
def detect(self, invocation):
return module_common._is_binary(invocation.module_source)
def plan(self, invocation):
def plan(self, invocation, **kwargs):
invocation.connection._connect()
mitogen.service.call(
invocation.connection.parent,
ansible_mitogen.services.FileService.handle,
('register', invocation.module_path)
)
return {
'runner_name': self.runner_name,
'module': invocation.module_name,
'service_context': invocation.connection.parent,
'path': invocation.module_path,
'args': invocation.module_args,
'env': invocation.env,
'remote_tmp': invocation.remote_tmp,
context=invocation.connection.parent,
handle=ansible_mitogen.services.FileService.handle,
method='register',
kwargs={
'path': invocation.module_path
}
)
return super(BinaryPlanner, self).plan(
invocation=invocation,
runner_name=self.runner_name,
module=invocation.module_name,
path=invocation.module_path,
args=invocation.module_args,
env=invocation.env,
remote_tmp=invocation.remote_tmp,
**kwargs
)
class ScriptPlanner(BinaryPlanner):
@ -199,20 +216,21 @@ class ScriptPlanner(BinaryPlanner):
except KeyError:
return interpreter
def plan(self, invocation):
kwargs = super(ScriptPlanner, self).plan(invocation)
def plan(self, invocation, **kwargs):
interpreter, arg = parse_script_interpreter(invocation.module_source)
if interpreter is None:
raise ansible.errors.AnsibleError(NO_INTERPRETER_MSG % (
invocation.module_name,
))
return dict(kwargs,
return super(ScriptPlanner, self).plan(
invocation=invocation,
interpreter_arg=arg,
interpreter=self._rewrite_interpreter(
interpreter=interpreter,
invocation=invocation
)
),
**kwargs
)
@ -283,6 +301,12 @@ class NewStylePlanner(ScriptPlanner):
"""
runner_name = 'NewStyleRunner'
def get_should_fork(self, invocation):
return (
super(NewStylePlanner, self).get_should_fork(invocation) or
(invocation.task_vars.get('mitogen_task_isolation') == 'fork')
)
def detect(self, invocation):
return 'from ansible.module_utils.' in invocation.module_source
@ -319,10 +343,7 @@ def get_module_data(name):
return path, source
def invoke(invocation):
"""
Find a suitable Planner that knows how to run `invocation`.
"""
def _do_invoke(invocation):
(invocation.module_path,
invocation.module_source) = get_module_data(invocation.module_name)
@ -333,17 +354,50 @@ def invoke(invocation):
else:
raise ansible.errors.AnsibleError(NO_METHOD_MSG + repr(invocation))
kwargs = planner.plan(invocation)
if invocation.wrap_async:
helper = ansible_mitogen.target.run_module_async
else:
helper = ansible_mitogen.target.run_module
try:
js = invocation.connection.call(helper, kwargs)
kwargs = planner.plan(invocation)
invocation.connection.call(ansible_mitogen.target.run_module, kwargs)
except mitogen.core.CallError as e:
LOG.exception('invocation crashed: %r', invocation)
summary = str(e).splitlines()[0]
raise ansible.errors.AnsibleInternalError(CRASHED_MSG + summary)
def _invoke_async(invocation):
_do_invoke(invocation)
return {
'stdout': json.dumps({
# modules/utilities/logic/async_wrapper.py::_run_module().
'changed': True,
'started': 1,
'finished': 0,
'ansible_job_id': invocation.job_id,
})
}
def _invoke_sync(invocation):
recv = mitogen.core.Receiver(invocation.connection.router)
mitogen.service.call_async(
context=invocation.connection.parent,
handle=ansible_mitogen.services.JobResultService.handle,
method='listen',
kwargs={
'job_id': invocation.job_id,
'sender': recv.to_sender(),
}
)
_do_invoke(invocation)
return recv.get().unpickle()
def invoke(invocation):
"""
Find a suitable Planner that knows how to run `invocation`.
"""
if invocation.wrap_async:
js = _invoke_async(invocation)
else:
js = _invoke_sync(invocation)
return invocation.action._postprocess_response(js)

@ -28,29 +28,26 @@
import ansible.plugins.action
import mitogen.core
import mitogen.utils
import ansible_mitogen.services
import ansible_mitogen.target
from mitogen.utils import cast
class ActionModule(ansible.plugins.action.ActionBase):
def run(self, tmp=None, task_vars=None):
job_id = self._task.args['jid']
try:
result = self._connection.call(
ansible_mitogen.target.get_async_result,
cast(job_id),
)
except mitogen.core.CallError, e:
return {
'ansible_job_id': job_id,
'started': 1,
'failed': 1,
'finished': 1,
'msg': str(e),
def _get_async_result(self, job_id):
self._connection._connect()
return mitogen.service.call(
context=self._connection.parent,
handle=ansible_mitogen.services.JobResultService.handle,
method='get',
kwargs={
'job_id': job_id,
}
)
if result is None:
def _on_result_pending(self, job_id):
return {
'_ansible_parsed': True,
'ansible_job_id': job_id,
'started': 1,
'failed': 0,
@ -58,8 +55,24 @@ class ActionModule(ansible.plugins.action.ActionBase):
'msg': '',
}
dct = self._parse_returned_data({'stdout': result})
def _on_result_available(self, job_id, result):
dct = self._parse_returned_data(result)
dct['ansible_job_id'] = job_id
dct['started'] = 1
dct['finished'] = 1
# Cutpasted from the action.py.
if 'stdout' in dct and 'stdout_lines' not in dct:
dct['stdout_lines'] = (dct['stdout'] or u'').splitlines()
if 'stderr' in dct and 'stderr_lines' not in dct:
dct['stderr_lines'] = (dct['stderr'] or u'').splitlines()
return dct
def run(self, tmp=None, task_vars=None):
job_id = mitogen.utils.cast(self._task.args['jid'])
result = self._get_async_result(job_id)
if result is None:
return self._on_result_pending(job_id)
else:
return self._on_result_available(job_id, result)

@ -155,6 +155,7 @@ class MuxProcess(object):
services=[
ansible_mitogen.services.ContextService(self.router),
ansible_mitogen.services.FileService(self.router),
ansible_mitogen.services.JobResultService(self.router),
],
size=int(os.environ.get('MITOGEN_POOL_SIZE', '16')),
)

@ -45,6 +45,7 @@ import sys
import tempfile
import types
import mitogen.service
import ansible_mitogen.target # TODO: circular import
try:
@ -82,14 +83,17 @@ class Runner(object):
Subclasses may override `_run`()` and extend `setup()` and `revert()`.
"""
def __init__(self, module, remote_tmp, raw_params=None, args=None, env=None):
def __init__(self, module, job_id, remote_tmp, service_context,
raw_params=None, args=None, env=None):
if args is None:
args = {}
if raw_params is not None:
args['_raw_params'] = raw_params
self.module = module
self.job_id = job_id
self.remote_tmp = os.path.expanduser(remote_tmp)
self.service_context = service_context
self.raw_params = raw_params
self.args = args
self.env = env
@ -131,6 +135,17 @@ class Runner(object):
"""
raise NotImplementedError()
def _send_result(self, dct):
mitogen.service.call(
context=self.service_context,
handle=502,
method='push',
kwargs={
'job_id': self.job_id,
'result': dct
}
)
def run(self):
"""
Set up the process environment in preparation for running an Ansible
@ -143,7 +158,7 @@ class Runner(object):
"""
self.setup()
try:
return self._run()
self._send_result(self._run())
finally:
self.revert()
@ -193,10 +208,9 @@ class NewStyleStdio(object):
class ProgramRunner(Runner):
def __init__(self, path, service_context, **kwargs):
def __init__(self, path, **kwargs):
super(ProgramRunner, self).__init__(**kwargs)
self.path = path
self.service_context = service_context
def setup(self):
super(ProgramRunner, self).setup()

@ -29,15 +29,21 @@
from __future__ import absolute_import
import logging
import os.path
import threading
import zlib
import mitogen
import mitogen.service
import ansible_mitogen.target
LOG = logging.getLogger(__name__)
class Error(Exception):
pass
class ContextService(mitogen.service.DeduplicatingService):
"""
Used by worker processes connecting back into the top-level process to
@ -74,15 +80,17 @@ class ContextService(mitogen.service.DeduplicatingService):
"""
handle = 500
max_message_size = 1000
required_args = {
'method': str
}
def get_response(self, args):
args.pop('discriminator', None)
method = getattr(self.router, args.pop('method'))
@mitogen.service.expose(mitogen.service.AllowParents())
@mitogen.service.arg_spec({
'method_name': str
})
def connect(self, method_name, discriminator=None, **kwargs):
method = getattr(self.router, method_name, None)
if method is None:
raise Error('no such Router method: %s' % (method_name,))
try:
context = method(**args)
context = method(**kwargs)
except mitogen.core.StreamError as e:
return {
'context': None,
@ -91,6 +99,10 @@ class ContextService(mitogen.service.DeduplicatingService):
}
home_dir = context.call(os.path.expanduser, '~')
# We don't need to wait for the result of this. Ideally we'd check its
# return value somewhere, but logs will catch any failures anyway.
context.call_async(ansible_mitogen.target.start_fork_parent)
return {
'context': context,
'home_dir': home_dir,
@ -135,32 +147,77 @@ class FileService(mitogen.service.Service):
super(FileService, self).__init__(router)
self._paths = {}
def validate_args(self, args):
return (
isinstance(args, tuple) and
len(args) == 2 and
args[0] in ('register', 'fetch') and
isinstance(args[1], basestring)
)
def dispatch(self, args, msg):
cmd, path = args
return getattr(self, cmd)(path, msg)
def register(self, path, msg):
if not mitogen.core.has_parent_authority(msg):
raise mitogen.core.CallError(self.unprivileged_msg)
if path in self._paths:
return
@mitogen.service.expose(policy=mitogen.service.AllowParents())
@mitogen.service.arg_spec({
'path': basestring
})
def register(self, path):
if path not in self._paths:
LOG.info('%r: registering %r', self, path)
with open(path, 'rb') as fp:
self._paths[path] = zlib.compress(fp.read())
def fetch(self, path, msg):
@mitogen.service.expose(policy=mitogen.service.AllowAny())
@mitogen.service.arg_spec({
'path': basestring
})
def fetch(self, path):
if path not in self._paths:
raise mitogen.core.CallError(self.unregistered_msg)
LOG.debug('Serving %r to context %r', path, msg.src_id)
LOG.debug('Serving %r', path)
return self._paths[path]
class JobResultService(mitogen.service.Service):
"""
Receive the result of a task from a child and forward it to interested
listeners. If no listener exists, store the result until it is requested.
Results are keyed by job ID.
"""
handle = 502
max_message_size = 1048576 * 64
def __init__(self, router):
super(JobResultService, self).__init__(router)
self._lock = threading.Lock()
self._result_by_job_id = {}
self._sender_by_job_id = {}
@mitogen.service.expose(mitogen.service.AllowParents())
@mitogen.service.arg_spec({
'job_id': str,
'sender': mitogen.core.Sender,
})
def listen(self, job_id, sender):
LOG.debug('%r.listen(job_id=%r, sender=%r)', self, job_id, sender)
with self._lock:
if job_id in self._sender_by_job_id:
raise Error('Listener already exists for job: %s' % (job_id,))
self._sender_by_job_id[job_id] = sender
@mitogen.service.expose(mitogen.service.AllowParents())
@mitogen.service.arg_spec({
'job_id': basestring,
})
def get(self, job_id):
LOG.debug('%r.get(job_id=%r)', self, job_id)
with self._lock:
return self._result_by_job_id.pop(job_id, None)
@mitogen.service.expose(mitogen.service.AllowAny())
@mitogen.service.arg_spec({
'job_id': basestring,
'result': dict
})
def push(self, job_id, result):
LOG.debug('%r.push(job_id=%r, result=%r)', self, job_id, result)
with self._lock:
if job_id in self._result_by_job_id:
raise Error('Result already exists for job: %s' % (job_id,))
sender = self._sender_by_job_id.pop(job_id, None)
if sender:
sender.send(result)
else:
self._result_by_job_id[job_id] = result

@ -55,6 +55,8 @@ def wrap_action_loader__get(name, *args, **kwargs):
This is used instead of static subclassing as it generalizes to third party
action modules outside the Ansible tree.
"""
# Necessary since async_status execution strategy is hard-wired in
# executor/task_executor.py::_poll_async_result().
if ( name == 'normal' and 'task' in kwargs and
kwargs['task'].action == 'async_status'):
name = 'mitogen_async_status'

@ -26,6 +26,11 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
Helper functions intended to be executed on the target. These are entrypoints
for file transfer, module execution and sundry bits like changing file modes.
"""
from __future__ import absolute_import
import json
import logging
@ -40,10 +45,12 @@ import tempfile
import threading
import zlib
import mitogen.core
import mitogen.service
import ansible_mitogen.runner
import ansible_mitogen.services
import mitogen.core
import mitogen.fork
import mitogen.parent
import mitogen.service
LOG = logging.getLogger(__name__)
@ -51,17 +58,15 @@ LOG = logging.getLogger(__name__)
#: Caching of fetched file data.
_file_cache = {}
#: Mapping of job_id<->result dict
_result_by_job_id = {}
#: Mapping of job_id<->threading.Thread
_thread_by_job_id = {}
#: Initialized to an econtext.parent.Context pointing at a pristine fork of
#: the target Python interpreter before it executes any code or imports.
_fork_parent = None
def get_file(context, path):
"""
Basic in-memory caching module fetcher. This generates an one roundtrip for
every previously unseen module, so it is only temporary.
every previously unseen file, so it is only a temporary solution.
:param context:
Context we should direct FileService requests to. For now (and probably
@ -75,39 +80,64 @@ def get_file(context, path):
if path not in _file_cache:
_file_cache[path] = zlib.decompress(
mitogen.service.call(
context,
ansible_mitogen.services.FileService.handle,
('fetch', path)
context=context,
handle=ansible_mitogen.services.FileService.handle,
method='fetch',
kwargs={
'path': path
}
)
)
return _file_cache[path]
def run_module(kwargs):
@mitogen.core.takes_econtext
def start_fork_parent(econtext):
"""
Called by ContextService immediately after connection; arranges for the
(presently) spotless Python interpreter to be forked, where the newly
forked interpreter becomes the parent of any newly forked future
interpreters.
This is necessary to prevent modules that are executed in-process from
polluting the global interpreter state in a way that effects explicitly
isolated modules.
"""
mitogen.parent.upgrade_router(econtext)
global _fork_parent
_fork_parent = econtext.router.fork()
@mitogen.core.takes_econtext
def start_fork_child(kwargs, econtext):
mitogen.parent.upgrade_router(econtext)
context = econtext.router.fork()
kwargs['shutdown_on_exit'] = True
context.call_async(run_module, kwargs)
@mitogen.core.takes_econtext
def run_module(kwargs, econtext):
"""
Set up the process environment in preparation for running an Ansible
module. This monkey-patches the Ansible libraries in various places to
prevent it from trying to kill the process on completion, and to prevent it
from reading sys.stdin.
"""
should_fork = kwargs.pop('should_fork', False)
shutdown_on_exit = kwargs.pop('shutdown_on_exit', False)
if should_fork:
_fork_parent.call(start_fork_child, kwargs)
return
runner_name = kwargs.pop('runner_name')
klass = getattr(ansible_mitogen.runner, runner_name)
impl = klass(**kwargs)
return impl.run()
def _async_main(job_id, runner_name, kwargs):
"""
Implementation for the thread that implements asynchronous module
execution.
"""
try:
rc = run_module(runner_name, kwargs)
except Exception, e:
rc = mitogen.core.CallError(e)
_result_by_job_id[job_id] = rc
impl.run()
if shutdown_on_exit:
econtext.broker.shutdown()
def make_temp_directory(base_dir):
@ -125,42 +155,6 @@ def make_temp_directory(base_dir):
prefix='ansible-mitogen-tmp-',
)
def run_module_async(runner_name, kwargs):
"""
Arrange for an Ansible module to be executed in a thread of the current
process, with results available via :py:func:`get_async_result`.
"""
job_id = '%08x' % random.randint(0, 2**32-1)
_result_by_job_id[job_id] = None
_thread_by_job_id[job_id] = threading.Thread(
target=_async_main,
kwargs={
'job_id': job_id,
'runner_name': runner_name,
'kwargs': kwargs,
}
)
_thread_by_job_id[job_id].start()
return json.dumps({
'ansible_job_id': job_id,
'changed': True
})
def get_async_result(job_id):
"""
Poll for the result of an asynchronous task.
:param str job_id:
Job ID to poll for.
:returns:
``None`` if job is still running, JSON-encoded result dictionary if
execution completed normally, or :py:class:`mitogen.core.CallError` if
an exception was thrown.
"""
if not _thread_by_job_id[job_id].isAlive():
return _result_by_job_id[job_id]
def get_user_shell():
"""

@ -128,25 +128,20 @@ This is a proof of concept: issues below are exclusively due to code immaturity.
High Risk
~~~~~~~~~
* Transfer of large (i.e. GB-sized) files using certain Ansible-internal APIs,
such as triggered via the ``copy`` module, will cause corresponding temporary
memory and CPU spikes on both host and target machine, due to delivering the
file as a single large message. If many machines are targetted with a large
file, the host machine could easily exhaust available RAM. This will be fixed
soon as it's likely to be tickled by common playbooks.
* Local actions are single threaded. Any that execute for every target will
experience artificial serialization, causing slowdown equivalent to
`task_duration * num_targets`. This will be fixed soon.
* Transfer of large files using certain Ansible-internal APIs, such as
triggered via the ``copy`` module, will cause corresponding memory and CPU
spikes on both host and target machine, due to delivering the file as a
single message. If many machines are targetted, the controller could easily
exhaust available RAM. This will be fixed soon as it's likely to be tickled
by common playbooks.
* `Asynchronous Actions And Polling
<https://docs.ansible.com/ansible/latest/playbooks_async.html>`_ has received
minimal testing. Jobs execute in a thread of the target Python interpreter.
This will fixed shortly.
minimal testing.
* No mechanism exists yet to bound the number of interpreters created during a
run. For some playbooks that parameterize ``become_user`` over a large number
of user accounts, resource exhaustion may be triggered on the target machine.
* No mechanism exists to bound the number of interpreters created during a run.
For some playbooks that parameterize ``become_user`` over many accounts,
resource exhaustion may be triggered on the target machine.
* Only Ansible 2.4 is being used for development, with occasional tests under
2.5, 2.3 and 2.2. It should be more than possible to fully support at least
@ -189,18 +184,20 @@ Behavioural Differences
following its parent SSH account, and try to emulate Ansible's existing
timeout semantics.
* Normally with Ansible, diagnostics and use of the :py:mod:`logging` package
output on the target machine are discarded. With Mitogen, all of this is
captured and returned to the host machine, where it can be viewed as desired
with ``-vvv``.
* Local commands are executed in a reuseable Python interpreter created
identically to interpreters used on remote hosts. At present only one such
interpreter per ``become_user`` exists, and so only one action may be
executed in each context simultaneously. Ansible usually permits up to
``ansible.cfg:forks`` simultaneous local actions, which may trigger a
performance regression in some playbooks. This will be fixed in a future
release.
interpreter per ``become_user`` exists, and so only one local action may be
executed simultaneously per local user account.
Ansible usually permits up to ``ansible.cfg:forks`` simultaneous local
actions. Any long-running local actions that execute for every target will
experience artificial serialization, causing slowdown equivalent to
`task_duration * num_targets`. This will be fixed soon.
* Asynchronous job IDs exist only for the duration of a run, and cannot be
queried by subsequent ansible-playbook invocations. Since the ability to
query job IDs across runs relied on an implementation detail, it is not
expected this will break any real-world playbooks.
How Modules Execute
@ -340,16 +337,18 @@ FreeNode IRC network.
Debugging
---------
Mitogen's logs are integrated into Ansible's display framework. Basic high
level debug logs are produced with ``-vvv``, with logging of all IO activity on
the controller machine when ``-vvvv`` or higher is specified.
Although any use of standard IO and the logging package on remote machines is
forwarded to the controller machine, it is not possible to receive logs of all
IO activity, as the processs of receiving those logs would would in turn
generate more IO activity. To receive a complete trace of every process on
every machine, file-based logging is required. File-based logging can be
enabled by setting ``MITOGEN_ROUTER_DEBUG=1`` in your environment.
Normally with Ansible, diagnostics and use of the :py:mod:`logging` package
output on the target machine are discarded. With Mitogen, all of this is
captured and returned to the host machine, where it can be viewed as desired
with ``-vvv``. Basic high level logs are produced with ``-vvv``, with logging
of all IO on the controller with ``-vvvv`` or higher.
Although use of standard IO and the logging package on the target is forwarded
to the controller, it is not possible to receive IO activity logs, as the
processs of receiving those logs would would itself generate IO activity. To
receive a complete trace of every process on every machine, file-based logging
is necessary. File-based logging can be enabled by setting
``MITOGEN_ROUTER_DEBUG=1`` in your environment.
When file-based logging is enabled, one file per context will be created on the
local machine and every target machine, as ``/tmp/mitogen.<pid>.log``.

@ -7,8 +7,8 @@ Service Framework
Mitogen includes a simple framework for implementing services exposed to other
contexts, with built-in subclasses that capture some common service models.
This is a work in progress, and new functionality will be added as a common use
for it is is found.
This is a work in progress, and new functionality will be added as common usage
patterns emerge.
Overview
@ -72,6 +72,13 @@ Example
Reference
---------
.. autoclass:: mitogen.service.Policy
.. autoclass:: mitogen.service.AllowParents
.. autoclass:: mitogen.service.AllowAny
.. autofunction:: mitogen.service.arg_spec
.. autofunction:: mitogen.service.expose
.. autofunction:: mitogen.service.Service
.. autoclass:: mitogen.service.Service

@ -54,6 +54,72 @@ class AllowParents(Policy):
msg.auth_id == mitogen.context_id)
def validate_arg_spec(spec, args):
for name in spec:
try:
obj = args[name]
except KeyError:
raise mitogen.core.CallError(
'Required argument %r missing.' % (name,)
)
if not isinstance(obj, spec[name]):
raise mitogen.core.CallError(
'Argument %r type incorrect, got %r, expected %r' % (
name,
type(obj),
spec[name]
)
)
def arg_spec(spec):
"""
Annotate a method as requiring arguments with a specific type. This only
validates required arguments. For optional arguments, write a manual check
within the function.
::
@mitogen.service.arg_spec({
'path': str
})
def fetch_path(self, path, optional=None):
...
:param dict spec:
Mapping from argument name to expected type.
"""
def wrapper(func):
func.mitogen_service__arg_spec = spec
return func
return wrapper
def expose(policy):
"""
Annotate a method to permit access to contexts matching an authorization
policy. The annotation may be specified multiple times. Methods lacking any
authorization policy are not accessible.
::
@mitogen.service.expose(policy=mitogen.service.AllowParents())
def unsafe_operation(self):
...
:param mitogen.service.Policy policy:
The policy to require.
"""
def wrapper(func):
func.mitogen_service__policies = (
[policy] +
getattr(func, 'mitogen_service__policies', [])
)
return func
return wrapper
class Service(object):
#: Sentinel object to suppress reply generation, since returning ``None``
#: will trigger a response message containing the pickled ``None``.
@ -64,17 +130,6 @@ class Service(object):
handle = None
max_message_size = 0
#: Mapping from required key names to their required corresponding types,
#: used by the default :py:meth:`validate_args` implementation to validate
#: requests.
required_args = {}
#: Policies that must authorize each message. By default only parents are
#: authorized.
policies = (
AllowParents(),
)
def __init__(self, router):
self.router = router
self.recv = mitogen.core.Receiver(router, self.handle)
@ -88,58 +143,51 @@ class Service(object):
self.__class__.__name__,
)
def validate_args(self, args):
return (
isinstance(args, dict) and
all(isinstance(args.get(k), t)
for k, t in self.required_args.iteritems())
)
def dispatch(self, args, msg):
raise NotImplementedError()
def dispatch_one(self, msg):
if not all(p.is_authorized(self, msg) for p in self.policies):
LOG.error('%r: unauthorized message %r', self, msg)
msg.reply(mitogen.core.CallError('Unauthorized'))
return
def _validate_message(self, msg):
if len(msg.data) > self.max_message_size:
LOG.error('%r: larger than permitted size: %r', self, msg)
msg.reply(mitogen.core.CallError('Message size exceeded'))
return
raise mitogen.core.CallError('Message size exceeded.')
args = msg.unpickle(throw=False)
if (args == mitogen.core._DEAD or
isinstance(args, mitogen.core.CallError) or
not self.validate_args(args)):
LOG.warning('Received junk message: %r', args)
msg.reply(mitogen.core.CallError('Received junk message'))
return
pair = msg.unpickle(throw=False)
if not (isinstance(pair, tuple) and
len(pair) == 2 and
isinstance(pair[0], basestring)):
raise mitogen.core.CallError('Invalid message format.')
method_name, kwargs = pair
method = getattr(self, method_name, None)
if method is None:
raise mitogen.core.CallError('No such method exists.')
policies = getattr(method, 'mitogen_service__policies', None)
if not policies:
raise mitogen.core.CallError('Method has no policies set.')
if not all(p.is_authorized(self, msg) for p in policies):
raise mitogen.core.CallError('Unauthorized')
required = getattr(method, 'mitogen_service__arg_spec', {})
validate_arg_spec(required, kwargs)
return method_name, kwargs
def _on_receive_message(self, msg):
method_name, kwargs = self._validate_message(msg)
return getattr(self, method_name)(**kwargs)
def on_receive_message(self, msg):
try:
response = self.dispatch(args, msg)
response = self._on_receive_message(msg)
if response is not self.NO_REPLY:
msg.reply(response)
except mitogen.core.CallError, e:
LOG.warning('%r: %s', self, msg)
msg.reply(e)
except Exception, e:
LOG.exception('While invoking %r.dispatch()', self)
msg.reply(mitogen.core.CallError(e))
def run_once(self):
try:
msg = self.recv.get()
except mitogen.core.ChannelError, e:
# Channel closed due to broker shutdown, exit gracefully.
LOG.debug('%r: channel closed: %s', self, e)
self.running = False
return
self.dispatch_one(msg)
def run(self):
while self.running:
self.run_once()
class DeduplicatingService(Service):
"""
@ -159,13 +207,13 @@ class DeduplicatingService(Service):
self._waiters = {}
self._lock = threading.Lock()
def key_from_request(self, args):
def key_from_request(self, method_name, kwargs):
"""
Generate a deduplication key from the request. The default
implementation returns a string based on a stable representation of the
input dictionary generated by :py:func:`pprint.pformat`.
"""
return pprint.pformat(args)
return pprint.pformat((method_name, kwargs))
def get_response(self, args):
raise NotImplementedError()
@ -181,8 +229,9 @@ class DeduplicatingService(Service):
finally:
self._lock.release()
def dispatch(self, args, msg):
key = self.key_from_request(args)
def _on_receive_message(self, msg):
method_name, kwargs = self._validate_message(msg)
key = self.key_from_request(method_name, kwargs)
self._lock.acquire()
try:
@ -199,7 +248,10 @@ class DeduplicatingService(Service):
# I'm the unlucky thread that must generate the response.
try:
self._produce_response(key, self.get_response(args))
response = getattr(self, method_name)(**kwargs)
self._produce_response(key, response)
except mitogen.core.CallError, e:
self._produce_response(key, e)
except Exception, e:
self._produce_response(key, mitogen.core.CallError(e))
@ -260,7 +312,7 @@ class Pool(object):
service = msg.receiver.service
try:
service.dispatch_one(msg)
service.on_receive_message(msg)
except Exception:
LOG.exception('While handling %r using %r', msg, service)
@ -281,7 +333,12 @@ class Pool(object):
)
def call(context, handle, obj):
msg = mitogen.core.Message.pickled(obj, handle=handle)
recv = context.send_async(msg)
def call_async(context, handle, method, kwargs):
pair = (method, kwargs)
msg = mitogen.core.Message.pickled(pair, handle=handle)
return context.send_async(msg)
def call(context, handle, method, kwargs):
recv = call_async(context, handle, method, kwargs)
return recv.get().unpickle()

@ -1,14 +0,0 @@
- hosts: all
any_errors_fatal: true
tasks:
- name: simulate long running op (3 sec), wait for up to 5 sec, poll every 1 sec
command: /bin/sleep 2
async: 4
register: derp
- debug: msg={{derp}}
- async_status: jid={{derp.ansible_job_id}}
register: derp2
- debug: msg={{derp2}}

@ -1,3 +1,6 @@
- import_playbook: async_job_timeout.yml
- import_playbook: async_one_job.yml
- import_playbook: async_two_simultaneous_jobs.yml
- import_playbook: builtin_command_module.yml
- import_playbook: custom_bash_old_style_module.yml
- import_playbook: custom_bash_want_json_module.yml
@ -9,4 +12,5 @@
- import_playbook: custom_python_json_args_module.yml
- import_playbook: custom_python_new_style_module.yml
- import_playbook: custom_python_want_json_module.yml
- import_playbook: forking_behaviour.yml
- import_playbook: remote_tmp.yml

@ -0,0 +1,52 @@
# Verify 'async: <timeout>' functions as desired.
- hosts: all
any_errors_fatal: true
tasks:
# Verify async-with-polling-and-timeout behaviour.
- name: sleep for 7 seconds, but timeout after 1 second.
ignore_errors: true
shell: sleep 7
async: 1
poll: 1
register: job1
- assert:
that:
- job1.changed == False
- job1.failed == True
- job1.msg == "async task did not complete within the requested time"
- job1.keys()|sort == ['changed', 'failed', 'msg']
# Verify async-with-timeout-then-poll behaviour.
# This is broken in upstream Ansible, so disable the tests there.
#
# TODO: the tests below are totally broken, not clear what Ansible is
# supposed to do here, so can't emulate it in Mitogen.
- name: sleep for 7 seconds, but timeout after 1 second.
ignore_errors: true
shell: sleep 7
async: 1
poll: 0
register: job2
when: false # is_mitogen
- name: poll up to 10 times.
async_status:
jid: "{{job2.ansible_job_id}}"
register: result2
until: result2.finished
retries: 10
delay: 1
when: false # is_mitogen
- assert:
that:
- result1.rc == 0
- result2.rc == 0
- result2.stdout == 'im_alive'
when: false # is_mitogen

@ -0,0 +1,99 @@
# Verify behaviour of a single asynchronous task, and presence of all output
# fields.
- hosts: all
any_errors_fatal: true
tasks:
# Verify async jobs run in a new process.
- name: get process ID.
custom_python_detect_environment:
register: sync_proc1
- name: get process ID again.
custom_python_detect_environment:
register: sync_proc2
- assert:
that:
- sync_proc1.pid == sync_proc2.pid
when: is_mitogen
- name: get async process ID.
custom_python_detect_environment:
register: async_proc1
async: 1000
poll: 0
- name: busy-poll up to 100000 times
async_status:
jid: "{{async_proc1.ansible_job_id}}"
register: async_result1
until: async_result1.finished
retries: 100000
delay: 0
- name: get async process ID again.
custom_python_detect_environment:
register: async_proc2
async: 1000
poll: 0
- name: busy-poll up to 100000 times
async_status:
jid: "{{async_proc2.ansible_job_id}}"
register: async_result2
until: async_result2.finished
retries: 100000
delay: 0
- assert:
that:
- sync_proc1.pid == sync_proc2.pid
- async_result1.pid != sync_proc1.pid
- async_result1.pid != async_result2.pid
when: is_mitogen
# Verify output of a single async job.
- name: start 2 second op
shell: |
sleep 1;
echo alldone
async: 1000
poll: 0
register: job1
- assert:
that: |
job1.ansible_job_id and
(job1.changed == True) and
(job1.started == 1) and
(job1.changed == True) and
(job1.finished == 0)
- name: busy-poll up to 100000 times
async_status:
jid: "{{job1.ansible_job_id}}"
register: result1
until: result1.finished
retries: 100000
delay: 0
- assert:
that:
- result1.ansible_job_id == job1.ansible_job_id
- result1.attempts <= 100000
- result1.changed == True
- result1.cmd == "sleep 1;\n echo alldone"
- result1.delta|length == 14
- result1.start|length == 26
- result1.failed == False
- result1.finished == 1
- result1.rc == 0
- result1.start|length == 26
- result1.stderr == ""
- result1.stderr_lines == []
- result1.stdout == "alldone"
- result1.stdout_lines == ["alldone"]

@ -0,0 +1,54 @@
- hosts: all
any_errors_fatal: true
tasks:
# Start 2 duplicate jobs, verify they run concurrently.
- name: create semaphore file and sleep for 5 seconds.
shell: |
exec 2>/dev/null;
bash -c '
echo im_alive $$ > /tmp/flurp
sleep 60;
';
rm -f /tmp/flurp;
echo alldone
async: 1000
poll: 0
register: job1
# This guy prints the first field from the semaphore file and kills the PID
# from the second field, cancelling the slow sleep above, so the busy-poll
# below compltes quickly.
- name: verify semaphore file exists while this job exists.
shell: |
[ -f /tmp/flurp ] && {
read im_alive pid < /tmp/flurp
echo $im_alive
kill $pid &>/dev/null
}
async: 1000
poll: 0
register: job2
- name: (job1) busy-poll up to 100000 times
async_status:
jid: "{{job1.ansible_job_id}}"
register: result1
until: result1.finished
retries: 100000
delay: 0
- name: (job2) busy-poll up to 100000 times
async_status:
jid: "{{job2.ansible_job_id}}"
register: result2
until: result2.finished
retries: 100000
delay: 0
- assert:
that:
- result1.rc == 0
- result2.rc == 0
- result2.stdout == 'im_alive'

@ -0,0 +1,45 @@
- hosts: all
any_errors_fatal: true
tasks:
# Verify non-async jobs run in-process.
- debug: msg={{is_mitogen}}
- name: get process ID.
custom_python_detect_environment:
register: sync_proc1
when: is_mitogen
- name: get process ID again.
custom_python_detect_environment:
register: sync_proc2
when: is_mitogen
- assert:
that:
- sync_proc1.pid == sync_proc2.pid
when: is_mitogen
# Verify mitogen_task_isolation=fork triggers forking.
- name: get force-forked process ID.
custom_python_detect_environment:
register: fork_proc1
vars:
mitogen_task_isolation: fork
when: is_mitogen
- name: get force-forked process ID again.
custom_python_detect_environment:
register: fork_proc2
vars:
mitogen_task_isolation: fork
when: is_mitogen
- assert:
that:
- fork_proc1.pid != sync_proc1.pid
- fork_proc1.pid != fork_proc2.pid
when: is_mitogen

@ -13,6 +13,10 @@ import sys
def main():
module = AnsibleModule(argument_spec={})
module.exit_json(
pid=os.getpid(),
ppid=os.getppid(),
uid=os.getuid(),
euid=os.geteuid(),
sys_executable=sys.executable,
mitogen_loaded='mitogen.core' in sys.modules,
hostname=socket.gethostname(),

@ -4,4 +4,11 @@
# Used by delegate_to.yml to ensure "sudo -E" preserves environment.
export I_WAS_PRESERVED=1
exec ansible-playbook "$@"
if [ "${ANSIBLE_STRATEGY:0:7}" = "mitogen" ]
then
extra="-e is_mitogen=1"
else
extra="-e is_mitogen=0"
fi
exec ansible-playbook $extra "$@"

Loading…
Cancel
Save