From 3613162bc04bc4a7229a46f580f65608cccecc07 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Sun, 8 Apr 2018 23:50:57 +0100 Subject: [PATCH] ansible: enable forking when requested and for async jobs. Closes #105. References #155. mitogen/service.py: Refactor services to support individually exposed methods with different security policies for each method. - @mitogen.service.expose() to expose a method and set its policy - @mitogen.service.arg_spec() to validate input. - Require basic service message format to be a tuple of `(method, kwargs)`, where kwargs is always a dict. - Update DeduplicatingService to match the new scheme. ansible_mitogen/connection.py: - Rename 'method' to 'method_name' to disambiguate it from the service.call()'s method= argument. ansible_mitogen/planner.py: - Generate an ID for every job, sync or not, and fetch job results from JobResultService rather than via the initiating function call's return value. - Planner subclasses now get to select whether their Runner should run in a forked process. The base implementation requests this if the 'mitogen_isolation_mode=fork' task variable is present. ansible_mitogen/runner.py: Teach runners to deliver their result via JobResultService executing in their indirect parent mux process. ansible_mitogen/plugins/actions/mitogen_async_status.py: Split the implementation up into methods, and more compatibly emulate Ansible's existing output. ansible_mitogen/process.py: Mux processes now host JobResultService. ansible_mitogen/services.py: Update existing services to the new mitogen.service scheme, and implement JobResultService: * listen() method for synchronous jobs. planner.invoke() registers a Sender with the service prior to invoking the job, then sleeps waiting for the service to write the job result to the corresponding Receiver. * Non-blocking get() method for implementing mitogen_async_status action. * Child-accessible push() method for delivering task results. ansible_mitogen/target.py: New helpers for spawning a virginal subprocess on startup, from which asynchronous and mitogen_task_isolation=fork jobs are forked. Necessary to avoid a task inheriting potentially polluted/monkey-patched parent environment, since remaining jobs continue to run in the original child process. docs/ansible.rst: Add/merge/remove some behaviours/risks. tests/ansible/integration: New tests for forking/async. --- ansible_mitogen/connection.py | 13 +- ansible_mitogen/planner.py | 114 ++++++++--- .../plugins/actions/mitogen_async_status.py | 61 +++--- ansible_mitogen/process.py | 1 + ansible_mitogen/runner.py | 22 ++- ansible_mitogen/services.py | 119 +++++++++--- ansible_mitogen/strategy.py | 2 + ansible_mitogen/target.py | 118 ++++++------ docs/ansible.rst | 69 ++++--- docs/services.rst | 11 +- mitogen/service.py | 177 ++++++++++++------ tests/ansible/integration/async_polling.yml | 14 -- tests/ansible/integration/runner/all.yml | 4 + .../integration/runner/async_job_timeout.yml | 52 +++++ .../integration/runner/async_one_job.yml | 99 ++++++++++ .../runner/async_two_simulataneous_jobs.yml | 54 ++++++ .../integration/runner/forking_behaviour.yml | 45 +++++ .../custom_python_detect_environment.py | 4 + tests/ansible/run_ansible_playbook.sh | 9 +- 19 files changed, 719 insertions(+), 269 deletions(-) delete mode 100644 tests/ansible/integration/async_polling.yml create mode 100644 tests/ansible/integration/runner/async_job_timeout.yml create mode 100644 tests/ansible/integration/runner/async_one_job.yml create mode 100644 tests/ansible/integration/runner/async_two_simulataneous_jobs.yml create mode 100644 tests/ansible/integration/runner/forking_behaviour.yml diff --git a/ansible_mitogen/connection.py b/ansible_mitogen/connection.py index 65664b31..b1724186 100644 --- a/ansible_mitogen/connection.py +++ b/ansible_mitogen/connection.py @@ -137,11 +137,12 @@ class Connection(ansible.plugins.connection.ConnectionBase): def connected(self): return self.broker is not None - def _wrap_connect(self, args): + def _wrap_connect(self, kwargs): dct = mitogen.service.call( context=self.parent, handle=ContextService.handle, - obj=mitogen.utils.cast(args), + method='connect', + kwargs=mitogen.utils.cast(kwargs), ) if dct['msg']: @@ -155,7 +156,7 @@ class Connection(ansible.plugins.connection.ConnectionBase): master process. """ return self._wrap_connect({ - 'method': 'local', + 'method_name': 'local', 'python_path': self.python_path, }) @@ -165,7 +166,7 @@ class Connection(ansible.plugins.connection.ConnectionBase): ContextService in the master process. """ return self._wrap_connect({ - 'method': 'ssh', + 'method_name': 'ssh', 'check_host_keys': False, # TODO 'hostname': self._play_context.remote_addr, 'discriminator': self.mitogen_ssh_discriminator, @@ -189,7 +190,7 @@ class Connection(ansible.plugins.connection.ConnectionBase): def _connect_docker(self): return self._wrap_connect({ - 'method': 'docker', + 'method_name': 'docker', 'container': self._play_context.remote_addr, 'python_path': self.python_path, 'connect_timeout': self._play_context.timeout, @@ -205,7 +206,7 @@ class Connection(ansible.plugins.connection.ConnectionBase): be a Context returned by _connect_ssh(). """ return self._wrap_connect({ - 'method': 'sudo', + 'method_name': 'sudo', 'username': self._play_context.become_user, 'password': self._play_context.become_pass, 'python_path': python_path or self.python_path, diff --git a/ansible_mitogen/planner.py b/ansible_mitogen/planner.py index cd477fb0..282c41eb 100644 --- a/ansible_mitogen/planner.py +++ b/ansible_mitogen/planner.py @@ -35,8 +35,10 @@ files/modules known missing. """ from __future__ import absolute_import +import json import logging import os +import random from ansible.executor import module_common import ansible.errors @@ -115,6 +117,8 @@ class Invocation(object): self.env = env #: Boolean, if :py:data:`True`, launch the module asynchronously. self.wrap_async = wrap_async + #: String Job ID. + self.job_id = self._make_job_id() #: Initially ``None``, but set by :func:`invoke`. The path on the #: master to the module's implementation file. @@ -123,6 +127,9 @@ class Invocation(object): #: binary contents of the module. self.module_source = None + def _make_job_id(self): + return '%016x' % random.randint(0, 2**64) + def __repr__(self): return 'Invocation(module_name=%s)' % (self.module_name,) @@ -140,7 +147,10 @@ class Planner(object): """ raise NotImplementedError() - def plan(self, invocation): + def get_should_fork(self, invocation): + return invocation.wrap_async + + def plan(self, invocation, **kwargs): """ If :meth:`detect` returned :data:`True`, plan for the module's execution, including granting access to or delivering any files to it @@ -155,7 +165,10 @@ class Planner(object): # named by `runner_name`. } """ - raise NotImplementedError() + kwargs.setdefault('job_id', invocation.job_id) + kwargs.setdefault('service_context', invocation.connection.parent) + kwargs.setdefault('should_fork', self.get_should_fork(invocation)) + return kwargs class BinaryPlanner(Planner): @@ -168,22 +181,26 @@ class BinaryPlanner(Planner): def detect(self, invocation): return module_common._is_binary(invocation.module_source) - def plan(self, invocation): + def plan(self, invocation, **kwargs): invocation.connection._connect() mitogen.service.call( - invocation.connection.parent, - ansible_mitogen.services.FileService.handle, - ('register', invocation.module_path) + context=invocation.connection.parent, + handle=ansible_mitogen.services.FileService.handle, + method='register', + kwargs={ + 'path': invocation.module_path + } + ) + return super(BinaryPlanner, self).plan( + invocation=invocation, + runner_name=self.runner_name, + module=invocation.module_name, + path=invocation.module_path, + args=invocation.module_args, + env=invocation.env, + remote_tmp=invocation.remote_tmp, + **kwargs ) - return { - 'runner_name': self.runner_name, - 'module': invocation.module_name, - 'service_context': invocation.connection.parent, - 'path': invocation.module_path, - 'args': invocation.module_args, - 'env': invocation.env, - 'remote_tmp': invocation.remote_tmp, - } class ScriptPlanner(BinaryPlanner): @@ -199,20 +216,21 @@ class ScriptPlanner(BinaryPlanner): except KeyError: return interpreter - def plan(self, invocation): - kwargs = super(ScriptPlanner, self).plan(invocation) + def plan(self, invocation, **kwargs): interpreter, arg = parse_script_interpreter(invocation.module_source) if interpreter is None: raise ansible.errors.AnsibleError(NO_INTERPRETER_MSG % ( invocation.module_name, )) - return dict(kwargs, + return super(ScriptPlanner, self).plan( + invocation=invocation, interpreter_arg=arg, interpreter=self._rewrite_interpreter( interpreter=interpreter, invocation=invocation - ) + ), + **kwargs ) @@ -283,6 +301,12 @@ class NewStylePlanner(ScriptPlanner): """ runner_name = 'NewStyleRunner' + def get_should_fork(self, invocation): + return ( + super(NewStylePlanner, self).get_should_fork(invocation) or + (invocation.task_vars.get('mitogen_task_isolation') == 'fork') + ) + def detect(self, invocation): return 'from ansible.module_utils.' in invocation.module_source @@ -319,10 +343,7 @@ def get_module_data(name): return path, source -def invoke(invocation): - """ - Find a suitable Planner that knows how to run `invocation`. - """ +def _do_invoke(invocation): (invocation.module_path, invocation.module_source) = get_module_data(invocation.module_name) @@ -333,17 +354,50 @@ def invoke(invocation): else: raise ansible.errors.AnsibleError(NO_METHOD_MSG + repr(invocation)) - kwargs = planner.plan(invocation) - if invocation.wrap_async: - helper = ansible_mitogen.target.run_module_async - else: - helper = ansible_mitogen.target.run_module - try: - js = invocation.connection.call(helper, kwargs) + kwargs = planner.plan(invocation) + invocation.connection.call(ansible_mitogen.target.run_module, kwargs) except mitogen.core.CallError as e: LOG.exception('invocation crashed: %r', invocation) summary = str(e).splitlines()[0] raise ansible.errors.AnsibleInternalError(CRASHED_MSG + summary) + +def _invoke_async(invocation): + _do_invoke(invocation) + return { + 'stdout': json.dumps({ + # modules/utilities/logic/async_wrapper.py::_run_module(). + 'changed': True, + 'started': 1, + 'finished': 0, + 'ansible_job_id': invocation.job_id, + }) + } + + +def _invoke_sync(invocation): + recv = mitogen.core.Receiver(invocation.connection.router) + mitogen.service.call_async( + context=invocation.connection.parent, + handle=ansible_mitogen.services.JobResultService.handle, + method='listen', + kwargs={ + 'job_id': invocation.job_id, + 'sender': recv.to_sender(), + } + ) + _do_invoke(invocation) + return recv.get().unpickle() + + +def invoke(invocation): + """ + Find a suitable Planner that knows how to run `invocation`. + """ + if invocation.wrap_async: + js = _invoke_async(invocation) + else: + js = _invoke_sync(invocation) + return invocation.action._postprocess_response(js) diff --git a/ansible_mitogen/plugins/actions/mitogen_async_status.py b/ansible_mitogen/plugins/actions/mitogen_async_status.py index d57a393a..611718b2 100644 --- a/ansible_mitogen/plugins/actions/mitogen_async_status.py +++ b/ansible_mitogen/plugins/actions/mitogen_async_status.py @@ -28,38 +28,51 @@ import ansible.plugins.action import mitogen.core +import mitogen.utils +import ansible_mitogen.services import ansible_mitogen.target -from mitogen.utils import cast class ActionModule(ansible.plugins.action.ActionBase): - def run(self, tmp=None, task_vars=None): - job_id = self._task.args['jid'] - try: - result = self._connection.call( - ansible_mitogen.target.get_async_result, - cast(job_id), - ) - except mitogen.core.CallError, e: - return { - 'ansible_job_id': job_id, - 'started': 1, - 'failed': 1, - 'finished': 1, - 'msg': str(e), + def _get_async_result(self, job_id): + self._connection._connect() + return mitogen.service.call( + context=self._connection.parent, + handle=ansible_mitogen.services.JobResultService.handle, + method='get', + kwargs={ + 'job_id': job_id, } + ) - if result is None: - return { - 'ansible_job_id': job_id, - 'started': 1, - 'failed': 0, - 'finished': 0, - 'msg': '', - } + def _on_result_pending(self, job_id): + return { + '_ansible_parsed': True, + 'ansible_job_id': job_id, + 'started': 1, + 'failed': 0, + 'finished': 0, + 'msg': '', + } - dct = self._parse_returned_data({'stdout': result}) + def _on_result_available(self, job_id, result): + dct = self._parse_returned_data(result) dct['ansible_job_id'] = job_id dct['started'] = 1 dct['finished'] = 1 + + # Cutpasted from the action.py. + if 'stdout' in dct and 'stdout_lines' not in dct: + dct['stdout_lines'] = (dct['stdout'] or u'').splitlines() + if 'stderr' in dct and 'stderr_lines' not in dct: + dct['stderr_lines'] = (dct['stderr'] or u'').splitlines() return dct + + def run(self, tmp=None, task_vars=None): + job_id = mitogen.utils.cast(self._task.args['jid']) + + result = self._get_async_result(job_id) + if result is None: + return self._on_result_pending(job_id) + else: + return self._on_result_available(job_id, result) diff --git a/ansible_mitogen/process.py b/ansible_mitogen/process.py index 8febea90..0cb0b987 100644 --- a/ansible_mitogen/process.py +++ b/ansible_mitogen/process.py @@ -155,6 +155,7 @@ class MuxProcess(object): services=[ ansible_mitogen.services.ContextService(self.router), ansible_mitogen.services.FileService(self.router), + ansible_mitogen.services.JobResultService(self.router), ], size=int(os.environ.get('MITOGEN_POOL_SIZE', '16')), ) diff --git a/ansible_mitogen/runner.py b/ansible_mitogen/runner.py index 43a33489..430d0a87 100644 --- a/ansible_mitogen/runner.py +++ b/ansible_mitogen/runner.py @@ -45,6 +45,7 @@ import sys import tempfile import types +import mitogen.service import ansible_mitogen.target # TODO: circular import try: @@ -82,14 +83,17 @@ class Runner(object): Subclasses may override `_run`()` and extend `setup()` and `revert()`. """ - def __init__(self, module, remote_tmp, raw_params=None, args=None, env=None): + def __init__(self, module, job_id, remote_tmp, service_context, + raw_params=None, args=None, env=None): if args is None: args = {} if raw_params is not None: args['_raw_params'] = raw_params self.module = module + self.job_id = job_id self.remote_tmp = os.path.expanduser(remote_tmp) + self.service_context = service_context self.raw_params = raw_params self.args = args self.env = env @@ -131,6 +135,17 @@ class Runner(object): """ raise NotImplementedError() + def _send_result(self, dct): + mitogen.service.call( + context=self.service_context, + handle=502, + method='push', + kwargs={ + 'job_id': self.job_id, + 'result': dct + } + ) + def run(self): """ Set up the process environment in preparation for running an Ansible @@ -143,7 +158,7 @@ class Runner(object): """ self.setup() try: - return self._run() + self._send_result(self._run()) finally: self.revert() @@ -193,10 +208,9 @@ class NewStyleStdio(object): class ProgramRunner(Runner): - def __init__(self, path, service_context, **kwargs): + def __init__(self, path, **kwargs): super(ProgramRunner, self).__init__(**kwargs) self.path = path - self.service_context = service_context def setup(self): super(ProgramRunner, self).setup() diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index 18782f20..b5715d4e 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -29,15 +29,21 @@ from __future__ import absolute_import import logging import os.path +import threading import zlib import mitogen import mitogen.service +import ansible_mitogen.target LOG = logging.getLogger(__name__) +class Error(Exception): + pass + + class ContextService(mitogen.service.DeduplicatingService): """ Used by worker processes connecting back into the top-level process to @@ -74,15 +80,17 @@ class ContextService(mitogen.service.DeduplicatingService): """ handle = 500 max_message_size = 1000 - required_args = { - 'method': str - } - def get_response(self, args): - args.pop('discriminator', None) - method = getattr(self.router, args.pop('method')) + @mitogen.service.expose(mitogen.service.AllowParents()) + @mitogen.service.arg_spec({ + 'method_name': str + }) + def connect(self, method_name, discriminator=None, **kwargs): + method = getattr(self.router, method_name, None) + if method is None: + raise Error('no such Router method: %s' % (method_name,)) try: - context = method(**args) + context = method(**kwargs) except mitogen.core.StreamError as e: return { 'context': None, @@ -91,6 +99,10 @@ class ContextService(mitogen.service.DeduplicatingService): } home_dir = context.call(os.path.expanduser, '~') + + # We don't need to wait for the result of this. Ideally we'd check its + # return value somewhere, but logs will catch any failures anyway. + context.call_async(ansible_mitogen.target.start_fork_parent) return { 'context': context, 'home_dir': home_dir, @@ -135,32 +147,77 @@ class FileService(mitogen.service.Service): super(FileService, self).__init__(router) self._paths = {} - def validate_args(self, args): - return ( - isinstance(args, tuple) and - len(args) == 2 and - args[0] in ('register', 'fetch') and - isinstance(args[1], basestring) - ) - - def dispatch(self, args, msg): - cmd, path = args - return getattr(self, cmd)(path, msg) + @mitogen.service.expose(policy=mitogen.service.AllowParents()) + @mitogen.service.arg_spec({ + 'path': basestring + }) + def register(self, path): + if path not in self._paths: + LOG.info('%r: registering %r', self, path) + with open(path, 'rb') as fp: + self._paths[path] = zlib.compress(fp.read()) + + @mitogen.service.expose(policy=mitogen.service.AllowAny()) + @mitogen.service.arg_spec({ + 'path': basestring + }) + def fetch(self, path): + if path not in self._paths: + raise mitogen.core.CallError(self.unregistered_msg) - def register(self, path, msg): - if not mitogen.core.has_parent_authority(msg): - raise mitogen.core.CallError(self.unprivileged_msg) + LOG.debug('Serving %r', path) + return self._paths[path] - if path in self._paths: - return - LOG.info('%r: registering %r', self, path) - with open(path, 'rb') as fp: - self._paths[path] = zlib.compress(fp.read()) +class JobResultService(mitogen.service.Service): + """ + Receive the result of a task from a child and forward it to interested + listeners. If no listener exists, store the result until it is requested. - def fetch(self, path, msg): - if path not in self._paths: - raise mitogen.core.CallError(self.unregistered_msg) + Results are keyed by job ID. + """ + handle = 502 + max_message_size = 1048576 * 64 - LOG.debug('Serving %r to context %r', path, msg.src_id) - return self._paths[path] + def __init__(self, router): + super(JobResultService, self).__init__(router) + self._lock = threading.Lock() + self._result_by_job_id = {} + self._sender_by_job_id = {} + + @mitogen.service.expose(mitogen.service.AllowParents()) + @mitogen.service.arg_spec({ + 'job_id': str, + 'sender': mitogen.core.Sender, + }) + def listen(self, job_id, sender): + LOG.debug('%r.listen(job_id=%r, sender=%r)', self, job_id, sender) + with self._lock: + if job_id in self._sender_by_job_id: + raise Error('Listener already exists for job: %s' % (job_id,)) + self._sender_by_job_id[job_id] = sender + + @mitogen.service.expose(mitogen.service.AllowParents()) + @mitogen.service.arg_spec({ + 'job_id': basestring, + }) + def get(self, job_id): + LOG.debug('%r.get(job_id=%r)', self, job_id) + with self._lock: + return self._result_by_job_id.pop(job_id, None) + + @mitogen.service.expose(mitogen.service.AllowAny()) + @mitogen.service.arg_spec({ + 'job_id': basestring, + 'result': dict + }) + def push(self, job_id, result): + LOG.debug('%r.push(job_id=%r, result=%r)', self, job_id, result) + with self._lock: + if job_id in self._result_by_job_id: + raise Error('Result already exists for job: %s' % (job_id,)) + sender = self._sender_by_job_id.pop(job_id, None) + if sender: + sender.send(result) + else: + self._result_by_job_id[job_id] = result diff --git a/ansible_mitogen/strategy.py b/ansible_mitogen/strategy.py index c70f245b..bc329842 100644 --- a/ansible_mitogen/strategy.py +++ b/ansible_mitogen/strategy.py @@ -55,6 +55,8 @@ def wrap_action_loader__get(name, *args, **kwargs): This is used instead of static subclassing as it generalizes to third party action modules outside the Ansible tree. """ + # Necessary since async_status execution strategy is hard-wired in + # executor/task_executor.py::_poll_async_result(). if ( name == 'normal' and 'task' in kwargs and kwargs['task'].action == 'async_status'): name = 'mitogen_async_status' diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index 167afbb5..268bdd51 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -26,6 +26,11 @@ # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. +""" +Helper functions intended to be executed on the target. These are entrypoints +for file transfer, module execution and sundry bits like changing file modes. +""" + from __future__ import absolute_import import json import logging @@ -40,10 +45,12 @@ import tempfile import threading import zlib -import mitogen.core -import mitogen.service import ansible_mitogen.runner import ansible_mitogen.services +import mitogen.core +import mitogen.fork +import mitogen.parent +import mitogen.service LOG = logging.getLogger(__name__) @@ -51,17 +58,15 @@ LOG = logging.getLogger(__name__) #: Caching of fetched file data. _file_cache = {} -#: Mapping of job_id<->result dict -_result_by_job_id = {} - -#: Mapping of job_id<->threading.Thread -_thread_by_job_id = {} +#: Initialized to an econtext.parent.Context pointing at a pristine fork of +#: the target Python interpreter before it executes any code or imports. +_fork_parent = None def get_file(context, path): """ Basic in-memory caching module fetcher. This generates an one roundtrip for - every previously unseen module, so it is only temporary. + every previously unseen file, so it is only a temporary solution. :param context: Context we should direct FileService requests to. For now (and probably @@ -75,39 +80,64 @@ def get_file(context, path): if path not in _file_cache: _file_cache[path] = zlib.decompress( mitogen.service.call( - context, - ansible_mitogen.services.FileService.handle, - ('fetch', path) + context=context, + handle=ansible_mitogen.services.FileService.handle, + method='fetch', + kwargs={ + 'path': path + } ) ) return _file_cache[path] -def run_module(kwargs): +@mitogen.core.takes_econtext +def start_fork_parent(econtext): + """ + Called by ContextService immediately after connection; arranges for the + (presently) spotless Python interpreter to be forked, where the newly + forked interpreter becomes the parent of any newly forked future + interpreters. + + This is necessary to prevent modules that are executed in-process from + polluting the global interpreter state in a way that effects explicitly + isolated modules. + """ + mitogen.parent.upgrade_router(econtext) + + global _fork_parent + _fork_parent = econtext.router.fork() + + +@mitogen.core.takes_econtext +def start_fork_child(kwargs, econtext): + mitogen.parent.upgrade_router(econtext) + context = econtext.router.fork() + kwargs['shutdown_on_exit'] = True + context.call_async(run_module, kwargs) + + +@mitogen.core.takes_econtext +def run_module(kwargs, econtext): """ Set up the process environment in preparation for running an Ansible module. This monkey-patches the Ansible libraries in various places to prevent it from trying to kill the process on completion, and to prevent it from reading sys.stdin. """ + should_fork = kwargs.pop('should_fork', False) + shutdown_on_exit = kwargs.pop('shutdown_on_exit', False) + if should_fork: + _fork_parent.call(start_fork_child, kwargs) + return + runner_name = kwargs.pop('runner_name') klass = getattr(ansible_mitogen.runner, runner_name) impl = klass(**kwargs) - return impl.run() - - -def _async_main(job_id, runner_name, kwargs): - """ - Implementation for the thread that implements asynchronous module - execution. - """ - try: - rc = run_module(runner_name, kwargs) - except Exception, e: - rc = mitogen.core.CallError(e) - - _result_by_job_id[job_id] = rc + impl.run() + if shutdown_on_exit: + econtext.broker.shutdown() def make_temp_directory(base_dir): @@ -125,42 +155,6 @@ def make_temp_directory(base_dir): prefix='ansible-mitogen-tmp-', ) -def run_module_async(runner_name, kwargs): - """ - Arrange for an Ansible module to be executed in a thread of the current - process, with results available via :py:func:`get_async_result`. - """ - job_id = '%08x' % random.randint(0, 2**32-1) - _result_by_job_id[job_id] = None - _thread_by_job_id[job_id] = threading.Thread( - target=_async_main, - kwargs={ - 'job_id': job_id, - 'runner_name': runner_name, - 'kwargs': kwargs, - } - ) - _thread_by_job_id[job_id].start() - return json.dumps({ - 'ansible_job_id': job_id, - 'changed': True - }) - - -def get_async_result(job_id): - """ - Poll for the result of an asynchronous task. - - :param str job_id: - Job ID to poll for. - :returns: - ``None`` if job is still running, JSON-encoded result dictionary if - execution completed normally, or :py:class:`mitogen.core.CallError` if - an exception was thrown. - """ - if not _thread_by_job_id[job_id].isAlive(): - return _result_by_job_id[job_id] - def get_user_shell(): """ diff --git a/docs/ansible.rst b/docs/ansible.rst index 15ccc07a..825fb66b 100644 --- a/docs/ansible.rst +++ b/docs/ansible.rst @@ -128,25 +128,20 @@ This is a proof of concept: issues below are exclusively due to code immaturity. High Risk ~~~~~~~~~ -* Transfer of large (i.e. GB-sized) files using certain Ansible-internal APIs, - such as triggered via the ``copy`` module, will cause corresponding temporary - memory and CPU spikes on both host and target machine, due to delivering the - file as a single large message. If many machines are targetted with a large - file, the host machine could easily exhaust available RAM. This will be fixed - soon as it's likely to be tickled by common playbooks. - -* Local actions are single threaded. Any that execute for every target will - experience artificial serialization, causing slowdown equivalent to - `task_duration * num_targets`. This will be fixed soon. +* Transfer of large files using certain Ansible-internal APIs, such as + triggered via the ``copy`` module, will cause corresponding memory and CPU + spikes on both host and target machine, due to delivering the file as a + single message. If many machines are targetted, the controller could easily + exhaust available RAM. This will be fixed soon as it's likely to be tickled + by common playbooks. * `Asynchronous Actions And Polling `_ has received - minimal testing. Jobs execute in a thread of the target Python interpreter. - This will fixed shortly. + minimal testing. -* No mechanism exists yet to bound the number of interpreters created during a - run. For some playbooks that parameterize ``become_user`` over a large number - of user accounts, resource exhaustion may be triggered on the target machine. +* No mechanism exists to bound the number of interpreters created during a run. + For some playbooks that parameterize ``become_user`` over many accounts, + resource exhaustion may be triggered on the target machine. * Only Ansible 2.4 is being used for development, with occasional tests under 2.5, 2.3 and 2.2. It should be more than possible to fully support at least @@ -189,18 +184,20 @@ Behavioural Differences following its parent SSH account, and try to emulate Ansible's existing timeout semantics. -* Normally with Ansible, diagnostics and use of the :py:mod:`logging` package - output on the target machine are discarded. With Mitogen, all of this is - captured and returned to the host machine, where it can be viewed as desired - with ``-vvv``. - * Local commands are executed in a reuseable Python interpreter created identically to interpreters used on remote hosts. At present only one such - interpreter per ``become_user`` exists, and so only one action may be - executed in each context simultaneously. Ansible usually permits up to - ``ansible.cfg:forks`` simultaneous local actions, which may trigger a - performance regression in some playbooks. This will be fixed in a future - release. + interpreter per ``become_user`` exists, and so only one local action may be + executed simultaneously per local user account. + + Ansible usually permits up to ``ansible.cfg:forks`` simultaneous local + actions. Any long-running local actions that execute for every target will + experience artificial serialization, causing slowdown equivalent to + `task_duration * num_targets`. This will be fixed soon. + +* Asynchronous job IDs exist only for the duration of a run, and cannot be + queried by subsequent ansible-playbook invocations. Since the ability to + query job IDs across runs relied on an implementation detail, it is not + expected this will break any real-world playbooks. How Modules Execute @@ -340,16 +337,18 @@ FreeNode IRC network. Debugging --------- -Mitogen's logs are integrated into Ansible's display framework. Basic high -level debug logs are produced with ``-vvv``, with logging of all IO activity on -the controller machine when ``-vvvv`` or higher is specified. - -Although any use of standard IO and the logging package on remote machines is -forwarded to the controller machine, it is not possible to receive logs of all -IO activity, as the processs of receiving those logs would would in turn -generate more IO activity. To receive a complete trace of every process on -every machine, file-based logging is required. File-based logging can be -enabled by setting ``MITOGEN_ROUTER_DEBUG=1`` in your environment. +Normally with Ansible, diagnostics and use of the :py:mod:`logging` package +output on the target machine are discarded. With Mitogen, all of this is +captured and returned to the host machine, where it can be viewed as desired +with ``-vvv``. Basic high level logs are produced with ``-vvv``, with logging +of all IO on the controller with ``-vvvv`` or higher. + +Although use of standard IO and the logging package on the target is forwarded +to the controller, it is not possible to receive IO activity logs, as the +processs of receiving those logs would would itself generate IO activity. To +receive a complete trace of every process on every machine, file-based logging +is necessary. File-based logging can be enabled by setting +``MITOGEN_ROUTER_DEBUG=1`` in your environment. When file-based logging is enabled, one file per context will be created on the local machine and every target machine, as ``/tmp/mitogen..log``. diff --git a/docs/services.rst b/docs/services.rst index c121734a..0c12cee9 100644 --- a/docs/services.rst +++ b/docs/services.rst @@ -7,8 +7,8 @@ Service Framework Mitogen includes a simple framework for implementing services exposed to other contexts, with built-in subclasses that capture some common service models. -This is a work in progress, and new functionality will be added as a common use -for it is is found. +This is a work in progress, and new functionality will be added as common usage +patterns emerge. Overview @@ -72,6 +72,13 @@ Example Reference --------- +.. autoclass:: mitogen.service.Policy +.. autoclass:: mitogen.service.AllowParents +.. autoclass:: mitogen.service.AllowAny + +.. autofunction:: mitogen.service.arg_spec +.. autofunction:: mitogen.service.expose + .. autofunction:: mitogen.service.Service .. autoclass:: mitogen.service.Service diff --git a/mitogen/service.py b/mitogen/service.py index 018fa17d..abbc6bb1 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -54,6 +54,72 @@ class AllowParents(Policy): msg.auth_id == mitogen.context_id) +def validate_arg_spec(spec, args): + for name in spec: + try: + obj = args[name] + except KeyError: + raise mitogen.core.CallError( + 'Required argument %r missing.' % (name,) + ) + + if not isinstance(obj, spec[name]): + raise mitogen.core.CallError( + 'Argument %r type incorrect, got %r, expected %r' % ( + name, + type(obj), + spec[name] + ) + ) + + +def arg_spec(spec): + """ + Annotate a method as requiring arguments with a specific type. This only + validates required arguments. For optional arguments, write a manual check + within the function. + + :: + + @mitogen.service.arg_spec({ + 'path': str + }) + def fetch_path(self, path, optional=None): + ... + + :param dict spec: + Mapping from argument name to expected type. + """ + def wrapper(func): + func.mitogen_service__arg_spec = spec + return func + return wrapper + + +def expose(policy): + """ + Annotate a method to permit access to contexts matching an authorization + policy. The annotation may be specified multiple times. Methods lacking any + authorization policy are not accessible. + + :: + + @mitogen.service.expose(policy=mitogen.service.AllowParents()) + def unsafe_operation(self): + ... + + :param mitogen.service.Policy policy: + The policy to require. + """ + def wrapper(func): + func.mitogen_service__policies = ( + [policy] + + getattr(func, 'mitogen_service__policies', []) + ) + return func + return wrapper + + class Service(object): #: Sentinel object to suppress reply generation, since returning ``None`` #: will trigger a response message containing the pickled ``None``. @@ -64,17 +130,6 @@ class Service(object): handle = None max_message_size = 0 - #: Mapping from required key names to their required corresponding types, - #: used by the default :py:meth:`validate_args` implementation to validate - #: requests. - required_args = {} - - #: Policies that must authorize each message. By default only parents are - #: authorized. - policies = ( - AllowParents(), - ) - def __init__(self, router): self.router = router self.recv = mitogen.core.Receiver(router, self.handle) @@ -88,58 +143,51 @@ class Service(object): self.__class__.__name__, ) - def validate_args(self, args): - return ( - isinstance(args, dict) and - all(isinstance(args.get(k), t) - for k, t in self.required_args.iteritems()) - ) - def dispatch(self, args, msg): raise NotImplementedError() - def dispatch_one(self, msg): - if not all(p.is_authorized(self, msg) for p in self.policies): - LOG.error('%r: unauthorized message %r', self, msg) - msg.reply(mitogen.core.CallError('Unauthorized')) - return - + def _validate_message(self, msg): if len(msg.data) > self.max_message_size: - LOG.error('%r: larger than permitted size: %r', self, msg) - msg.reply(mitogen.core.CallError('Message size exceeded')) - return - - args = msg.unpickle(throw=False) - if (args == mitogen.core._DEAD or - isinstance(args, mitogen.core.CallError) or - not self.validate_args(args)): - LOG.warning('Received junk message: %r', args) - msg.reply(mitogen.core.CallError('Received junk message')) - return + raise mitogen.core.CallError('Message size exceeded.') + + pair = msg.unpickle(throw=False) + if not (isinstance(pair, tuple) and + len(pair) == 2 and + isinstance(pair[0], basestring)): + raise mitogen.core.CallError('Invalid message format.') + + method_name, kwargs = pair + method = getattr(self, method_name, None) + if method is None: + raise mitogen.core.CallError('No such method exists.') + + policies = getattr(method, 'mitogen_service__policies', None) + if not policies: + raise mitogen.core.CallError('Method has no policies set.') + + if not all(p.is_authorized(self, msg) for p in policies): + raise mitogen.core.CallError('Unauthorized') + required = getattr(method, 'mitogen_service__arg_spec', {}) + validate_arg_spec(required, kwargs) + return method_name, kwargs + + def _on_receive_message(self, msg): + method_name, kwargs = self._validate_message(msg) + return getattr(self, method_name)(**kwargs) + + def on_receive_message(self, msg): try: - response = self.dispatch(args, msg) + response = self._on_receive_message(msg) if response is not self.NO_REPLY: msg.reply(response) + except mitogen.core.CallError, e: + LOG.warning('%r: %s', self, msg) + msg.reply(e) except Exception, e: LOG.exception('While invoking %r.dispatch()', self) msg.reply(mitogen.core.CallError(e)) - def run_once(self): - try: - msg = self.recv.get() - except mitogen.core.ChannelError, e: - # Channel closed due to broker shutdown, exit gracefully. - LOG.debug('%r: channel closed: %s', self, e) - self.running = False - return - - self.dispatch_one(msg) - - def run(self): - while self.running: - self.run_once() - class DeduplicatingService(Service): """ @@ -159,13 +207,13 @@ class DeduplicatingService(Service): self._waiters = {} self._lock = threading.Lock() - def key_from_request(self, args): + def key_from_request(self, method_name, kwargs): """ Generate a deduplication key from the request. The default implementation returns a string based on a stable representation of the input dictionary generated by :py:func:`pprint.pformat`. """ - return pprint.pformat(args) + return pprint.pformat((method_name, kwargs)) def get_response(self, args): raise NotImplementedError() @@ -181,8 +229,9 @@ class DeduplicatingService(Service): finally: self._lock.release() - def dispatch(self, args, msg): - key = self.key_from_request(args) + def _on_receive_message(self, msg): + method_name, kwargs = self._validate_message(msg) + key = self.key_from_request(method_name, kwargs) self._lock.acquire() try: @@ -199,7 +248,10 @@ class DeduplicatingService(Service): # I'm the unlucky thread that must generate the response. try: - self._produce_response(key, self.get_response(args)) + response = getattr(self, method_name)(**kwargs) + self._produce_response(key, response) + except mitogen.core.CallError, e: + self._produce_response(key, e) except Exception, e: self._produce_response(key, mitogen.core.CallError(e)) @@ -260,7 +312,7 @@ class Pool(object): service = msg.receiver.service try: - service.dispatch_one(msg) + service.on_receive_message(msg) except Exception: LOG.exception('While handling %r using %r', msg, service) @@ -281,7 +333,12 @@ class Pool(object): ) -def call(context, handle, obj): - msg = mitogen.core.Message.pickled(obj, handle=handle) - recv = context.send_async(msg) +def call_async(context, handle, method, kwargs): + pair = (method, kwargs) + msg = mitogen.core.Message.pickled(pair, handle=handle) + return context.send_async(msg) + + +def call(context, handle, method, kwargs): + recv = call_async(context, handle, method, kwargs) return recv.get().unpickle() diff --git a/tests/ansible/integration/async_polling.yml b/tests/ansible/integration/async_polling.yml deleted file mode 100644 index b08394a7..00000000 --- a/tests/ansible/integration/async_polling.yml +++ /dev/null @@ -1,14 +0,0 @@ -- hosts: all - any_errors_fatal: true - tasks: - - name: simulate long running op (3 sec), wait for up to 5 sec, poll every 1 sec - command: /bin/sleep 2 - async: 4 - register: derp - - - debug: msg={{derp}} - - - async_status: jid={{derp.ansible_job_id}} - register: derp2 - - - debug: msg={{derp2}} diff --git a/tests/ansible/integration/runner/all.yml b/tests/ansible/integration/runner/all.yml index b2424b6b..9d815518 100644 --- a/tests/ansible/integration/runner/all.yml +++ b/tests/ansible/integration/runner/all.yml @@ -1,3 +1,6 @@ +- import_playbook: async_job_timeout.yml +- import_playbook: async_one_job.yml +- import_playbook: async_two_simultaneous_jobs.yml - import_playbook: builtin_command_module.yml - import_playbook: custom_bash_old_style_module.yml - import_playbook: custom_bash_want_json_module.yml @@ -9,4 +12,5 @@ - import_playbook: custom_python_json_args_module.yml - import_playbook: custom_python_new_style_module.yml - import_playbook: custom_python_want_json_module.yml +- import_playbook: forking_behaviour.yml - import_playbook: remote_tmp.yml diff --git a/tests/ansible/integration/runner/async_job_timeout.yml b/tests/ansible/integration/runner/async_job_timeout.yml new file mode 100644 index 00000000..47857733 --- /dev/null +++ b/tests/ansible/integration/runner/async_job_timeout.yml @@ -0,0 +1,52 @@ +# Verify 'async: ' functions as desired. + + +- hosts: all + any_errors_fatal: true + tasks: + + # Verify async-with-polling-and-timeout behaviour. + + - name: sleep for 7 seconds, but timeout after 1 second. + ignore_errors: true + shell: sleep 7 + async: 1 + poll: 1 + register: job1 + + - assert: + that: + - job1.changed == False + - job1.failed == True + - job1.msg == "async task did not complete within the requested time" + - job1.keys()|sort == ['changed', 'failed', 'msg'] + + # Verify async-with-timeout-then-poll behaviour. + # This is broken in upstream Ansible, so disable the tests there. + # + # TODO: the tests below are totally broken, not clear what Ansible is + # supposed to do here, so can't emulate it in Mitogen. + + - name: sleep for 7 seconds, but timeout after 1 second. + ignore_errors: true + shell: sleep 7 + async: 1 + poll: 0 + register: job2 + when: false # is_mitogen + + - name: poll up to 10 times. + async_status: + jid: "{{job2.ansible_job_id}}" + register: result2 + until: result2.finished + retries: 10 + delay: 1 + when: false # is_mitogen + + - assert: + that: + - result1.rc == 0 + - result2.rc == 0 + - result2.stdout == 'im_alive' + when: false # is_mitogen diff --git a/tests/ansible/integration/runner/async_one_job.yml b/tests/ansible/integration/runner/async_one_job.yml new file mode 100644 index 00000000..b6156cca --- /dev/null +++ b/tests/ansible/integration/runner/async_one_job.yml @@ -0,0 +1,99 @@ +# Verify behaviour of a single asynchronous task, and presence of all output +# fields. + +- hosts: all + any_errors_fatal: true + tasks: + + # Verify async jobs run in a new process. + + - name: get process ID. + custom_python_detect_environment: + register: sync_proc1 + + - name: get process ID again. + custom_python_detect_environment: + register: sync_proc2 + + - assert: + that: + - sync_proc1.pid == sync_proc2.pid + when: is_mitogen + + - name: get async process ID. + custom_python_detect_environment: + register: async_proc1 + async: 1000 + poll: 0 + + - name: busy-poll up to 100000 times + async_status: + jid: "{{async_proc1.ansible_job_id}}" + register: async_result1 + until: async_result1.finished + retries: 100000 + delay: 0 + + - name: get async process ID again. + custom_python_detect_environment: + register: async_proc2 + async: 1000 + poll: 0 + + - name: busy-poll up to 100000 times + async_status: + jid: "{{async_proc2.ansible_job_id}}" + register: async_result2 + until: async_result2.finished + retries: 100000 + delay: 0 + + - assert: + that: + - sync_proc1.pid == sync_proc2.pid + - async_result1.pid != sync_proc1.pid + - async_result1.pid != async_result2.pid + when: is_mitogen + + # Verify output of a single async job. + + - name: start 2 second op + shell: | + sleep 1; + echo alldone + async: 1000 + poll: 0 + register: job1 + + - assert: + that: | + job1.ansible_job_id and + (job1.changed == True) and + (job1.started == 1) and + (job1.changed == True) and + (job1.finished == 0) + + - name: busy-poll up to 100000 times + async_status: + jid: "{{job1.ansible_job_id}}" + register: result1 + until: result1.finished + retries: 100000 + delay: 0 + + - assert: + that: + - result1.ansible_job_id == job1.ansible_job_id + - result1.attempts <= 100000 + - result1.changed == True + - result1.cmd == "sleep 1;\n echo alldone" + - result1.delta|length == 14 + - result1.start|length == 26 + - result1.failed == False + - result1.finished == 1 + - result1.rc == 0 + - result1.start|length == 26 + - result1.stderr == "" + - result1.stderr_lines == [] + - result1.stdout == "alldone" + - result1.stdout_lines == ["alldone"] diff --git a/tests/ansible/integration/runner/async_two_simulataneous_jobs.yml b/tests/ansible/integration/runner/async_two_simulataneous_jobs.yml new file mode 100644 index 00000000..4bf244d7 --- /dev/null +++ b/tests/ansible/integration/runner/async_two_simulataneous_jobs.yml @@ -0,0 +1,54 @@ +- hosts: all + any_errors_fatal: true + tasks: + + # Start 2 duplicate jobs, verify they run concurrently. + + - name: create semaphore file and sleep for 5 seconds. + shell: | + exec 2>/dev/null; + bash -c ' + echo im_alive $$ > /tmp/flurp + sleep 60; + '; + rm -f /tmp/flurp; + echo alldone + async: 1000 + poll: 0 + register: job1 + + # This guy prints the first field from the semaphore file and kills the PID + # from the second field, cancelling the slow sleep above, so the busy-poll + # below compltes quickly. + - name: verify semaphore file exists while this job exists. + shell: | + [ -f /tmp/flurp ] && { + read im_alive pid < /tmp/flurp + echo $im_alive + kill $pid &>/dev/null + } + async: 1000 + poll: 0 + register: job2 + + - name: (job1) busy-poll up to 100000 times + async_status: + jid: "{{job1.ansible_job_id}}" + register: result1 + until: result1.finished + retries: 100000 + delay: 0 + + - name: (job2) busy-poll up to 100000 times + async_status: + jid: "{{job2.ansible_job_id}}" + register: result2 + until: result2.finished + retries: 100000 + delay: 0 + + - assert: + that: + - result1.rc == 0 + - result2.rc == 0 + - result2.stdout == 'im_alive' diff --git a/tests/ansible/integration/runner/forking_behaviour.yml b/tests/ansible/integration/runner/forking_behaviour.yml new file mode 100644 index 00000000..51318ff4 --- /dev/null +++ b/tests/ansible/integration/runner/forking_behaviour.yml @@ -0,0 +1,45 @@ + +- hosts: all + any_errors_fatal: true + tasks: + + # Verify non-async jobs run in-process. + + - debug: msg={{is_mitogen}} + + - name: get process ID. + custom_python_detect_environment: + register: sync_proc1 + when: is_mitogen + + - name: get process ID again. + custom_python_detect_environment: + register: sync_proc2 + when: is_mitogen + + - assert: + that: + - sync_proc1.pid == sync_proc2.pid + when: is_mitogen + + # Verify mitogen_task_isolation=fork triggers forking. + + - name: get force-forked process ID. + custom_python_detect_environment: + register: fork_proc1 + vars: + mitogen_task_isolation: fork + when: is_mitogen + + - name: get force-forked process ID again. + custom_python_detect_environment: + register: fork_proc2 + vars: + mitogen_task_isolation: fork + when: is_mitogen + + - assert: + that: + - fork_proc1.pid != sync_proc1.pid + - fork_proc1.pid != fork_proc2.pid + when: is_mitogen diff --git a/tests/ansible/lib/modules/custom_python_detect_environment.py b/tests/ansible/lib/modules/custom_python_detect_environment.py index 8e29249a..5493fdc1 100644 --- a/tests/ansible/lib/modules/custom_python_detect_environment.py +++ b/tests/ansible/lib/modules/custom_python_detect_environment.py @@ -13,6 +13,10 @@ import sys def main(): module = AnsibleModule(argument_spec={}) module.exit_json( + pid=os.getpid(), + ppid=os.getppid(), + uid=os.getuid(), + euid=os.geteuid(), sys_executable=sys.executable, mitogen_loaded='mitogen.core' in sys.modules, hostname=socket.gethostname(), diff --git a/tests/ansible/run_ansible_playbook.sh b/tests/ansible/run_ansible_playbook.sh index 1df82047..36af1755 100755 --- a/tests/ansible/run_ansible_playbook.sh +++ b/tests/ansible/run_ansible_playbook.sh @@ -4,4 +4,11 @@ # Used by delegate_to.yml to ensure "sudo -E" preserves environment. export I_WAS_PRESERVED=1 -exec ansible-playbook "$@" +if [ "${ANSIBLE_STRATEGY:0:7}" = "mitogen" ] +then + extra="-e is_mitogen=1" +else + extra="-e is_mitogen=0" +fi + +exec ansible-playbook $extra "$@"