diff --git a/ansible_mitogen/planner.py b/ansible_mitogen/planner.py index a613cde3..8f4cf77b 100644 --- a/ansible_mitogen/planner.py +++ b/ansible_mitogen/planner.py @@ -38,7 +38,6 @@ from __future__ import absolute_import import json import logging import os -import random from ansible.executor import module_common import ansible.errors @@ -116,8 +115,6 @@ class Invocation(object): self.env = env #: Boolean, if :py:data:`True`, launch the module asynchronously. self.wrap_async = wrap_async - #: String Job ID. - self.job_id = self._make_job_id() #: Initially ``None``, but set by :func:`invoke`. The path on the #: master to the module's implementation file. @@ -126,9 +123,6 @@ class Invocation(object): #: binary contents of the module. self.module_source = None - def _make_job_id(self): - return '%016x' % random.randint(0, 2**64) - def __repr__(self): return 'Invocation(module_name=%s)' % (self.module_name,) @@ -167,9 +161,10 @@ class Planner(object): # named by `runner_name`. } """ - kwargs.setdefault('job_id', invocation.job_id) + kwargs.setdefault('emulate_tty', True) kwargs.setdefault('service_context', invocation.connection.parent) kwargs.setdefault('should_fork', self.get_should_fork(invocation)) + kwargs.setdefault('wrap_async', invocation.wrap_async) return kwargs @@ -349,7 +344,10 @@ def get_module_data(name): return path, source -def _do_invoke(invocation): +def invoke(invocation): + """ + Find a suitable Planner that knows how to run `invocation`. + """ (invocation.module_path, invocation.module_source) = get_module_data(invocation.module_name) @@ -358,52 +356,12 @@ def _do_invoke(invocation): if planner.detect(invocation): LOG.debug('%r accepted %r (filename %r)', planner, invocation.module_name, invocation.module_path) - break + return invocation.action._postprocess_response( + invocation.connection.call( + ansible_mitogen.target.run_module, + planner.plan(invocation), + ) + ) LOG.debug('%r rejected %r', planner, invocation.module_name) - else: - raise ansible.errors.AnsibleError(NO_METHOD_MSG + repr(invocation)) - - return invocation.connection.call_async( - ansible_mitogen.target.run_module, - planner.plan(invocation), - ) - - -def _invoke_async(invocation): - _do_invoke(invocation) - return { - 'stdout': json.dumps({ - # modules/utilities/logic/async_wrapper.py::_run_module(). - 'changed': True, - 'started': 1, - 'finished': 0, - 'ansible_job_id': invocation.job_id, - }) - } - - -def _invoke_sync(invocation): - result_recv = mitogen.core.Receiver(invocation.connection.router) - mitogen.service.call_async( - context=invocation.connection.parent, - handle=ansible_mitogen.services.JobResultService.handle, - method='listen', - kwargs={ - 'job_id': invocation.job_id, - 'sender': result_recv.to_sender(), - } - ) - _do_invoke(invocation) - return result_recv.get().unpickle() - - -def invoke(invocation): - """ - Find a suitable Planner that knows how to run `invocation`. - """ - if invocation.wrap_async: - js = _invoke_async(invocation) - else: - js = _invoke_sync(invocation) - return invocation.action._postprocess_response(js) + raise ansible.errors.AnsibleError(NO_METHOD_MSG + repr(invocation)) diff --git a/ansible_mitogen/plugins/actions/mitogen_async_status.py b/ansible_mitogen/plugins/actions/mitogen_async_status.py deleted file mode 100644 index 611718b2..00000000 --- a/ansible_mitogen/plugins/actions/mitogen_async_status.py +++ /dev/null @@ -1,78 +0,0 @@ -# Copyright 2017, David Wilson -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, -# this list of conditions and the following disclaimer. -# -# 2. Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. -# -# 3. Neither the name of the copyright holder nor the names of its contributors -# may be used to endorse or promote products derived from this software without -# specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -# POSSIBILITY OF SUCH DAMAGE. - -import ansible.plugins.action -import mitogen.core -import mitogen.utils -import ansible_mitogen.services -import ansible_mitogen.target - - -class ActionModule(ansible.plugins.action.ActionBase): - def _get_async_result(self, job_id): - self._connection._connect() - return mitogen.service.call( - context=self._connection.parent, - handle=ansible_mitogen.services.JobResultService.handle, - method='get', - kwargs={ - 'job_id': job_id, - } - ) - - def _on_result_pending(self, job_id): - return { - '_ansible_parsed': True, - 'ansible_job_id': job_id, - 'started': 1, - 'failed': 0, - 'finished': 0, - 'msg': '', - } - - def _on_result_available(self, job_id, result): - dct = self._parse_returned_data(result) - dct['ansible_job_id'] = job_id - dct['started'] = 1 - dct['finished'] = 1 - - # Cutpasted from the action.py. - if 'stdout' in dct and 'stdout_lines' not in dct: - dct['stdout_lines'] = (dct['stdout'] or u'').splitlines() - if 'stderr' in dct and 'stderr_lines' not in dct: - dct['stderr_lines'] = (dct['stderr'] or u'').splitlines() - return dct - - def run(self, tmp=None, task_vars=None): - job_id = mitogen.utils.cast(self._task.args['jid']) - - result = self._get_async_result(job_id) - if result is None: - return self._on_result_pending(job_id) - else: - return self._on_result_available(job_id, result) diff --git a/ansible_mitogen/process.py b/ansible_mitogen/process.py index 0cb0b987..8febea90 100644 --- a/ansible_mitogen/process.py +++ b/ansible_mitogen/process.py @@ -155,7 +155,6 @@ class MuxProcess(object): services=[ ansible_mitogen.services.ContextService(self.router), ansible_mitogen.services.FileService(self.router), - ansible_mitogen.services.JobResultService(self.router), ], size=int(os.environ.get('MITOGEN_POOL_SIZE', '16')), ) diff --git a/ansible_mitogen/runner.py b/ansible_mitogen/runner.py index 28f8b3cd..16a7d777 100644 --- a/ansible_mitogen/runner.py +++ b/ansible_mitogen/runner.py @@ -83,17 +83,17 @@ class Runner(object): Subclasses may override `_run`()` and extend `setup()` and `revert()`. """ - def __init__(self, module, job_id, remote_tmp, service_context, - raw_params=None, args=None, env=None): + def __init__(self, module, remote_tmp, service_context, + emulate_tty=None, raw_params=None, args=None, env=None): if args is None: args = {} if raw_params is not None: args['_raw_params'] = raw_params self.module = module - self.job_id = job_id self.remote_tmp = os.path.expanduser(remote_tmp) self.service_context = service_context + self.emulate_tty = emulate_tty self.raw_params = raw_params self.args = args self.env = env @@ -135,17 +135,6 @@ class Runner(object): """ raise NotImplementedError() - def _send_result(self, dct): - mitogen.service.call( - context=self.service_context, - handle=502, - method='push', - kwargs={ - 'job_id': self.job_id, - 'result': dct - } - ) - def run(self): """ Set up the process environment in preparation for running an Ansible @@ -158,10 +147,7 @@ class Runner(object): """ self.setup() try: - try: - self._send_result(self._run()) - except Exception as e: - self._send_result(mitogen.core.CallError(e)) + return self._run() finally: self.revert() @@ -260,7 +246,7 @@ class ProgramRunner(Runner): try: rc, stdout, stderr = ansible_mitogen.target.exec_args( args=self._get_program_args(), - emulate_tty=True, + emulate_tty=self.emulate_tty, ) except Exception, e: LOG.exception('While running %s', self._get_program_args()) diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index 61f7477d..6bfede0b 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -246,8 +246,8 @@ class ContextService(mitogen.service.Service): # return value somewhere, but logs will catch a failure anyway. context.call_async(ansible_mitogen.target.start_fork_parent) if os.environ.get('MITOGEN_DUMP_THREAD_STACKS'): - import mitogen.debug - context.call(mitogen.debug.dump_to_logger) + from mitogen import debug + context.call(debug.dump_to_logger) self._key_by_context[context] = key self._refs_by_context[context] = 0 return { @@ -369,93 +369,3 @@ class FileService(mitogen.service.Service): LOG.debug('Serving %r', path) return self._paths[path] - - -class JobResultService(mitogen.service.Service): - """ - Receive the result of a task from a child and forward it to interested - listeners. If no listener exists, store the result until it is requested. - - Storing results in an intermediary service allows: - - * the lifetime of the worker to be decoupled from the lifetime of the job, - * for new and unrelated workers to request the job result after the original - worker that spawned it has exitted, - * for synchronous and asynchronous jobs to be treated identically, - * for latency-free polling and waiting on job results, and - * for Ansible job IDs to be be used to refer to a job in preference to - Mitogen-internal identifiers such as Sender and Context. - - Results are keyed by job ID. - """ - handle = 502 - max_message_size = 1048576 * 64 - - def __init__(self, router): - super(JobResultService, self).__init__(router) - self._lock = threading.Lock() - self._result_by_job_id = {} - self._sender_by_job_id = {} - - @mitogen.service.expose(mitogen.service.AllowParents()) - @mitogen.service.arg_spec({ - 'job_id': str, - 'sender': mitogen.core.Sender, - }) - def listen(self, job_id, sender): - """ - Register to receive the result of a job when it becomes available. - - :param str job_id: - Job ID to listen for. - :param mitogen.core.Sender sender: - Sender on which to deliver the job result. - """ - LOG.debug('%r.listen(job_id=%r, sender=%r)', self, job_id, sender) - with self._lock: - if job_id in self._sender_by_job_id: - raise Error('Listener already exists for job: %s' % (job_id,)) - self._sender_by_job_id[job_id] = sender - - @mitogen.service.expose(mitogen.service.AllowParents()) - @mitogen.service.arg_spec({ - 'job_id': basestring, - }) - def get(self, job_id): - """ - Return a job's result if it is available, otherwise return immediately. - The job result is forgotten once it has been returned by this method. - - :param str job_id: - Job ID to return. - :returns: - Job result dictionary, or :data:`None`. - """ - LOG.debug('%r.get(job_id=%r)', self, job_id) - with self._lock: - return self._result_by_job_id.pop(job_id, None) - - @mitogen.service.expose(mitogen.service.AllowAny()) - @mitogen.service.arg_spec({ - 'job_id': basestring, - 'result': (mitogen.core.CallError, dict) - }) - def push(self, job_id, result): - """ - Deliver a job's result from a child context, notifying any listener - registred via :meth:`listen` of the result. - - :param str job_id: - Job ID whose result is being pushed. - :param dict result: - Job result dictionary. - """ - LOG.debug('%r.push(job_id=%r, result=%r)', self, job_id, result) - with self._lock: - if job_id in self._result_by_job_id: - raise Error('Result already exists for job: %s' % (job_id,)) - sender = self._sender_by_job_id.pop(job_id, None) - if sender: - sender.send(result) - else: - self._result_by_job_id[job_id] = result diff --git a/ansible_mitogen/strategy.py b/ansible_mitogen/strategy.py index bc329842..430ea74b 100644 --- a/ansible_mitogen/strategy.py +++ b/ansible_mitogen/strategy.py @@ -48,19 +48,9 @@ def wrap_action_loader__get(name, *args, **kwargs): helper methods inherited from ActionBase with implementations that avoid the use of shell fragments wherever possible. - Additionally catch attempts to instantiate the "normal" action with a task - argument whose action is "async_status", and redirect it to a special - implementation that fetches the task result via RPC. - This is used instead of static subclassing as it generalizes to third party action modules outside the Ansible tree. """ - # Necessary since async_status execution strategy is hard-wired in - # executor/task_executor.py::_poll_async_result(). - if ( name == 'normal' and 'task' in kwargs and - kwargs['task'].action == 'async_status'): - name = 'mitogen_async_status' - klass = action_loader__get(name, class_only=True) if klass: wrapped_name = 'MitogenActionModule_' + name @@ -172,7 +162,6 @@ class StrategyMixin(object): """ base_dir = os.path.join(os.path.dirname(__file__), 'plugins') connection_loader.add_directory(os.path.join(base_dir, 'connection')) - action_loader.add_directory(os.path.join(base_dir, 'actions')) def run(self, iterator, play_context, result=0): """ diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index d6a44f4d..ec05ab55 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -42,9 +42,10 @@ import re import stat import subprocess import tempfile -import threading +import traceback import zlib +import ansible.module_utils.json_utils import ansible_mitogen.runner import ansible_mitogen.services import mitogen.core @@ -112,11 +113,26 @@ def start_fork_parent(econtext): @mitogen.core.takes_econtext -def start_fork_child(kwargs, econtext): +def start_fork_child(wrap_async, kwargs, econtext): mitogen.parent.upgrade_router(econtext) context = econtext.router.fork() - kwargs['shutdown_on_exit'] = True - context.call_async(run_module, kwargs) + if not wrap_async: + try: + return context.call(run_module, kwargs) + finally: + context.shutdown() + + job_id = '%016x' % random.randint(0, 2**64) + context.call_async(run_module_async, job_id, kwargs) + return { + 'stdout': json.dumps({ + # modules/utilities/logic/async_wrapper.py::_run_module(). + 'changed': True, + 'started': 1, + 'finished': 0, + 'ansible_job_id': job_id, + }) + } @mitogen.core.takes_econtext @@ -128,16 +144,91 @@ def run_module(kwargs, econtext): from reading sys.stdin. """ should_fork = kwargs.pop('should_fork', False) - shutdown_on_exit = kwargs.pop('shutdown_on_exit', False) + wrap_async = kwargs.pop('wrap_async', False) if should_fork: - _fork_parent.call(start_fork_child, kwargs) - return + return _fork_parent.call(start_fork_child, wrap_async, kwargs) runner_name = kwargs.pop('runner_name') klass = getattr(ansible_mitogen.runner, runner_name) impl = klass(**kwargs) - impl.run() - if shutdown_on_exit: + return impl.run() + + +def _get_async_dir(): + return os.path.expanduser( + os.environ.get('ANSIBLE_ASYNC_DIR', '~/.ansible_async') + ) + + +def _write_job_status(job_id, dct): + """ + Update an async job status file. + """ + LOG.info('_write_job_status(%r, %r)', job_id, dct) + dct.setdefault('ansible_job_id', job_id) + dct.setdefault('data', '') + + async_dir = _get_async_dir() + if not os.path.exists(async_dir): + os.makedirs(async_dir) + + path = os.path.join(async_dir, job_id) + with open(path + '.tmp', 'w') as fp: + fp.write(json.dumps(dct)) + os.rename(path + '.tmp', path) + + +def _run_module_async(job_id, kwargs, econtext): + """ + Body on run_module_async(). + + 1. Immediately updates the status file to mark the job as started. + 2. Installs a timer/signal handler to implement the time limit. + 3. Runs as with run_module(), writing the result to the status file. + """ + _write_job_status(job_id, { + 'started': 1, + 'finished': 0 + }) + + kwargs['emulate_tty'] = False + dct = run_module(kwargs, econtext) + if mitogen.core.PY3: + for key in 'stdout', 'stderr': + dct[key] = dct[key].decode('utf-8', 'surrogateescape') + + try: + filtered, warnings = ( + ansible.module_utils.json_utils. + _filter_non_json_lines(dct['stdout']) + ) + result = json.loads(filtered) + result.setdefault('warnings', []).extend(warnings) + result['stderr'] = dct['stderr'] + _write_job_status(job_id, result) + except Exception: + _write_job_status(job_id, { + "failed": 1, + "msg": traceback.format_exc(), + "data": dct['stdout'], # temporary notice only + "stderr": dct['stderr'] + }) + + +@mitogen.core.takes_econtext +def run_module_async(job_id, kwargs, econtext): + """ + Since run_module_async() is invoked with .call_async(), with nothing to + read the result from the corresponding Receiver, wrap the body in an + exception logger, and wrap that in something that tears down the context on + completion. + """ + try: + try: + _run_module_async(job_id, kwargs, econtext) + except Exception: + LOG.exception('_run_module_async crashed') + finally: econtext.broker.shutdown() diff --git a/tests/ansible/integration/async/all.yml b/tests/ansible/integration/async/all.yml index 0a7364f2..b295b526 100644 --- a/tests/ansible/integration/async/all.yml +++ b/tests/ansible/integration/async/all.yml @@ -1,3 +1,5 @@ -- import_playbook: runner_job_timeout.yml +- import_playbook: result_binary_producing_json.yml +- import_playbook: result_binary_producing_junk.yml +- import_playbook: result_shell_echo_hi.yml - import_playbook: runner_one_job.yml - import_playbook: runner_two_simultaneous_jobs.yml diff --git a/tests/ansible/integration/async/result_binary_producing_json.yml b/tests/ansible/integration/async/result_binary_producing_json.yml new file mode 100644 index 00000000..71bf4351 --- /dev/null +++ b/tests/ansible/integration/async/result_binary_producing_json.yml @@ -0,0 +1,30 @@ + +- name: integration/async/result_binary_producing_json.yml + gather_facts: true + hosts: all + any_errors_fatal: true + tasks: + + - custom_binary_producing_json: + async: 100 + poll: 0 + register: job + + - shell: sleep 1 + + - slurp: + src: "{{ansible_user_dir}}/.ansible_async/{{job.ansible_job_id}}" + register: result + + - debug: msg={{async_out}} + vars: + async_out: "{{result.content|b64decode|from_json}}" + + - assert: + that: + - async_out.changed == True + - async_out.failed == False + - async_out.msg == "Hello, world." + - 'async_out.stderr == "binary_producing_json: oh noes\n"' + vars: + async_out: "{{result.content|b64decode|from_json}}" diff --git a/tests/ansible/integration/async/result_binary_producing_junk.yml b/tests/ansible/integration/async/result_binary_producing_junk.yml new file mode 100644 index 00000000..11de5d31 --- /dev/null +++ b/tests/ansible/integration/async/result_binary_producing_junk.yml @@ -0,0 +1,32 @@ + +- name: integration/async/result_binary_producing_junk.yml + gather_facts: true + hosts: all + any_errors_fatal: true + tasks: + + - custom_binary_producing_junk: + async: 100 + poll: 0 + register: job + + - shell: sleep 1 + + - slurp: + src: "{{ansible_user_dir}}/.ansible_async/{{job.ansible_job_id}}" + register: result + + - debug: msg={{async_out}} + vars: + async_out: "{{result.content|b64decode|from_json}}" + + - assert: + that: + - async_out.ansible_job_id == job.ansible_job_id + - async_out.data == "Hello, world.\n" + - async_out.failed == 1 + - async_out.msg.startswith("Traceback") + - '"ValueError: No start of json char found\n" in async_out.msg' + - 'async_out.stderr == "binary_producing_junk: oh noes\n"' + vars: + async_out: "{{result.content|b64decode|from_json}}" diff --git a/tests/ansible/integration/async/result_shell_echo_hi.yml b/tests/ansible/integration/async/result_shell_echo_hi.yml new file mode 100644 index 00000000..ce439743 --- /dev/null +++ b/tests/ansible/integration/async/result_shell_echo_hi.yml @@ -0,0 +1,42 @@ + +- name: integration/async/result_shell_echo_hi.yml + gather_facts: true + hosts: all + any_errors_fatal: true + tasks: + + - shell: echo hi + async: 100 + poll: 0 + register: job + + - shell: sleep 1 + + - slurp: + src: "{{ansible_user_dir}}/.ansible_async/{{job.ansible_job_id}}" + register: result + + - debug: msg={{async_out}} + vars: + async_out: "{{result.content|b64decode|from_json}}" + + - assert: + that: + - async_out.changed == True + - async_out.cmd == "echo hi" + - 'async_out.delta.startswith("0:00:00")' + - async_out.end.startswith("20") + - async_out.invocation.module_args._raw_params == "echo hi" + - async_out.invocation.module_args._uses_shell == True + - async_out.invocation.module_args.chdir == None + - async_out.invocation.module_args.creates == None + - async_out.invocation.module_args.executable == None + - async_out.invocation.module_args.removes == None + - async_out.invocation.module_args.stdin == None + - async_out.invocation.module_args.warn == True + - async_out.rc == 0 + - async_out.start.startswith("20") + - async_out.stderr == "" + - async_out.stdout == "hi" + vars: + async_out: "{{result.content|b64decode|from_json}}" diff --git a/tests/ansible/integration/runner/all.yml b/tests/ansible/integration/runner/all.yml index 9d815518..b65b1a53 100644 --- a/tests/ansible/integration/runner/all.yml +++ b/tests/ansible/integration/runner/all.yml @@ -1,6 +1,3 @@ -- import_playbook: async_job_timeout.yml -- import_playbook: async_one_job.yml -- import_playbook: async_two_simultaneous_jobs.yml - import_playbook: builtin_command_module.yml - import_playbook: custom_bash_old_style_module.yml - import_playbook: custom_bash_want_json_module.yml