diff --git a/ansible_mitogen/connection.py b/ansible_mitogen/connection.py index ee34c22b..b5f80910 100644 --- a/ansible_mitogen/connection.py +++ b/ansible_mitogen/connection.py @@ -298,13 +298,17 @@ class Connection(ansible.plugins.connection.ConnectionBase): router = None #: mitogen.master.Context representing the parent Context, which is - #: presently always the master process. + #: presently always the connection multiplexer process. parent = None #: mitogen.master.Context connected to the target user account on the #: target machine (i.e. via sudo). context = None + #: mitogen.master.Context connected to the fork parent process in the + #: target user account. + fork_context = None + #: Only sudo and su are supported for now. become_methods = ['sudo', 'su'] @@ -336,7 +340,7 @@ class Connection(ansible.plugins.connection.ConnectionBase): host_vars = None #: Set after connection to the target context's home directory. - _homedir = None + home_dir = None def __init__(self, play_context, new_stdin, **kwargs): assert ansible_mitogen.process.MuxProcess.unix_listener_path, ( @@ -376,7 +380,7 @@ class Connection(ansible.plugins.connection.ConnectionBase): @property def homedir(self): self._connect() - return self._homedir + return self.home_dir @property def connected(self): @@ -470,15 +474,8 @@ class Connection(ansible.plugins.connection.ConnectionBase): raise ansible.errors.AnsibleConnectionFailure(dct['msg']) self.context = dct['context'] - self._homedir = dct['home_dir'] - - def get_context_name(self): - """ - Return the name of the target context we issue commands against, i.e. a - unique string useful as a key for related data, such as a list of - modules uploaded to the target. - """ - return self.context.name + self.fork_context = dct['init_child_result']['fork_context'] + self.home_dir = dct['init_child_result']['home_dir'] def close(self, new_task=False): """ @@ -526,6 +523,17 @@ class Connection(ansible.plugins.connection.ConnectionBase): LOG.debug('Call took %d ms: %s%r', 1000 * (time.time() - t0), func.func_name, args) + def create_fork_child(self): + """ + Fork a new child off the target context. The actual fork occurs from + the 'virginal fork parent', which does not any Ansible modules prior to + fork, to avoid conflicts resulting from custom module_utils paths. + + :returns: + mitogen.core.Context of the new child. + """ + return self.call(ansible_mitogen.target.create_fork_child) + def exec_command(self, cmd, in_data='', sudoable=True, mitogen_chdir=None): """ Implement exec_command() by calling the corresponding diff --git a/ansible_mitogen/planner.py b/ansible_mitogen/planner.py index ce635239..c540119b 100644 --- a/ansible_mitogen/planner.py +++ b/ansible_mitogen/planner.py @@ -35,8 +35,10 @@ files/modules known missing. """ from __future__ import absolute_import +import json import logging import os +import random from ansible.executor import module_common import ansible.errors @@ -132,20 +134,36 @@ class Planner(object): file, indicates whether or not it understands how to run the module, and exports a method to run the module. """ - def detect(self, invocation): + def __init__(self, invocation): + self._inv = invocation + + def detect(self): """ Return true if the supplied `invocation` matches the module type implemented by this planner. """ raise NotImplementedError() - def get_should_fork(self, invocation): + def should_fork(self): """ Asynchronous tasks must always be forked. """ - return invocation.wrap_async + return self._inv.wrap_async + + def get_push_files(self): + """ + Return a list of files that should be propagated to the target context + using PushFileService. The default implementation pushes nothing. + """ + return [] + + def get_module_deps(self): + """ + Return a list of the Python module names imported by the module. + """ + return [] - def plan(self, invocation, **kwargs): + def get_kwargs(self, **kwargs): """ If :meth:`detect` returned :data:`True`, plan for the module's execution, including granting access to or delivering any files to it @@ -161,9 +179,7 @@ class Planner(object): } """ kwargs.setdefault('emulate_tty', True) - kwargs.setdefault('service_context', invocation.connection.parent) - kwargs.setdefault('should_fork', self.get_should_fork(invocation)) - kwargs.setdefault('wrap_async', invocation.wrap_async) + kwargs.setdefault('service_context', self._inv.connection.parent) return kwargs def __repr__(self): @@ -177,26 +193,19 @@ class BinaryPlanner(Planner): """ runner_name = 'BinaryRunner' - def detect(self, invocation): - return module_common._is_binary(invocation.module_source) + def detect(self): + return module_common._is_binary(self._inv.module_source) - def _grant_file_service_access(self, invocation): - invocation.connection.parent.call_service( - service_name='mitogen.service.PushFileService', - method_name='propagate_to', - path=invocation.module_path, - context=invocation.connection.context, - ) + def get_push_files(self): + return [self._inv.module_path] - def plan(self, invocation, **kwargs): - self._grant_file_service_access(invocation) - return super(BinaryPlanner, self).plan( - invocation=invocation, + def get_kwargs(self, **kwargs): + return super(BinaryPlanner, self).get_kwargs( runner_name=self.runner_name, - module=invocation.module_name, - path=invocation.module_path, - args=invocation.module_args, - env=invocation.env, + module=self._inv.module_name, + path=self._inv.module_path, + args=self._inv.module_args, + env=self._inv.env, **kwargs ) @@ -206,24 +215,25 @@ class ScriptPlanner(BinaryPlanner): Common functionality for script module planners -- handle interpreter detection and rewrite. """ - def _get_interpreter(self, invocation): - interpreter, arg = parse_script_interpreter(invocation.module_source) + def _get_interpreter(self): + interpreter, arg = parse_script_interpreter( + self._inv.module_source + ) if interpreter is None: raise ansible.errors.AnsibleError(NO_INTERPRETER_MSG % ( - invocation.module_name, + self._inv.module_name, )) key = u'ansible_%s_interpreter' % os.path.basename(interpreter).strip() try: - template = invocation.task_vars[key].strip() - return invocation.templar.template(template), arg + template = self._inv.task_vars[key].strip() + return self._inv.templar.template(template), arg except KeyError: return interpreter, arg - def plan(self, invocation, **kwargs): - interpreter, arg = self._get_interpreter(invocation) - return super(ScriptPlanner, self).plan( - invocation=invocation, + def get_kwargs(self, **kwargs): + interpreter, arg = self._get_interpreter() + return super(ScriptPlanner, self).get_kwargs( interpreter_arg=arg, interpreter=interpreter, **kwargs @@ -237,8 +247,8 @@ class JsonArgsPlanner(ScriptPlanner): """ runner_name = 'JsonArgsRunner' - def detect(self, invocation): - return module_common.REPLACER_JSONARGS in invocation.module_source + def detect(self): + return module_common.REPLACER_JSONARGS in self._inv.module_source class WantJsonPlanner(ScriptPlanner): @@ -255,8 +265,8 @@ class WantJsonPlanner(ScriptPlanner): """ runner_name = 'WantJsonRunner' - def detect(self, invocation): - return 'WANT_JSON' in invocation.module_source + def detect(self): + return 'WANT_JSON' in self._inv.module_source class NewStylePlanner(ScriptPlanner): @@ -267,56 +277,59 @@ class NewStylePlanner(ScriptPlanner): """ runner_name = 'NewStyleRunner' - def _get_interpreter(self, invocation): + def detect(self): + return 'from ansible.module_utils.' in self._inv.module_source + + def _get_interpreter(self): return None, None - def _grant_file_service_access(self, invocation): - """ - Stub out BinaryPlanner's method since ModuleDepService makes internal - calls to grant file access, avoiding 2 IPCs per task invocation. - """ + def get_push_files(self): + return super(NewStylePlanner, self).get_push_files() + [ + path + for fullname, path, is_pkg in self.get_module_map()['custom'] + ] - def get_should_fork(self, invocation): + def get_module_deps(self): + return self.get_module_map()['builtin'] + + def should_fork(self): """ In addition to asynchronous tasks, new-style modules should be forked - if mitogen_task_isolation=fork. + if the user specifies mitogen_task_isolation=fork, or if the new-style + module has a custom module search path. """ return ( - super(NewStylePlanner, self).get_should_fork(invocation) or - (invocation.task_vars.get('mitogen_task_isolation') == 'fork') + super(NewStylePlanner, self).should_fork() or + (self._inv.task_vars.get('mitogen_task_isolation') == 'fork') or + (len(self.get_module_map()['custom']) > 0) ) - def detect(self, invocation): - return 'from ansible.module_utils.' in invocation.module_source - - def get_search_path(self, invocation): + def get_search_path(self): return tuple( path for path in module_utils_loader._get_paths(subdirs=False) if os.path.isdir(path) ) - def get_module_map(self, invocation): - return invocation.connection.parent.call_service( - service_name='ansible_mitogen.services.ModuleDepService', - method_name='scan', + _module_map = None - module_name='ansible_module_%s' % (invocation.module_name,), - module_path=invocation.module_path, - search_path=self.get_search_path(invocation), - builtin_path=module_common._MODULE_UTILS_PATH, - context=invocation.connection.context, - ) + def get_module_map(self): + if self._module_map is None: + self._module_map = self._inv.connection.parent.call_service( + service_name='ansible_mitogen.services.ModuleDepService', + method_name='scan', - def plan(self, invocation): - module_map = self.get_module_map(invocation) - return super(NewStylePlanner, self).plan( - invocation, - module_map=module_map, - should_fork=( - self.get_should_fork(invocation) or - len(module_map['custom']) > 0 + module_name='ansible_module_%s' % (self._inv.module_name,), + module_path=self._inv.module_path, + search_path=self.get_search_path(), + builtin_path=module_common._MODULE_UTILS_PATH, + context=self._inv.connection.context, ) + return self._module_map + + def get_kwargs(self): + return super(NewStylePlanner, self).get_kwargs( + module_map=self.get_module_map(), ) @@ -346,14 +359,14 @@ class ReplacerPlanner(NewStylePlanner): """ runner_name = 'ReplacerRunner' - def detect(self, invocation): - return module_common.REPLACER in invocation.module_source + def detect(self): + return module_common.REPLACER in self._inv.module_source class OldStylePlanner(ScriptPlanner): runner_name = 'OldStyleRunner' - def detect(self, invocation): + def detect(self): # Everything else. return True @@ -375,24 +388,84 @@ def get_module_data(name): return path, source -def invoke(invocation): - """ - Find a suitable Planner that knows how to run `invocation`. - """ - (invocation.module_path, - invocation.module_source) = get_module_data(invocation.module_name) +def _propagate_deps(invocation, planner, context): + invocation.connection.parent.call_service( + service_name='mitogen.service.PushFileService', + method_name='propagate_paths_and_modules', + context=context, + paths=planner.get_push_files(), + modules=planner.get_module_deps(), + ) + + +def _invoke_async_task(invocation, planner): + job_id = '%016x' % random.randint(0, 2**64) + context = invocation.connection.create_fork_child() + _propagate_deps(invocation, planner, context) + context.call_no_reply( + ansible_mitogen.target.run_module_async, + job_id=job_id, + kwargs=planner.get_kwargs(), + ) + + return { + 'stdout': json.dumps({ + # modules/utilities/logic/async_wrapper.py::_run_module(). + 'changed': True, + 'started': 1, + 'finished': 0, + 'ansible_job_id': job_id, + }) + } + + +def _invoke_forked_task(invocation, planner): + context = invocation.connection.create_fork_child() + _propagate_deps(invocation, planner, context) + try: + return context.call( + ansible_mitogen.target.run_module, + kwargs=planner.get_kwargs(), + ) + finally: + context.shutdown() + +def _get_planner(invocation): for klass in _planners: - planner = klass() - if planner.detect(invocation): + planner = klass(invocation) + if planner.detect(): LOG.debug('%r accepted %r (filename %r)', planner, invocation.module_name, invocation.module_path) - return invocation.action._postprocess_response( - invocation.connection.call( - ansible_mitogen.target.run_module, - planner.plan(invocation), - ) - ) + return planner LOG.debug('%r rejected %r', planner, invocation.module_name) - raise ansible.errors.AnsibleError(NO_METHOD_MSG + repr(invocation)) + + +def invoke(invocation): + """ + Find a Planner subclass corresnding to `invocation` and use it to invoke + the module. + + :param Invocation invocation: + :returns: + Module return dict. + :raises ansible.errors.AnsibleError: + Unrecognized/unsupported module type. + """ + (invocation.module_path, + invocation.module_source) = get_module_data(invocation.module_name) + planner = _get_planner(invocation) + + if invocation.wrap_async: + response = _invoke_async_task(invocation, planner) + elif planner.should_fork(): + response = _invoke_forked_task(invocation, planner) + else: + _propagate_deps(invocation, planner, invocation.connection.context) + response = invocation.connection.call( + ansible_mitogen.target.run_module, + kwargs=planner.get_kwargs(), + ) + + return invocation.action._postprocess_response(response) diff --git a/ansible_mitogen/process.py b/ansible_mitogen/process.py index 96703e64..e97d695d 100644 --- a/ansible_mitogen/process.py +++ b/ansible_mitogen/process.py @@ -154,18 +154,13 @@ class MuxProcess(object): Construct a ContextService and a thread to service requests for it arriving from worker processes. """ - file_service = mitogen.service.FileService(router=self.router) - push_file_service = mitogen.service.PushFileService(router=self.router) self.pool = mitogen.service.Pool( router=self.router, services=[ - file_service, - push_file_service, + mitogen.service.FileService(router=self.router), + mitogen.service.PushFileService(router=self.router), ansible_mitogen.services.ContextService(self.router), - ansible_mitogen.services.ModuleDepService( - router=self.router, - push_file_service=push_file_service, - ), + ansible_mitogen.services.ModuleDepService(self.router), ], size=int(os.environ.get('MITOGEN_POOL_SIZE', '16')), ) diff --git a/ansible_mitogen/runner.py b/ansible_mitogen/runner.py index 6c353800..a399b453 100644 --- a/ansible_mitogen/runner.py +++ b/ansible_mitogen/runner.py @@ -111,13 +111,20 @@ class Runner(object): Context to which we should direct FileService calls. For now, always the connection multiplexer process on the controller. :param dict args: - Ansible module arguments. A strange mixture of user and internal keys - created by ActionBase._execute_module(). + Ansible module arguments. A mixture of user and internal keys created + by :meth:`ansible.plugins.action.ActionBase._execute_module`. :param dict env: Additional environment variables to set during the run. + + :param mitogen.core.ExternalContext econtext: + When `detach` is :data:`True`, a reference to the ExternalContext the + runner is executing in. + :param bool detach: + When :data:`True`, indicate the runner should detach the context from + its parent after setup has completed successfully. """ - def __init__(self, module, service_context, econtext=None, detach=False, - args=None, env=None): + def __init__(self, module, service_context, args=None, env=None, + econtext=None, detach=False): if args is None: args = {} diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index 928af5fd..0331efd2 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -251,13 +251,18 @@ class ContextService(mitogen.service.Service): { 'context': mitogen.core.Context or None, - 'home_dir': str or None, + 'init_child_result': { + 'fork_context': mitogen.core.Context, + 'home_dir': str or None, + }, 'msg': str or None } - Where either `msg` is an error message and the remaining fields are - :data:`None`, or `msg` is :data:`None` and the remaining fields are - set. + Where `context` is a reference to the newly constructed context, + `init_child_result` is the result of executing + :func:`ansible_mitogen.target.init_child` in that context, `msg` is + an error message and the remaining fields are :data:`None`, or + `msg` is :data:`None` and the remaining fields are set. """ try: method = getattr(self.router, spec['method']) @@ -276,11 +281,7 @@ class ContextService(mitogen.service.Service): lambda: self._on_stream_disconnect(stream)) self._send_module_forwards(context) - home_dir = context.call(os.path.expanduser, '~') - - # We don't need to wait for the result of this. Ideally we'd check its - # return value somewhere, but logs will catch a failure anyway. - context.call_async(ansible_mitogen.target.init_child) + init_child_result = context.call(ansible_mitogen.target.init_child) if os.environ.get('MITOGEN_DUMP_THREAD_STACKS'): from mitogen import debug @@ -290,7 +291,7 @@ class ContextService(mitogen.service.Service): self._refs_by_context[context] = 0 return { 'context': context, - 'home_dir': home_dir, + 'init_child_result': init_child_result, 'msg': None, } @@ -341,7 +342,7 @@ class ContextService(mitogen.service.Service): :returns dict: * context: mitogen.master.Context or None. - * homedir: Context's home directory or None. + * init_child_result: Result of :func:`init_child`. * msg: StreamError exception text or None. * method_name: string failing method name. """ @@ -356,7 +357,7 @@ class ContextService(mitogen.service.Service): except mitogen.core.StreamError as e: return { 'context': None, - 'home_dir': None, + 'init_child_result': None, 'method_name': spec['method'], 'msg': str(e), } @@ -369,9 +370,8 @@ class ModuleDepService(mitogen.service.Service): Scan a new-style module and produce a cached mapping of module_utils names to their resolved filesystem paths. """ - def __init__(self, push_file_service, **kwargs): - super(ModuleDepService, self).__init__(**kwargs) - self._push_file_service = push_file_service + def __init__(self, *args, **kwargs): + super(ModuleDepService, self).__init__(*args, **kwargs) self._cache = {} def _get_builtin_names(self, builtin_path, resolved): @@ -411,20 +411,4 @@ class ModuleDepService(mitogen.service.Service): 'builtin': builtin, 'custom': custom, } - - # Grant FileService access to paths in here to avoid another 2 IPCs - # from WorkerProcess. - self._push_file_service.propagate_to( - path=module_path, - context=context, - ) - - for fullname, path, is_pkg in custom: - self._push_file_service.propagate_to( - path=path, - context=context, - ) - - for name in self._cache[key]['builtin']: - self.router.responder.forward_module(context, name) return self._cache[key] diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index 1b3d587b..63adba5f 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -40,7 +40,6 @@ import logging import operator import os import pwd -import random import re import stat import subprocess @@ -81,7 +80,7 @@ def get_small_file(context, path): :returns: Bytestring file data. """ - pool = mitogen.service.get_or_create_pool() + pool = mitogen.service.get_or_create_pool(router=context.router) service = pool.get_service('mitogen.service.PushFileService') return service.get(path) @@ -211,52 +210,51 @@ def init_child(econtext): This is necessary to prevent modules that are executed in-process from polluting the global interpreter state in a way that effects explicitly isolated modules. + + :returns: + Dict like:: + + { + 'fork_context': mitogen.core.Context. + 'home_dir': str. + } + + Where `fork_context` refers to the newly forked 'fork parent' context + the controller will use to start forked jobs, and `home_dir` is the + home directory for the active user account. """ global _fork_parent mitogen.parent.upgrade_router(econtext) _fork_parent = econtext.router.fork() reset_temp_dir(econtext) + return { + 'fork_context': _fork_parent, + 'home_dir': os.path.expanduser('~'), + } + @mitogen.core.takes_econtext -def start_fork_child(wrap_async, kwargs, econtext): +def create_fork_child(econtext): + """ + For helper functions executed in the fork parent context, arrange for + the context's router to be upgraded as necessary and for a new child to be + prepared. + """ mitogen.parent.upgrade_router(econtext) context = econtext.router.fork() context.call(reset_temp_dir) - if not wrap_async: - try: - return context.call(run_module, kwargs) - finally: - context.shutdown() - - job_id = '%016x' % random.randint(0, 2**64) - kwargs['detach'] = True - kwargs['econtext'] = econtext - context.call_async(run_module_async, job_id, kwargs) - return { - 'stdout': json.dumps({ - # modules/utilities/logic/async_wrapper.py::_run_module(). - 'changed': True, - 'started': 1, - 'finished': 0, - 'ansible_job_id': job_id, - }) - } + LOG.debug('create_fork_child() -> %r', context) + return context -@mitogen.core.takes_econtext -def run_module(kwargs, econtext): +def run_module(kwargs): """ Set up the process environment in preparation for running an Ansible module. This monkey-patches the Ansible libraries in various places to prevent it from trying to kill the process on completion, and to prevent it from reading sys.stdin. """ - should_fork = kwargs.pop('should_fork', False) - wrap_async = kwargs.pop('wrap_async', False) - if should_fork: - return _fork_parent.call(start_fork_child, wrap_async, kwargs) - runner_name = kwargs.pop('runner_name') klass = getattr(ansible_mitogen.runner, runner_name) impl = klass(**kwargs) @@ -287,21 +285,27 @@ def _write_job_status(job_id, dct): os.rename(path + '.tmp', path) -def _run_module_async(job_id, kwargs, econtext): +def _run_module_async(kwargs, job_id, econtext): """ - Body on run_module_async(). - 1. Immediately updates the status file to mark the job as started. 2. Installs a timer/signal handler to implement the time limit. 3. Runs as with run_module(), writing the result to the status file. + + :param dict kwargs: + Runner keyword arguments. + :param str job_id: + String job ID. """ _write_job_status(job_id, { 'started': 1, - 'finished': 0 + 'finished': 0, + 'pid': os.getpid() }) + #kwargs['detach'] = True + #kwargs['econtext'] = econtext kwargs['emulate_tty'] = False - dct = run_module(kwargs, econtext) + dct = run_module(kwargs) if mitogen.core.PY3: for key in 'stdout', 'stderr': dct[key] = dct[key].decode('utf-8', 'surrogateescape') @@ -325,18 +329,21 @@ def _run_module_async(job_id, kwargs, econtext): @mitogen.core.takes_econtext -def run_module_async(job_id, kwargs, econtext): +def run_module_async(kwargs, job_id, econtext): """ - Since run_module_async() is invoked with .call_async(), with nothing to - read the result from the corresponding Receiver, wrap the body in an - exception logger, and wrap that in something that tears down the context on - completion. + Arrange for a module to be executed with its run status and result + serialized to a disk file. This function expects to run in a child forked + using :func:`create_fork_child`. """ try: try: - _run_module_async(job_id, kwargs, econtext) + _run_module_async(kwargs, job_id, econtext) except Exception: - LOG.exception('_run_module_async crashed') + # Catch any (ansible_mitogen) bugs and write them to the job file. + _write_job_status(job_id, { + "failed": 1, + "msg": traceback.format_exc(), + }) finally: econtext.broker.shutdown()