From f06ae05734808a93ceac8e5397dc362f1b204f19 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Wed, 18 Apr 2018 23:14:23 +0100 Subject: [PATCH 1/6] issue #195: add extra logging around FileService and get_file(). --- ansible_mitogen/target.py | 2 ++ mitogen/service.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index 20086cec..d6a44f4d 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -78,6 +78,7 @@ def get_file(context, path): Bytestring file data. """ if path not in _file_cache: + LOG.debug('target.get_file(): fetching %r from %r', path, context) _file_cache[path] = zlib.decompress( mitogen.service.call( context=context, @@ -88,6 +89,7 @@ def get_file(context, path): } ) ) + LOG.debug('target.get_file(): fetched %r from %r', path, context) return _file_cache[path] diff --git a/mitogen/service.py b/mitogen/service.py index 23848b24..9c33202c 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -341,6 +341,8 @@ class Pool(object): def call_async(context, handle, method, kwargs): + LOG.debug('service.call_async(%r, %r, %r, %r)', + context, handle, method, kwargs) pair = (method, kwargs) msg = mitogen.core.Message.pickled(pair, handle=handle) return context.send_async(msg) From 810f557514c60658f845dde716585eaad3e2bb4e Mon Sep 17 00:00:00 2001 From: David Wilson Date: Wed, 18 Apr 2018 23:37:42 +0100 Subject: [PATCH 2/6] issue #195: MITOGEN_DUMP_THREAD_STACKS=1 --- ansible_mitogen/services.py | 3 +++ mitogen/core.py | 1 + mitogen/debug.py | 33 +++++++++++++++++++-------------- 3 files changed, 23 insertions(+), 14 deletions(-) diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index 6e7d4b5a..61f7477d 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -245,6 +245,9 @@ class ContextService(mitogen.service.Service): # We don't need to wait for the result of this. Ideally we'd check its # return value somewhere, but logs will catch a failure anyway. context.call_async(ansible_mitogen.target.start_fork_parent) + if os.environ.get('MITOGEN_DUMP_THREAD_STACKS'): + import mitogen.debug + context.call(mitogen.debug.dump_to_logger) self._key_by_context[context] = key self._refs_by_context[context] = 0 return { diff --git a/mitogen/core.py b/mitogen/core.py index 6bda7f84..20f52554 100644 --- a/mitogen/core.py +++ b/mitogen/core.py @@ -507,6 +507,7 @@ class Importer(object): self._context = context self._present = {'mitogen': [ 'compat', + 'debug', 'fakessh', 'fork', 'master', diff --git a/mitogen/debug.py b/mitogen/debug.py index 8cb1a367..eab1bef8 100644 --- a/mitogen/debug.py +++ b/mitogen/debug.py @@ -31,6 +31,7 @@ Basic signal handler for dumping thread stacks. """ import difflib +import logging import os import signal import sys @@ -39,6 +40,7 @@ import time import traceback +LOG = logging.getLogger(__name__) _last = None @@ -74,15 +76,13 @@ def format_stacks(): return '\n'.join(l) -def _handler(*_): +def get_snapshot(): global _last s = format_stacks() - fp = open('/dev/tty', 'w', 1) - fp.write(s) - + snap = s if _last: - fp.write('\n') + snap += '\n' diff = list(difflib.unified_diff( a=_last.splitlines(), b=s.splitlines(), @@ -91,25 +91,30 @@ def _handler(*_): )) if diff: - fp.write('\n'.join(diff) + '\n') + snap += '\n'.join(diff) + '\n' else: - fp.write('(no change since last time)\n') + snap += '(no change since last time)\n' _last = s + return snap + + +def _handler(*_): + fp = open('/dev/tty', 'w', 1) + fp.write(get_snapshot()) + fp.close() def install_handler(): signal.signal(signal.SIGUSR2, _handler) -def _thread_main(): +def _logging_main(): while True: - time.sleep(7) - l = format_stacks() - open('/tmp/stack.%s.log' % (os.getpid(),), 'wb', 65535).write(l) - break + time.sleep(5) + LOG.info('PERIODIC THREAD DUMP\n\n%s', get_snapshot()) -def dump_periodically(): - th = threading.Thread(target=main) +def dump_to_logger(): + th = threading.Thread(target=_logging_main) th.setDaemon(True) th.start() From 65fb6ff9feec28f49b0f7276b66702bf7553bd24 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Wed, 18 Apr 2018 23:39:49 +0100 Subject: [PATCH 3/6] issue #195: comment out stack pruning code --- mitogen/debug.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mitogen/debug.py b/mitogen/debug.py index eab1bef8..e1122192 100644 --- a/mitogen/debug.py +++ b/mitogen/debug.py @@ -58,7 +58,7 @@ def format_stacks(): threadId, stack, )] - stack = stack.f_back.f_back + #stack = stack.f_back.f_back for filename, lineno, name, line in traceback.extract_stack(stack): l += [ From 296683b130e3eb1fa0300dd9b2ca8f29425c6318 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Thu, 19 Apr 2018 22:10:52 +0100 Subject: [PATCH 4/6] ansible: always display Mitogen errors and warnings. They're no longer buried in -vvv output. --- ansible_mitogen/logging.py | 20 +++++++++++++------- mitogen/service.py | 2 +- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/ansible_mitogen/logging.py b/ansible_mitogen/logging.py index ce4ea127..76b42c7b 100644 --- a/ansible_mitogen/logging.py +++ b/ansible_mitogen/logging.py @@ -39,14 +39,20 @@ class Handler(logging.Handler): """ Use Mitogen's log format, but send the result to a Display method. """ - def __init__(self, method): + def __init__(self, display, normal_method): super(Handler, self).__init__() self.formatter = mitogen.utils.log_get_formatter(usec=True) - self.method = method + self.display = display + self.normal_method = normal_method def emit(self, record): - msg = self.format(record) - self.method('[pid %d] %s' % (os.getpid(), msg)) + s = '[pid %d] %s' % (os.getpid(), self.format(record)) + if record.levelno >= logging.ERROR: + self.display.error(s, wrap_text=False) + elif record.levelno >= logging.WARNING: + self.display.warning(s, formatted=True) + else: + self.normal_method(s) def find_display(): @@ -69,9 +75,9 @@ def setup(): """ display = find_display() - logging.getLogger('ansible_mitogen').handlers = [Handler(display.vvv)] - mitogen.core.LOG.handlers = [Handler(display.vvv)] - mitogen.core.IOLOG.handlers = [Handler(display.vvvv)] + logging.getLogger('ansible_mitogen').handlers = [Handler(display, display.vvv)] + mitogen.core.LOG.handlers = [Handler(display, display.vvv)] + mitogen.core.IOLOG.handlers = [Handler(display, display.vvvv)] mitogen.core.IOLOG.propagate = False if display.verbosity > 2: diff --git a/mitogen/service.py b/mitogen/service.py index 9c33202c..78b1213c 100644 --- a/mitogen/service.py +++ b/mitogen/service.py @@ -314,7 +314,7 @@ class Pool(object): msg = self._select.get() except (mitogen.core.ChannelError, mitogen.core.LatchError): e = sys.exc_info()[1] - LOG.error('%r: channel or latch closed, exitting: %s', self, e) + LOG.info('%r: channel or latch closed, exitting: %s', self, e) return service = msg.receiver.service From 6ad18b67197e6b490e8896146fed53c3f387f255 Mon Sep 17 00:00:00 2001 From: David Wilson Date: Thu, 19 Apr 2018 22:12:18 +0100 Subject: [PATCH 5/6] issue #191: move async tests to their own category --- tests/ansible/integration/all.yml | 1 + tests/ansible/integration/async/all.yml | 3 +++ .../async_job_timeout.yml => async/runner_job_timeout.yml} | 2 +- .../{runner/async_one_job.yml => async/runner_one_job.yml} | 2 +- .../runner_two_simultaneous_jobs.yml} | 2 +- 5 files changed, 7 insertions(+), 3 deletions(-) create mode 100644 tests/ansible/integration/async/all.yml rename tests/ansible/integration/{runner/async_job_timeout.yml => async/runner_job_timeout.yml} (96%) rename tests/ansible/integration/{runner/async_one_job.yml => async/runner_one_job.yml} (98%) rename tests/ansible/integration/{runner/async_two_simultaneous_jobs.yml => async/runner_two_simultaneous_jobs.yml} (95%) diff --git a/tests/ansible/integration/all.yml b/tests/ansible/integration/all.yml index 4dd2c596..15471c75 100644 --- a/tests/ansible/integration/all.yml +++ b/tests/ansible/integration/all.yml @@ -4,6 +4,7 @@ # - import_playbook: action/all.yml +- import_playbook: async/all.yml - import_playbook: become/all.yml - import_playbook: connection_loader/all.yml - import_playbook: context_service/all.yml diff --git a/tests/ansible/integration/async/all.yml b/tests/ansible/integration/async/all.yml new file mode 100644 index 00000000..0a7364f2 --- /dev/null +++ b/tests/ansible/integration/async/all.yml @@ -0,0 +1,3 @@ +- import_playbook: runner_job_timeout.yml +- import_playbook: runner_one_job.yml +- import_playbook: runner_two_simultaneous_jobs.yml diff --git a/tests/ansible/integration/runner/async_job_timeout.yml b/tests/ansible/integration/async/runner_job_timeout.yml similarity index 96% rename from tests/ansible/integration/runner/async_job_timeout.yml rename to tests/ansible/integration/async/runner_job_timeout.yml index 868b676d..b15fc2dd 100644 --- a/tests/ansible/integration/runner/async_job_timeout.yml +++ b/tests/ansible/integration/async/runner_job_timeout.yml @@ -1,6 +1,6 @@ # Verify 'async: ' functions as desired. -- name: integration/runner/async_job_timeout.yml +- name: integration/async/runner_job_timeout.yml hosts: all any_errors_fatal: true tasks: diff --git a/tests/ansible/integration/runner/async_one_job.yml b/tests/ansible/integration/async/runner_one_job.yml similarity index 98% rename from tests/ansible/integration/runner/async_one_job.yml rename to tests/ansible/integration/async/runner_one_job.yml index 6933f82b..846b173c 100644 --- a/tests/ansible/integration/runner/async_one_job.yml +++ b/tests/ansible/integration/async/runner_one_job.yml @@ -1,7 +1,7 @@ # Verify behaviour of a single asynchronous task, and presence of all output # fields. -- name: integration/runner/async_one_job.yml +- name: integration/async/runner_one_job.yml hosts: all any_errors_fatal: true tasks: diff --git a/tests/ansible/integration/runner/async_two_simultaneous_jobs.yml b/tests/ansible/integration/async/runner_two_simultaneous_jobs.yml similarity index 95% rename from tests/ansible/integration/runner/async_two_simultaneous_jobs.yml rename to tests/ansible/integration/async/runner_two_simultaneous_jobs.yml index 679608c5..5c20fc5d 100644 --- a/tests/ansible/integration/runner/async_two_simultaneous_jobs.yml +++ b/tests/ansible/integration/async/runner_two_simultaneous_jobs.yml @@ -1,5 +1,5 @@ -- name: integration/runner/async_two_simultaneous_jobs.yml +- name: integration/async/runner_two_simultaneous_jobs.yml hosts: all any_errors_fatal: true tasks: From 85e1f5f515db6286341a0c3a166d4749baafcf2d Mon Sep 17 00:00:00 2001 From: David Wilson Date: Fri, 20 Apr 2018 14:20:05 +0100 Subject: [PATCH 6/6] ansible: remove JobResultService, more compatible async jobs; closes #191. And by "compatible" I mean "terrible". This does not implement async job timeouts, but I'm not going to bother, upstream async implementation is so buggy and inconsistent it resists even having its behaviour captured in tests. --- ansible_mitogen/planner.py | 68 +++-------- .../plugins/actions/mitogen_async_status.py | 78 ------------- ansible_mitogen/process.py | 1 - ansible_mitogen/runner.py | 24 +--- ansible_mitogen/services.py | 94 +-------------- ansible_mitogen/strategy.py | 11 -- ansible_mitogen/target.py | 109 ++++++++++++++++-- tests/ansible/integration/async/all.yml | 4 +- .../async/result_binary_producing_json.yml | 30 +++++ .../async/result_binary_producing_junk.yml | 32 +++++ .../async/result_shell_echo_hi.yml | 42 +++++++ tests/ansible/integration/runner/all.yml | 3 - 12 files changed, 227 insertions(+), 269 deletions(-) delete mode 100644 ansible_mitogen/plugins/actions/mitogen_async_status.py create mode 100644 tests/ansible/integration/async/result_binary_producing_json.yml create mode 100644 tests/ansible/integration/async/result_binary_producing_junk.yml create mode 100644 tests/ansible/integration/async/result_shell_echo_hi.yml diff --git a/ansible_mitogen/planner.py b/ansible_mitogen/planner.py index a613cde3..8f4cf77b 100644 --- a/ansible_mitogen/planner.py +++ b/ansible_mitogen/planner.py @@ -38,7 +38,6 @@ from __future__ import absolute_import import json import logging import os -import random from ansible.executor import module_common import ansible.errors @@ -116,8 +115,6 @@ class Invocation(object): self.env = env #: Boolean, if :py:data:`True`, launch the module asynchronously. self.wrap_async = wrap_async - #: String Job ID. - self.job_id = self._make_job_id() #: Initially ``None``, but set by :func:`invoke`. The path on the #: master to the module's implementation file. @@ -126,9 +123,6 @@ class Invocation(object): #: binary contents of the module. self.module_source = None - def _make_job_id(self): - return '%016x' % random.randint(0, 2**64) - def __repr__(self): return 'Invocation(module_name=%s)' % (self.module_name,) @@ -167,9 +161,10 @@ class Planner(object): # named by `runner_name`. } """ - kwargs.setdefault('job_id', invocation.job_id) + kwargs.setdefault('emulate_tty', True) kwargs.setdefault('service_context', invocation.connection.parent) kwargs.setdefault('should_fork', self.get_should_fork(invocation)) + kwargs.setdefault('wrap_async', invocation.wrap_async) return kwargs @@ -349,7 +344,10 @@ def get_module_data(name): return path, source -def _do_invoke(invocation): +def invoke(invocation): + """ + Find a suitable Planner that knows how to run `invocation`. + """ (invocation.module_path, invocation.module_source) = get_module_data(invocation.module_name) @@ -358,52 +356,12 @@ def _do_invoke(invocation): if planner.detect(invocation): LOG.debug('%r accepted %r (filename %r)', planner, invocation.module_name, invocation.module_path) - break + return invocation.action._postprocess_response( + invocation.connection.call( + ansible_mitogen.target.run_module, + planner.plan(invocation), + ) + ) LOG.debug('%r rejected %r', planner, invocation.module_name) - else: - raise ansible.errors.AnsibleError(NO_METHOD_MSG + repr(invocation)) - - return invocation.connection.call_async( - ansible_mitogen.target.run_module, - planner.plan(invocation), - ) - - -def _invoke_async(invocation): - _do_invoke(invocation) - return { - 'stdout': json.dumps({ - # modules/utilities/logic/async_wrapper.py::_run_module(). - 'changed': True, - 'started': 1, - 'finished': 0, - 'ansible_job_id': invocation.job_id, - }) - } - - -def _invoke_sync(invocation): - result_recv = mitogen.core.Receiver(invocation.connection.router) - mitogen.service.call_async( - context=invocation.connection.parent, - handle=ansible_mitogen.services.JobResultService.handle, - method='listen', - kwargs={ - 'job_id': invocation.job_id, - 'sender': result_recv.to_sender(), - } - ) - _do_invoke(invocation) - return result_recv.get().unpickle() - - -def invoke(invocation): - """ - Find a suitable Planner that knows how to run `invocation`. - """ - if invocation.wrap_async: - js = _invoke_async(invocation) - else: - js = _invoke_sync(invocation) - return invocation.action._postprocess_response(js) + raise ansible.errors.AnsibleError(NO_METHOD_MSG + repr(invocation)) diff --git a/ansible_mitogen/plugins/actions/mitogen_async_status.py b/ansible_mitogen/plugins/actions/mitogen_async_status.py deleted file mode 100644 index 611718b2..00000000 --- a/ansible_mitogen/plugins/actions/mitogen_async_status.py +++ /dev/null @@ -1,78 +0,0 @@ -# Copyright 2017, David Wilson -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, -# this list of conditions and the following disclaimer. -# -# 2. Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. -# -# 3. Neither the name of the copyright holder nor the names of its contributors -# may be used to endorse or promote products derived from this software without -# specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -# POSSIBILITY OF SUCH DAMAGE. - -import ansible.plugins.action -import mitogen.core -import mitogen.utils -import ansible_mitogen.services -import ansible_mitogen.target - - -class ActionModule(ansible.plugins.action.ActionBase): - def _get_async_result(self, job_id): - self._connection._connect() - return mitogen.service.call( - context=self._connection.parent, - handle=ansible_mitogen.services.JobResultService.handle, - method='get', - kwargs={ - 'job_id': job_id, - } - ) - - def _on_result_pending(self, job_id): - return { - '_ansible_parsed': True, - 'ansible_job_id': job_id, - 'started': 1, - 'failed': 0, - 'finished': 0, - 'msg': '', - } - - def _on_result_available(self, job_id, result): - dct = self._parse_returned_data(result) - dct['ansible_job_id'] = job_id - dct['started'] = 1 - dct['finished'] = 1 - - # Cutpasted from the action.py. - if 'stdout' in dct and 'stdout_lines' not in dct: - dct['stdout_lines'] = (dct['stdout'] or u'').splitlines() - if 'stderr' in dct and 'stderr_lines' not in dct: - dct['stderr_lines'] = (dct['stderr'] or u'').splitlines() - return dct - - def run(self, tmp=None, task_vars=None): - job_id = mitogen.utils.cast(self._task.args['jid']) - - result = self._get_async_result(job_id) - if result is None: - return self._on_result_pending(job_id) - else: - return self._on_result_available(job_id, result) diff --git a/ansible_mitogen/process.py b/ansible_mitogen/process.py index 0cb0b987..8febea90 100644 --- a/ansible_mitogen/process.py +++ b/ansible_mitogen/process.py @@ -155,7 +155,6 @@ class MuxProcess(object): services=[ ansible_mitogen.services.ContextService(self.router), ansible_mitogen.services.FileService(self.router), - ansible_mitogen.services.JobResultService(self.router), ], size=int(os.environ.get('MITOGEN_POOL_SIZE', '16')), ) diff --git a/ansible_mitogen/runner.py b/ansible_mitogen/runner.py index 28f8b3cd..16a7d777 100644 --- a/ansible_mitogen/runner.py +++ b/ansible_mitogen/runner.py @@ -83,17 +83,17 @@ class Runner(object): Subclasses may override `_run`()` and extend `setup()` and `revert()`. """ - def __init__(self, module, job_id, remote_tmp, service_context, - raw_params=None, args=None, env=None): + def __init__(self, module, remote_tmp, service_context, + emulate_tty=None, raw_params=None, args=None, env=None): if args is None: args = {} if raw_params is not None: args['_raw_params'] = raw_params self.module = module - self.job_id = job_id self.remote_tmp = os.path.expanduser(remote_tmp) self.service_context = service_context + self.emulate_tty = emulate_tty self.raw_params = raw_params self.args = args self.env = env @@ -135,17 +135,6 @@ class Runner(object): """ raise NotImplementedError() - def _send_result(self, dct): - mitogen.service.call( - context=self.service_context, - handle=502, - method='push', - kwargs={ - 'job_id': self.job_id, - 'result': dct - } - ) - def run(self): """ Set up the process environment in preparation for running an Ansible @@ -158,10 +147,7 @@ class Runner(object): """ self.setup() try: - try: - self._send_result(self._run()) - except Exception as e: - self._send_result(mitogen.core.CallError(e)) + return self._run() finally: self.revert() @@ -260,7 +246,7 @@ class ProgramRunner(Runner): try: rc, stdout, stderr = ansible_mitogen.target.exec_args( args=self._get_program_args(), - emulate_tty=True, + emulate_tty=self.emulate_tty, ) except Exception, e: LOG.exception('While running %s', self._get_program_args()) diff --git a/ansible_mitogen/services.py b/ansible_mitogen/services.py index 61f7477d..6bfede0b 100644 --- a/ansible_mitogen/services.py +++ b/ansible_mitogen/services.py @@ -246,8 +246,8 @@ class ContextService(mitogen.service.Service): # return value somewhere, but logs will catch a failure anyway. context.call_async(ansible_mitogen.target.start_fork_parent) if os.environ.get('MITOGEN_DUMP_THREAD_STACKS'): - import mitogen.debug - context.call(mitogen.debug.dump_to_logger) + from mitogen import debug + context.call(debug.dump_to_logger) self._key_by_context[context] = key self._refs_by_context[context] = 0 return { @@ -369,93 +369,3 @@ class FileService(mitogen.service.Service): LOG.debug('Serving %r', path) return self._paths[path] - - -class JobResultService(mitogen.service.Service): - """ - Receive the result of a task from a child and forward it to interested - listeners. If no listener exists, store the result until it is requested. - - Storing results in an intermediary service allows: - - * the lifetime of the worker to be decoupled from the lifetime of the job, - * for new and unrelated workers to request the job result after the original - worker that spawned it has exitted, - * for synchronous and asynchronous jobs to be treated identically, - * for latency-free polling and waiting on job results, and - * for Ansible job IDs to be be used to refer to a job in preference to - Mitogen-internal identifiers such as Sender and Context. - - Results are keyed by job ID. - """ - handle = 502 - max_message_size = 1048576 * 64 - - def __init__(self, router): - super(JobResultService, self).__init__(router) - self._lock = threading.Lock() - self._result_by_job_id = {} - self._sender_by_job_id = {} - - @mitogen.service.expose(mitogen.service.AllowParents()) - @mitogen.service.arg_spec({ - 'job_id': str, - 'sender': mitogen.core.Sender, - }) - def listen(self, job_id, sender): - """ - Register to receive the result of a job when it becomes available. - - :param str job_id: - Job ID to listen for. - :param mitogen.core.Sender sender: - Sender on which to deliver the job result. - """ - LOG.debug('%r.listen(job_id=%r, sender=%r)', self, job_id, sender) - with self._lock: - if job_id in self._sender_by_job_id: - raise Error('Listener already exists for job: %s' % (job_id,)) - self._sender_by_job_id[job_id] = sender - - @mitogen.service.expose(mitogen.service.AllowParents()) - @mitogen.service.arg_spec({ - 'job_id': basestring, - }) - def get(self, job_id): - """ - Return a job's result if it is available, otherwise return immediately. - The job result is forgotten once it has been returned by this method. - - :param str job_id: - Job ID to return. - :returns: - Job result dictionary, or :data:`None`. - """ - LOG.debug('%r.get(job_id=%r)', self, job_id) - with self._lock: - return self._result_by_job_id.pop(job_id, None) - - @mitogen.service.expose(mitogen.service.AllowAny()) - @mitogen.service.arg_spec({ - 'job_id': basestring, - 'result': (mitogen.core.CallError, dict) - }) - def push(self, job_id, result): - """ - Deliver a job's result from a child context, notifying any listener - registred via :meth:`listen` of the result. - - :param str job_id: - Job ID whose result is being pushed. - :param dict result: - Job result dictionary. - """ - LOG.debug('%r.push(job_id=%r, result=%r)', self, job_id, result) - with self._lock: - if job_id in self._result_by_job_id: - raise Error('Result already exists for job: %s' % (job_id,)) - sender = self._sender_by_job_id.pop(job_id, None) - if sender: - sender.send(result) - else: - self._result_by_job_id[job_id] = result diff --git a/ansible_mitogen/strategy.py b/ansible_mitogen/strategy.py index bc329842..430ea74b 100644 --- a/ansible_mitogen/strategy.py +++ b/ansible_mitogen/strategy.py @@ -48,19 +48,9 @@ def wrap_action_loader__get(name, *args, **kwargs): helper methods inherited from ActionBase with implementations that avoid the use of shell fragments wherever possible. - Additionally catch attempts to instantiate the "normal" action with a task - argument whose action is "async_status", and redirect it to a special - implementation that fetches the task result via RPC. - This is used instead of static subclassing as it generalizes to third party action modules outside the Ansible tree. """ - # Necessary since async_status execution strategy is hard-wired in - # executor/task_executor.py::_poll_async_result(). - if ( name == 'normal' and 'task' in kwargs and - kwargs['task'].action == 'async_status'): - name = 'mitogen_async_status' - klass = action_loader__get(name, class_only=True) if klass: wrapped_name = 'MitogenActionModule_' + name @@ -172,7 +162,6 @@ class StrategyMixin(object): """ base_dir = os.path.join(os.path.dirname(__file__), 'plugins') connection_loader.add_directory(os.path.join(base_dir, 'connection')) - action_loader.add_directory(os.path.join(base_dir, 'actions')) def run(self, iterator, play_context, result=0): """ diff --git a/ansible_mitogen/target.py b/ansible_mitogen/target.py index d6a44f4d..ec05ab55 100644 --- a/ansible_mitogen/target.py +++ b/ansible_mitogen/target.py @@ -42,9 +42,10 @@ import re import stat import subprocess import tempfile -import threading +import traceback import zlib +import ansible.module_utils.json_utils import ansible_mitogen.runner import ansible_mitogen.services import mitogen.core @@ -112,11 +113,26 @@ def start_fork_parent(econtext): @mitogen.core.takes_econtext -def start_fork_child(kwargs, econtext): +def start_fork_child(wrap_async, kwargs, econtext): mitogen.parent.upgrade_router(econtext) context = econtext.router.fork() - kwargs['shutdown_on_exit'] = True - context.call_async(run_module, kwargs) + if not wrap_async: + try: + return context.call(run_module, kwargs) + finally: + context.shutdown() + + job_id = '%016x' % random.randint(0, 2**64) + context.call_async(run_module_async, job_id, kwargs) + return { + 'stdout': json.dumps({ + # modules/utilities/logic/async_wrapper.py::_run_module(). + 'changed': True, + 'started': 1, + 'finished': 0, + 'ansible_job_id': job_id, + }) + } @mitogen.core.takes_econtext @@ -128,16 +144,91 @@ def run_module(kwargs, econtext): from reading sys.stdin. """ should_fork = kwargs.pop('should_fork', False) - shutdown_on_exit = kwargs.pop('shutdown_on_exit', False) + wrap_async = kwargs.pop('wrap_async', False) if should_fork: - _fork_parent.call(start_fork_child, kwargs) - return + return _fork_parent.call(start_fork_child, wrap_async, kwargs) runner_name = kwargs.pop('runner_name') klass = getattr(ansible_mitogen.runner, runner_name) impl = klass(**kwargs) - impl.run() - if shutdown_on_exit: + return impl.run() + + +def _get_async_dir(): + return os.path.expanduser( + os.environ.get('ANSIBLE_ASYNC_DIR', '~/.ansible_async') + ) + + +def _write_job_status(job_id, dct): + """ + Update an async job status file. + """ + LOG.info('_write_job_status(%r, %r)', job_id, dct) + dct.setdefault('ansible_job_id', job_id) + dct.setdefault('data', '') + + async_dir = _get_async_dir() + if not os.path.exists(async_dir): + os.makedirs(async_dir) + + path = os.path.join(async_dir, job_id) + with open(path + '.tmp', 'w') as fp: + fp.write(json.dumps(dct)) + os.rename(path + '.tmp', path) + + +def _run_module_async(job_id, kwargs, econtext): + """ + Body on run_module_async(). + + 1. Immediately updates the status file to mark the job as started. + 2. Installs a timer/signal handler to implement the time limit. + 3. Runs as with run_module(), writing the result to the status file. + """ + _write_job_status(job_id, { + 'started': 1, + 'finished': 0 + }) + + kwargs['emulate_tty'] = False + dct = run_module(kwargs, econtext) + if mitogen.core.PY3: + for key in 'stdout', 'stderr': + dct[key] = dct[key].decode('utf-8', 'surrogateescape') + + try: + filtered, warnings = ( + ansible.module_utils.json_utils. + _filter_non_json_lines(dct['stdout']) + ) + result = json.loads(filtered) + result.setdefault('warnings', []).extend(warnings) + result['stderr'] = dct['stderr'] + _write_job_status(job_id, result) + except Exception: + _write_job_status(job_id, { + "failed": 1, + "msg": traceback.format_exc(), + "data": dct['stdout'], # temporary notice only + "stderr": dct['stderr'] + }) + + +@mitogen.core.takes_econtext +def run_module_async(job_id, kwargs, econtext): + """ + Since run_module_async() is invoked with .call_async(), with nothing to + read the result from the corresponding Receiver, wrap the body in an + exception logger, and wrap that in something that tears down the context on + completion. + """ + try: + try: + _run_module_async(job_id, kwargs, econtext) + except Exception: + LOG.exception('_run_module_async crashed') + finally: econtext.broker.shutdown() diff --git a/tests/ansible/integration/async/all.yml b/tests/ansible/integration/async/all.yml index 0a7364f2..b295b526 100644 --- a/tests/ansible/integration/async/all.yml +++ b/tests/ansible/integration/async/all.yml @@ -1,3 +1,5 @@ -- import_playbook: runner_job_timeout.yml +- import_playbook: result_binary_producing_json.yml +- import_playbook: result_binary_producing_junk.yml +- import_playbook: result_shell_echo_hi.yml - import_playbook: runner_one_job.yml - import_playbook: runner_two_simultaneous_jobs.yml diff --git a/tests/ansible/integration/async/result_binary_producing_json.yml b/tests/ansible/integration/async/result_binary_producing_json.yml new file mode 100644 index 00000000..71bf4351 --- /dev/null +++ b/tests/ansible/integration/async/result_binary_producing_json.yml @@ -0,0 +1,30 @@ + +- name: integration/async/result_binary_producing_json.yml + gather_facts: true + hosts: all + any_errors_fatal: true + tasks: + + - custom_binary_producing_json: + async: 100 + poll: 0 + register: job + + - shell: sleep 1 + + - slurp: + src: "{{ansible_user_dir}}/.ansible_async/{{job.ansible_job_id}}" + register: result + + - debug: msg={{async_out}} + vars: + async_out: "{{result.content|b64decode|from_json}}" + + - assert: + that: + - async_out.changed == True + - async_out.failed == False + - async_out.msg == "Hello, world." + - 'async_out.stderr == "binary_producing_json: oh noes\n"' + vars: + async_out: "{{result.content|b64decode|from_json}}" diff --git a/tests/ansible/integration/async/result_binary_producing_junk.yml b/tests/ansible/integration/async/result_binary_producing_junk.yml new file mode 100644 index 00000000..11de5d31 --- /dev/null +++ b/tests/ansible/integration/async/result_binary_producing_junk.yml @@ -0,0 +1,32 @@ + +- name: integration/async/result_binary_producing_junk.yml + gather_facts: true + hosts: all + any_errors_fatal: true + tasks: + + - custom_binary_producing_junk: + async: 100 + poll: 0 + register: job + + - shell: sleep 1 + + - slurp: + src: "{{ansible_user_dir}}/.ansible_async/{{job.ansible_job_id}}" + register: result + + - debug: msg={{async_out}} + vars: + async_out: "{{result.content|b64decode|from_json}}" + + - assert: + that: + - async_out.ansible_job_id == job.ansible_job_id + - async_out.data == "Hello, world.\n" + - async_out.failed == 1 + - async_out.msg.startswith("Traceback") + - '"ValueError: No start of json char found\n" in async_out.msg' + - 'async_out.stderr == "binary_producing_junk: oh noes\n"' + vars: + async_out: "{{result.content|b64decode|from_json}}" diff --git a/tests/ansible/integration/async/result_shell_echo_hi.yml b/tests/ansible/integration/async/result_shell_echo_hi.yml new file mode 100644 index 00000000..ce439743 --- /dev/null +++ b/tests/ansible/integration/async/result_shell_echo_hi.yml @@ -0,0 +1,42 @@ + +- name: integration/async/result_shell_echo_hi.yml + gather_facts: true + hosts: all + any_errors_fatal: true + tasks: + + - shell: echo hi + async: 100 + poll: 0 + register: job + + - shell: sleep 1 + + - slurp: + src: "{{ansible_user_dir}}/.ansible_async/{{job.ansible_job_id}}" + register: result + + - debug: msg={{async_out}} + vars: + async_out: "{{result.content|b64decode|from_json}}" + + - assert: + that: + - async_out.changed == True + - async_out.cmd == "echo hi" + - 'async_out.delta.startswith("0:00:00")' + - async_out.end.startswith("20") + - async_out.invocation.module_args._raw_params == "echo hi" + - async_out.invocation.module_args._uses_shell == True + - async_out.invocation.module_args.chdir == None + - async_out.invocation.module_args.creates == None + - async_out.invocation.module_args.executable == None + - async_out.invocation.module_args.removes == None + - async_out.invocation.module_args.stdin == None + - async_out.invocation.module_args.warn == True + - async_out.rc == 0 + - async_out.start.startswith("20") + - async_out.stderr == "" + - async_out.stdout == "hi" + vars: + async_out: "{{result.content|b64decode|from_json}}" diff --git a/tests/ansible/integration/runner/all.yml b/tests/ansible/integration/runner/all.yml index 9d815518..b65b1a53 100644 --- a/tests/ansible/integration/runner/all.yml +++ b/tests/ansible/integration/runner/all.yml @@ -1,6 +1,3 @@ -- import_playbook: async_job_timeout.yml -- import_playbook: async_one_job.yml -- import_playbook: async_two_simultaneous_jobs.yml - import_playbook: builtin_command_module.yml - import_playbook: custom_bash_old_style_module.yml - import_playbook: custom_bash_want_json_module.yml