issue #186: rework async/forked tasks again.

The controller must know the ID of the forked child in order to
propagate dependencies to it, so forking+starting the module run cannot
happen entirely on the target, without some additional mechanism to
wait-and-repropagate the deps as they arrive on the target.

Rework things so that init_child() also handles starting the fork parent,
and returns it along with the context's home directory in a single round
trip.

Now master knows the identity of the fork parent, it can directly create
fork children and call run_module_async() in them. This necessitates 2
roundtrips to start an asynchronous task.

This whole thing sucks and entirely needs simplified, but for now things
almost work, so keeping it.

connection.py:
  * Expect ContextService to return the entire dict return value of
    init_child(). Store the fork_contxt from the return value.

planner.py:
  * Rework Planner to store the invocation as an instance attribute, to
    simplify method calls.
  * Add Planner.get_push_files() and Planner.get_module_deps().
  * Add _propagate_deps() which takes a Planner and ensures the deps it
    describes are sent to a (non forked or forked) context.
  * Move async task logic out of target.py and into invoke() /
    _invoke_*().

process.py:
  * Services no longer need references to each other. planner.py handles
    sending module deps with one extra RPC.

services.py:
  * Return "init_child_result" key instead of simple "home_dir" key.
  * Get rid of dep propagation from ModuleDepService, it lives in
    planner.py now.

target.py:
  * Get rid of async task start logic, lives in planner.py now.
pull/262/head
David Wilson 7 years ago
parent 526590027a
commit caffaa79f7

@ -298,13 +298,17 @@ class Connection(ansible.plugins.connection.ConnectionBase):
router = None
#: mitogen.master.Context representing the parent Context, which is
#: presently always the master process.
#: presently always the connection multiplexer process.
parent = None
#: mitogen.master.Context connected to the target user account on the
#: target machine (i.e. via sudo).
context = None
#: mitogen.master.Context connected to the fork parent process in the
#: target user account.
fork_context = None
#: Only sudo and su are supported for now.
become_methods = ['sudo', 'su']
@ -336,7 +340,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
host_vars = None
#: Set after connection to the target context's home directory.
_homedir = None
home_dir = None
def __init__(self, play_context, new_stdin, **kwargs):
assert ansible_mitogen.process.MuxProcess.unix_listener_path, (
@ -376,7 +380,7 @@ class Connection(ansible.plugins.connection.ConnectionBase):
@property
def homedir(self):
self._connect()
return self._homedir
return self.home_dir
@property
def connected(self):
@ -470,15 +474,8 @@ class Connection(ansible.plugins.connection.ConnectionBase):
raise ansible.errors.AnsibleConnectionFailure(dct['msg'])
self.context = dct['context']
self._homedir = dct['home_dir']
def get_context_name(self):
"""
Return the name of the target context we issue commands against, i.e. a
unique string useful as a key for related data, such as a list of
modules uploaded to the target.
"""
return self.context.name
self.fork_context = dct['init_child_result']['fork_context']
self.home_dir = dct['init_child_result']['home_dir']
def close(self, new_task=False):
"""
@ -526,6 +523,17 @@ class Connection(ansible.plugins.connection.ConnectionBase):
LOG.debug('Call took %d ms: %s%r', 1000 * (time.time() - t0),
func.func_name, args)
def create_fork_child(self):
"""
Fork a new child off the target context. The actual fork occurs from
the 'virginal fork parent', which does not any Ansible modules prior to
fork, to avoid conflicts resulting from custom module_utils paths.
:returns:
mitogen.core.Context of the new child.
"""
return self.call(ansible_mitogen.target.create_fork_child)
def exec_command(self, cmd, in_data='', sudoable=True, mitogen_chdir=None):
"""
Implement exec_command() by calling the corresponding

@ -35,8 +35,10 @@ files/modules known missing.
"""
from __future__ import absolute_import
import json
import logging
import os
import random
from ansible.executor import module_common
import ansible.errors
@ -132,20 +134,36 @@ class Planner(object):
file, indicates whether or not it understands how to run the module, and
exports a method to run the module.
"""
def detect(self, invocation):
def __init__(self, invocation):
self._inv = invocation
def detect(self):
"""
Return true if the supplied `invocation` matches the module type
implemented by this planner.
"""
raise NotImplementedError()
def get_should_fork(self, invocation):
def should_fork(self):
"""
Asynchronous tasks must always be forked.
"""
return invocation.wrap_async
return self._inv.wrap_async
def get_push_files(self):
"""
Return a list of files that should be propagated to the target context
using PushFileService. The default implementation pushes nothing.
"""
return []
def get_module_deps(self):
"""
Return a list of the Python module names imported by the module.
"""
return []
def plan(self, invocation, **kwargs):
def get_kwargs(self, **kwargs):
"""
If :meth:`detect` returned :data:`True`, plan for the module's
execution, including granting access to or delivering any files to it
@ -161,9 +179,7 @@ class Planner(object):
}
"""
kwargs.setdefault('emulate_tty', True)
kwargs.setdefault('service_context', invocation.connection.parent)
kwargs.setdefault('should_fork', self.get_should_fork(invocation))
kwargs.setdefault('wrap_async', invocation.wrap_async)
kwargs.setdefault('service_context', self._inv.connection.parent)
return kwargs
def __repr__(self):
@ -177,26 +193,19 @@ class BinaryPlanner(Planner):
"""
runner_name = 'BinaryRunner'
def detect(self, invocation):
return module_common._is_binary(invocation.module_source)
def detect(self):
return module_common._is_binary(self._inv.module_source)
def _grant_file_service_access(self, invocation):
invocation.connection.parent.call_service(
service_name='mitogen.service.PushFileService',
method_name='propagate_to',
path=invocation.module_path,
context=invocation.connection.context,
)
def get_push_files(self):
return [self._inv.module_path]
def plan(self, invocation, **kwargs):
self._grant_file_service_access(invocation)
return super(BinaryPlanner, self).plan(
invocation=invocation,
def get_kwargs(self, **kwargs):
return super(BinaryPlanner, self).get_kwargs(
runner_name=self.runner_name,
module=invocation.module_name,
path=invocation.module_path,
args=invocation.module_args,
env=invocation.env,
module=self._inv.module_name,
path=self._inv.module_path,
args=self._inv.module_args,
env=self._inv.env,
**kwargs
)
@ -206,24 +215,25 @@ class ScriptPlanner(BinaryPlanner):
Common functionality for script module planners -- handle interpreter
detection and rewrite.
"""
def _get_interpreter(self, invocation):
interpreter, arg = parse_script_interpreter(invocation.module_source)
def _get_interpreter(self):
interpreter, arg = parse_script_interpreter(
self._inv.module_source
)
if interpreter is None:
raise ansible.errors.AnsibleError(NO_INTERPRETER_MSG % (
invocation.module_name,
self._inv.module_name,
))
key = u'ansible_%s_interpreter' % os.path.basename(interpreter).strip()
try:
template = invocation.task_vars[key].strip()
return invocation.templar.template(template), arg
template = self._inv.task_vars[key].strip()
return self._inv.templar.template(template), arg
except KeyError:
return interpreter, arg
def plan(self, invocation, **kwargs):
interpreter, arg = self._get_interpreter(invocation)
return super(ScriptPlanner, self).plan(
invocation=invocation,
def get_kwargs(self, **kwargs):
interpreter, arg = self._get_interpreter()
return super(ScriptPlanner, self).get_kwargs(
interpreter_arg=arg,
interpreter=interpreter,
**kwargs
@ -237,8 +247,8 @@ class JsonArgsPlanner(ScriptPlanner):
"""
runner_name = 'JsonArgsRunner'
def detect(self, invocation):
return module_common.REPLACER_JSONARGS in invocation.module_source
def detect(self):
return module_common.REPLACER_JSONARGS in self._inv.module_source
class WantJsonPlanner(ScriptPlanner):
@ -255,8 +265,8 @@ class WantJsonPlanner(ScriptPlanner):
"""
runner_name = 'WantJsonRunner'
def detect(self, invocation):
return 'WANT_JSON' in invocation.module_source
def detect(self):
return 'WANT_JSON' in self._inv.module_source
class NewStylePlanner(ScriptPlanner):
@ -267,56 +277,59 @@ class NewStylePlanner(ScriptPlanner):
"""
runner_name = 'NewStyleRunner'
def _get_interpreter(self, invocation):
def detect(self):
return 'from ansible.module_utils.' in self._inv.module_source
def _get_interpreter(self):
return None, None
def _grant_file_service_access(self, invocation):
"""
Stub out BinaryPlanner's method since ModuleDepService makes internal
calls to grant file access, avoiding 2 IPCs per task invocation.
"""
def get_push_files(self):
return super(NewStylePlanner, self).get_push_files() + [
path
for fullname, path, is_pkg in self.get_module_map()['custom']
]
def get_should_fork(self, invocation):
def get_module_deps(self):
return self.get_module_map()['builtin']
def should_fork(self):
"""
In addition to asynchronous tasks, new-style modules should be forked
if mitogen_task_isolation=fork.
if the user specifies mitogen_task_isolation=fork, or if the new-style
module has a custom module search path.
"""
return (
super(NewStylePlanner, self).get_should_fork(invocation) or
(invocation.task_vars.get('mitogen_task_isolation') == 'fork')
super(NewStylePlanner, self).should_fork() or
(self._inv.task_vars.get('mitogen_task_isolation') == 'fork') or
(len(self.get_module_map()['custom']) > 0)
)
def detect(self, invocation):
return 'from ansible.module_utils.' in invocation.module_source
def get_search_path(self, invocation):
def get_search_path(self):
return tuple(
path
for path in module_utils_loader._get_paths(subdirs=False)
if os.path.isdir(path)
)
def get_module_map(self, invocation):
return invocation.connection.parent.call_service(
service_name='ansible_mitogen.services.ModuleDepService',
method_name='scan',
_module_map = None
module_name='ansible_module_%s' % (invocation.module_name,),
module_path=invocation.module_path,
search_path=self.get_search_path(invocation),
builtin_path=module_common._MODULE_UTILS_PATH,
context=invocation.connection.context,
)
def get_module_map(self):
if self._module_map is None:
self._module_map = self._inv.connection.parent.call_service(
service_name='ansible_mitogen.services.ModuleDepService',
method_name='scan',
def plan(self, invocation):
module_map = self.get_module_map(invocation)
return super(NewStylePlanner, self).plan(
invocation,
module_map=module_map,
should_fork=(
self.get_should_fork(invocation) or
len(module_map['custom']) > 0
module_name='ansible_module_%s' % (self._inv.module_name,),
module_path=self._inv.module_path,
search_path=self.get_search_path(),
builtin_path=module_common._MODULE_UTILS_PATH,
context=self._inv.connection.context,
)
return self._module_map
def get_kwargs(self):
return super(NewStylePlanner, self).get_kwargs(
module_map=self.get_module_map(),
)
@ -346,14 +359,14 @@ class ReplacerPlanner(NewStylePlanner):
"""
runner_name = 'ReplacerRunner'
def detect(self, invocation):
return module_common.REPLACER in invocation.module_source
def detect(self):
return module_common.REPLACER in self._inv.module_source
class OldStylePlanner(ScriptPlanner):
runner_name = 'OldStyleRunner'
def detect(self, invocation):
def detect(self):
# Everything else.
return True
@ -375,24 +388,84 @@ def get_module_data(name):
return path, source
def invoke(invocation):
"""
Find a suitable Planner that knows how to run `invocation`.
"""
(invocation.module_path,
invocation.module_source) = get_module_data(invocation.module_name)
def _propagate_deps(invocation, planner, context):
invocation.connection.parent.call_service(
service_name='mitogen.service.PushFileService',
method_name='propagate_paths_and_modules',
context=context,
paths=planner.get_push_files(),
modules=planner.get_module_deps(),
)
def _invoke_async_task(invocation, planner):
job_id = '%016x' % random.randint(0, 2**64)
context = invocation.connection.create_fork_child()
_propagate_deps(invocation, planner, context)
context.call_no_reply(
ansible_mitogen.target.run_module_async,
job_id=job_id,
kwargs=planner.get_kwargs(),
)
return {
'stdout': json.dumps({
# modules/utilities/logic/async_wrapper.py::_run_module().
'changed': True,
'started': 1,
'finished': 0,
'ansible_job_id': job_id,
})
}
def _invoke_forked_task(invocation, planner):
context = invocation.connection.create_fork_child()
_propagate_deps(invocation, planner, context)
try:
return context.call(
ansible_mitogen.target.run_module,
kwargs=planner.get_kwargs(),
)
finally:
context.shutdown()
def _get_planner(invocation):
for klass in _planners:
planner = klass()
if planner.detect(invocation):
planner = klass(invocation)
if planner.detect():
LOG.debug('%r accepted %r (filename %r)', planner,
invocation.module_name, invocation.module_path)
return invocation.action._postprocess_response(
invocation.connection.call(
ansible_mitogen.target.run_module,
planner.plan(invocation),
)
)
return planner
LOG.debug('%r rejected %r', planner, invocation.module_name)
raise ansible.errors.AnsibleError(NO_METHOD_MSG + repr(invocation))
def invoke(invocation):
"""
Find a Planner subclass corresnding to `invocation` and use it to invoke
the module.
:param Invocation invocation:
:returns:
Module return dict.
:raises ansible.errors.AnsibleError:
Unrecognized/unsupported module type.
"""
(invocation.module_path,
invocation.module_source) = get_module_data(invocation.module_name)
planner = _get_planner(invocation)
if invocation.wrap_async:
response = _invoke_async_task(invocation, planner)
elif planner.should_fork():
response = _invoke_forked_task(invocation, planner)
else:
_propagate_deps(invocation, planner, invocation.connection.context)
response = invocation.connection.call(
ansible_mitogen.target.run_module,
kwargs=planner.get_kwargs(),
)
return invocation.action._postprocess_response(response)

@ -154,18 +154,13 @@ class MuxProcess(object):
Construct a ContextService and a thread to service requests for it
arriving from worker processes.
"""
file_service = mitogen.service.FileService(router=self.router)
push_file_service = mitogen.service.PushFileService(router=self.router)
self.pool = mitogen.service.Pool(
router=self.router,
services=[
file_service,
push_file_service,
mitogen.service.FileService(router=self.router),
mitogen.service.PushFileService(router=self.router),
ansible_mitogen.services.ContextService(self.router),
ansible_mitogen.services.ModuleDepService(
router=self.router,
push_file_service=push_file_service,
),
ansible_mitogen.services.ModuleDepService(self.router),
],
size=int(os.environ.get('MITOGEN_POOL_SIZE', '16')),
)

@ -111,13 +111,20 @@ class Runner(object):
Context to which we should direct FileService calls. For now, always
the connection multiplexer process on the controller.
:param dict args:
Ansible module arguments. A strange mixture of user and internal keys
created by ActionBase._execute_module().
Ansible module arguments. A mixture of user and internal keys created
by :meth:`ansible.plugins.action.ActionBase._execute_module`.
:param dict env:
Additional environment variables to set during the run.
:param mitogen.core.ExternalContext econtext:
When `detach` is :data:`True`, a reference to the ExternalContext the
runner is executing in.
:param bool detach:
When :data:`True`, indicate the runner should detach the context from
its parent after setup has completed successfully.
"""
def __init__(self, module, service_context, econtext=None, detach=False,
args=None, env=None):
def __init__(self, module, service_context, args=None, env=None,
econtext=None, detach=False):
if args is None:
args = {}

@ -251,13 +251,18 @@ class ContextService(mitogen.service.Service):
{
'context': mitogen.core.Context or None,
'home_dir': str or None,
'init_child_result': {
'fork_context': mitogen.core.Context,
'home_dir': str or None,
},
'msg': str or None
}
Where either `msg` is an error message and the remaining fields are
:data:`None`, or `msg` is :data:`None` and the remaining fields are
set.
Where `context` is a reference to the newly constructed context,
`init_child_result` is the result of executing
:func:`ansible_mitogen.target.init_child` in that context, `msg` is
an error message and the remaining fields are :data:`None`, or
`msg` is :data:`None` and the remaining fields are set.
"""
try:
method = getattr(self.router, spec['method'])
@ -276,11 +281,7 @@ class ContextService(mitogen.service.Service):
lambda: self._on_stream_disconnect(stream))
self._send_module_forwards(context)
home_dir = context.call(os.path.expanduser, '~')
# We don't need to wait for the result of this. Ideally we'd check its
# return value somewhere, but logs will catch a failure anyway.
context.call_async(ansible_mitogen.target.init_child)
init_child_result = context.call(ansible_mitogen.target.init_child)
if os.environ.get('MITOGEN_DUMP_THREAD_STACKS'):
from mitogen import debug
@ -290,7 +291,7 @@ class ContextService(mitogen.service.Service):
self._refs_by_context[context] = 0
return {
'context': context,
'home_dir': home_dir,
'init_child_result': init_child_result,
'msg': None,
}
@ -341,7 +342,7 @@ class ContextService(mitogen.service.Service):
:returns dict:
* context: mitogen.master.Context or None.
* homedir: Context's home directory or None.
* init_child_result: Result of :func:`init_child`.
* msg: StreamError exception text or None.
* method_name: string failing method name.
"""
@ -356,7 +357,7 @@ class ContextService(mitogen.service.Service):
except mitogen.core.StreamError as e:
return {
'context': None,
'home_dir': None,
'init_child_result': None,
'method_name': spec['method'],
'msg': str(e),
}
@ -369,9 +370,8 @@ class ModuleDepService(mitogen.service.Service):
Scan a new-style module and produce a cached mapping of module_utils names
to their resolved filesystem paths.
"""
def __init__(self, push_file_service, **kwargs):
super(ModuleDepService, self).__init__(**kwargs)
self._push_file_service = push_file_service
def __init__(self, *args, **kwargs):
super(ModuleDepService, self).__init__(*args, **kwargs)
self._cache = {}
def _get_builtin_names(self, builtin_path, resolved):
@ -411,20 +411,4 @@ class ModuleDepService(mitogen.service.Service):
'builtin': builtin,
'custom': custom,
}
# Grant FileService access to paths in here to avoid another 2 IPCs
# from WorkerProcess.
self._push_file_service.propagate_to(
path=module_path,
context=context,
)
for fullname, path, is_pkg in custom:
self._push_file_service.propagate_to(
path=path,
context=context,
)
for name in self._cache[key]['builtin']:
self.router.responder.forward_module(context, name)
return self._cache[key]

@ -40,7 +40,6 @@ import logging
import operator
import os
import pwd
import random
import re
import stat
import subprocess
@ -81,7 +80,7 @@ def get_small_file(context, path):
:returns:
Bytestring file data.
"""
pool = mitogen.service.get_or_create_pool()
pool = mitogen.service.get_or_create_pool(router=context.router)
service = pool.get_service('mitogen.service.PushFileService')
return service.get(path)
@ -211,52 +210,51 @@ def init_child(econtext):
This is necessary to prevent modules that are executed in-process from
polluting the global interpreter state in a way that effects explicitly
isolated modules.
:returns:
Dict like::
{
'fork_context': mitogen.core.Context.
'home_dir': str.
}
Where `fork_context` refers to the newly forked 'fork parent' context
the controller will use to start forked jobs, and `home_dir` is the
home directory for the active user account.
"""
global _fork_parent
mitogen.parent.upgrade_router(econtext)
_fork_parent = econtext.router.fork()
reset_temp_dir(econtext)
return {
'fork_context': _fork_parent,
'home_dir': os.path.expanduser('~'),
}
@mitogen.core.takes_econtext
def start_fork_child(wrap_async, kwargs, econtext):
def create_fork_child(econtext):
"""
For helper functions executed in the fork parent context, arrange for
the context's router to be upgraded as necessary and for a new child to be
prepared.
"""
mitogen.parent.upgrade_router(econtext)
context = econtext.router.fork()
context.call(reset_temp_dir)
if not wrap_async:
try:
return context.call(run_module, kwargs)
finally:
context.shutdown()
job_id = '%016x' % random.randint(0, 2**64)
kwargs['detach'] = True
kwargs['econtext'] = econtext
context.call_async(run_module_async, job_id, kwargs)
return {
'stdout': json.dumps({
# modules/utilities/logic/async_wrapper.py::_run_module().
'changed': True,
'started': 1,
'finished': 0,
'ansible_job_id': job_id,
})
}
LOG.debug('create_fork_child() -> %r', context)
return context
@mitogen.core.takes_econtext
def run_module(kwargs, econtext):
def run_module(kwargs):
"""
Set up the process environment in preparation for running an Ansible
module. This monkey-patches the Ansible libraries in various places to
prevent it from trying to kill the process on completion, and to prevent it
from reading sys.stdin.
"""
should_fork = kwargs.pop('should_fork', False)
wrap_async = kwargs.pop('wrap_async', False)
if should_fork:
return _fork_parent.call(start_fork_child, wrap_async, kwargs)
runner_name = kwargs.pop('runner_name')
klass = getattr(ansible_mitogen.runner, runner_name)
impl = klass(**kwargs)
@ -287,21 +285,27 @@ def _write_job_status(job_id, dct):
os.rename(path + '.tmp', path)
def _run_module_async(job_id, kwargs, econtext):
def _run_module_async(kwargs, job_id, econtext):
"""
Body on run_module_async().
1. Immediately updates the status file to mark the job as started.
2. Installs a timer/signal handler to implement the time limit.
3. Runs as with run_module(), writing the result to the status file.
:param dict kwargs:
Runner keyword arguments.
:param str job_id:
String job ID.
"""
_write_job_status(job_id, {
'started': 1,
'finished': 0
'finished': 0,
'pid': os.getpid()
})
#kwargs['detach'] = True
#kwargs['econtext'] = econtext
kwargs['emulate_tty'] = False
dct = run_module(kwargs, econtext)
dct = run_module(kwargs)
if mitogen.core.PY3:
for key in 'stdout', 'stderr':
dct[key] = dct[key].decode('utf-8', 'surrogateescape')
@ -325,18 +329,21 @@ def _run_module_async(job_id, kwargs, econtext):
@mitogen.core.takes_econtext
def run_module_async(job_id, kwargs, econtext):
def run_module_async(kwargs, job_id, econtext):
"""
Since run_module_async() is invoked with .call_async(), with nothing to
read the result from the corresponding Receiver, wrap the body in an
exception logger, and wrap that in something that tears down the context on
completion.
Arrange for a module to be executed with its run status and result
serialized to a disk file. This function expects to run in a child forked
using :func:`create_fork_child`.
"""
try:
try:
_run_module_async(job_id, kwargs, econtext)
_run_module_async(kwargs, job_id, econtext)
except Exception:
LOG.exception('_run_module_async crashed')
# Catch any (ansible_mitogen) bugs and write them to the job file.
_write_job_status(job_id, {
"failed": 1,
"msg": traceback.format_exc(),
})
finally:
econtext.broker.shutdown()

Loading…
Cancel
Save