ansible: remove JobResultService, more compatible async jobs; closes #191.

And by "compatible" I mean "terrible". This does not implement async job
timeouts, and I am not going to bother: the upstream async implementation
is so buggy and inconsistent that it resists even having its behaviour
captured in tests.
pull/197/head
David Wilson 7 years ago
parent 6ad18b6719
commit 85e1f5f515

@ -38,7 +38,6 @@ from __future__ import absolute_import
import json import json
import logging import logging
import os import os
import random
from ansible.executor import module_common from ansible.executor import module_common
import ansible.errors import ansible.errors
@ -116,8 +115,6 @@ class Invocation(object):
self.env = env self.env = env
#: Boolean, if :py:data:`True`, launch the module asynchronously. #: Boolean, if :py:data:`True`, launch the module asynchronously.
self.wrap_async = wrap_async self.wrap_async = wrap_async
#: String Job ID.
self.job_id = self._make_job_id()
#: Initially ``None``, but set by :func:`invoke`. The path on the #: Initially ``None``, but set by :func:`invoke`. The path on the
#: master to the module's implementation file. #: master to the module's implementation file.
@ -126,9 +123,6 @@ class Invocation(object):
#: binary contents of the module. #: binary contents of the module.
self.module_source = None self.module_source = None
def _make_job_id(self):
return '%016x' % random.randint(0, 2**64)
def __repr__(self): def __repr__(self):
return 'Invocation(module_name=%s)' % (self.module_name,) return 'Invocation(module_name=%s)' % (self.module_name,)
@ -167,9 +161,10 @@ class Planner(object):
# named by `runner_name`. # named by `runner_name`.
} }
""" """
kwargs.setdefault('job_id', invocation.job_id) kwargs.setdefault('emulate_tty', True)
kwargs.setdefault('service_context', invocation.connection.parent) kwargs.setdefault('service_context', invocation.connection.parent)
kwargs.setdefault('should_fork', self.get_should_fork(invocation)) kwargs.setdefault('should_fork', self.get_should_fork(invocation))
kwargs.setdefault('wrap_async', invocation.wrap_async)
return kwargs return kwargs
@ -349,7 +344,10 @@ def get_module_data(name):
return path, source return path, source
def _do_invoke(invocation): def invoke(invocation):
"""
Find a suitable Planner that knows how to run `invocation`.
"""
(invocation.module_path, (invocation.module_path,
invocation.module_source) = get_module_data(invocation.module_name) invocation.module_source) = get_module_data(invocation.module_name)
@ -358,52 +356,12 @@ def _do_invoke(invocation):
if planner.detect(invocation): if planner.detect(invocation):
LOG.debug('%r accepted %r (filename %r)', planner, LOG.debug('%r accepted %r (filename %r)', planner,
invocation.module_name, invocation.module_path) invocation.module_name, invocation.module_path)
break return invocation.action._postprocess_response(
invocation.connection.call(
ansible_mitogen.target.run_module,
planner.plan(invocation),
)
)
LOG.debug('%r rejected %r', planner, invocation.module_name) LOG.debug('%r rejected %r', planner, invocation.module_name)
else:
raise ansible.errors.AnsibleError(NO_METHOD_MSG + repr(invocation))
return invocation.connection.call_async(
ansible_mitogen.target.run_module,
planner.plan(invocation),
)
def _invoke_async(invocation):
_do_invoke(invocation)
return {
'stdout': json.dumps({
# modules/utilities/logic/async_wrapper.py::_run_module().
'changed': True,
'started': 1,
'finished': 0,
'ansible_job_id': invocation.job_id,
})
}
def _invoke_sync(invocation):
result_recv = mitogen.core.Receiver(invocation.connection.router)
mitogen.service.call_async(
context=invocation.connection.parent,
handle=ansible_mitogen.services.JobResultService.handle,
method='listen',
kwargs={
'job_id': invocation.job_id,
'sender': result_recv.to_sender(),
}
)
_do_invoke(invocation)
return result_recv.get().unpickle()
def invoke(invocation):
"""
Find a suitable Planner that knows how to run `invocation`.
"""
if invocation.wrap_async:
js = _invoke_async(invocation)
else:
js = _invoke_sync(invocation)
return invocation.action._postprocess_response(js) raise ansible.errors.AnsibleError(NO_METHOD_MSG + repr(invocation))

@ -1,78 +0,0 @@
# Copyright 2017, David Wilson
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import ansible.plugins.action
import mitogen.core
import mitogen.utils
import ansible_mitogen.services
import ansible_mitogen.target
class ActionModule(ansible.plugins.action.ActionBase):
    """
    Action that answers an ``async_status`` poll by asking the
    JobResultService running in the connection's parent context for the
    job's result, rather than running a module on the target.
    """

    def _get_async_result(self, job_id):
        """
        Ensure the connection is established, then call the service's
        ``get`` method for `job_id` and return whatever it returns.
        """
        self._connection._connect()
        service_handle = ansible_mitogen.services.JobResultService.handle
        return mitogen.service.call(
            context=self._connection.parent,
            handle=service_handle,
            method='get',
            kwargs={
                'job_id': job_id,
            }
        )

    def _on_result_pending(self, job_id):
        """
        Build the task result reported while no result is available for
        `job_id`: started, not finished, not failed.
        """
        pending = {
            '_ansible_parsed': True,
            'ansible_job_id': job_id,
            'started': 1,
            'failed': 0,
            'finished': 0,
            'msg': '',
        }
        return pending

    def _on_result_available(self, job_id, result):
        """
        Parse `result` with :meth:`_parse_returned_data`, mark it as started
        and finished for `job_id`, and back-fill ``stdout_lines`` and
        ``stderr_lines`` when only the raw stream text is present.
        """
        dct = self._parse_returned_data(result)
        dct['ansible_job_id'] = job_id
        dct['started'] = 1
        dct['finished'] = 1

        # Cutpasted from the action.py.
        for stream in ('stdout', 'stderr'):
            lines_key = stream + '_lines'
            if stream in dct and lines_key not in dct:
                dct[lines_key] = (dct[stream] or u'').splitlines()
        return dct

    def run(self, tmp=None, task_vars=None):
        """
        Look up the job named by the task's ``jid`` argument and return either
        the "still pending" result or the completed result.
        """
        job_id = mitogen.utils.cast(self._task.args['jid'])
        result = self._get_async_result(job_id)
        if result is not None:
            return self._on_result_available(job_id, result)
        return self._on_result_pending(job_id)

@ -155,7 +155,6 @@ class MuxProcess(object):
services=[ services=[
ansible_mitogen.services.ContextService(self.router), ansible_mitogen.services.ContextService(self.router),
ansible_mitogen.services.FileService(self.router), ansible_mitogen.services.FileService(self.router),
ansible_mitogen.services.JobResultService(self.router),
], ],
size=int(os.environ.get('MITOGEN_POOL_SIZE', '16')), size=int(os.environ.get('MITOGEN_POOL_SIZE', '16')),
) )

@ -83,17 +83,17 @@ class Runner(object):
Subclasses may override `_run`()` and extend `setup()` and `revert()`. Subclasses may override `_run`()` and extend `setup()` and `revert()`.
""" """
def __init__(self, module, job_id, remote_tmp, service_context, def __init__(self, module, remote_tmp, service_context,
raw_params=None, args=None, env=None): emulate_tty=None, raw_params=None, args=None, env=None):
if args is None: if args is None:
args = {} args = {}
if raw_params is not None: if raw_params is not None:
args['_raw_params'] = raw_params args['_raw_params'] = raw_params
self.module = module self.module = module
self.job_id = job_id
self.remote_tmp = os.path.expanduser(remote_tmp) self.remote_tmp = os.path.expanduser(remote_tmp)
self.service_context = service_context self.service_context = service_context
self.emulate_tty = emulate_tty
self.raw_params = raw_params self.raw_params = raw_params
self.args = args self.args = args
self.env = env self.env = env
@ -135,17 +135,6 @@ class Runner(object):
""" """
raise NotImplementedError() raise NotImplementedError()
def _send_result(self, dct):
mitogen.service.call(
context=self.service_context,
handle=502,
method='push',
kwargs={
'job_id': self.job_id,
'result': dct
}
)
def run(self): def run(self):
""" """
Set up the process environment in preparation for running an Ansible Set up the process environment in preparation for running an Ansible
@ -158,10 +147,7 @@ class Runner(object):
""" """
self.setup() self.setup()
try: try:
try: return self._run()
self._send_result(self._run())
except Exception as e:
self._send_result(mitogen.core.CallError(e))
finally: finally:
self.revert() self.revert()
@ -260,7 +246,7 @@ class ProgramRunner(Runner):
try: try:
rc, stdout, stderr = ansible_mitogen.target.exec_args( rc, stdout, stderr = ansible_mitogen.target.exec_args(
args=self._get_program_args(), args=self._get_program_args(),
emulate_tty=True, emulate_tty=self.emulate_tty,
) )
except Exception, e: except Exception, e:
LOG.exception('While running %s', self._get_program_args()) LOG.exception('While running %s', self._get_program_args())

@ -246,8 +246,8 @@ class ContextService(mitogen.service.Service):
# return value somewhere, but logs will catch a failure anyway. # return value somewhere, but logs will catch a failure anyway.
context.call_async(ansible_mitogen.target.start_fork_parent) context.call_async(ansible_mitogen.target.start_fork_parent)
if os.environ.get('MITOGEN_DUMP_THREAD_STACKS'): if os.environ.get('MITOGEN_DUMP_THREAD_STACKS'):
import mitogen.debug from mitogen import debug
context.call(mitogen.debug.dump_to_logger) context.call(debug.dump_to_logger)
self._key_by_context[context] = key self._key_by_context[context] = key
self._refs_by_context[context] = 0 self._refs_by_context[context] = 0
return { return {
@ -369,93 +369,3 @@ class FileService(mitogen.service.Service):
LOG.debug('Serving %r', path) LOG.debug('Serving %r', path)
return self._paths[path] return self._paths[path]
class JobResultService(mitogen.service.Service):
    """
    Receive the result of a task from a child and forward it to interested
    listeners. If no listener exists, store the result until it is requested.

    Storing results in an intermediary service allows:

    * the lifetime of the worker to be decoupled from the lifetime of the job,
    * for new and unrelated workers to request the job result after the
      original worker that spawned it has exited,
    * for synchronous and asynchronous jobs to be treated identically,
    * for latency-free polling and waiting on job results, and
    * for Ansible job IDs to be used to refer to a job in preference to
      Mitogen-internal identifiers such as Sender and Context.

    Results are keyed by job ID.
    """
    handle = 502
    max_message_size = 1048576 * 64

    def __init__(self, router):
        super(JobResultService, self).__init__(router)
        # Guards both dictionaries below.
        self._lock = threading.Lock()
        # Results pushed before anybody asked for them, keyed by job ID.
        self._result_by_job_id = {}
        # Senders registered via listen(), keyed by job ID.
        self._sender_by_job_id = {}

    @mitogen.service.expose(mitogen.service.AllowParents())
    @mitogen.service.arg_spec({
        # basestring rather than str, matching get() and push(), so the same
        # job ID is accepted by every method of the service.
        'job_id': basestring,
        'sender': mitogen.core.Sender,
    })
    def listen(self, job_id, sender):
        """
        Register to receive the result of a job when it becomes available.

        :param str job_id:
            Job ID to listen for.
        :param mitogen.core.Sender sender:
            Sender on which to deliver the job result.
        :raises Error:
            A listener is already registered for `job_id`.
        """
        LOG.debug('%r.listen(job_id=%r, sender=%r)', self, job_id, sender)
        with self._lock:
            if job_id in self._sender_by_job_id:
                raise Error('Listener already exists for job: %s' % (job_id,))
            self._sender_by_job_id[job_id] = sender

    @mitogen.service.expose(mitogen.service.AllowParents())
    @mitogen.service.arg_spec({
        'job_id': basestring,
    })
    def get(self, job_id):
        """
        Return a job's result if it is available, otherwise return immediately.
        The job result is forgotten once it has been returned by this method.

        :param str job_id:
            Job ID to return.
        :returns:
            Job result dictionary, or :data:`None`.
        """
        LOG.debug('%r.get(job_id=%r)', self, job_id)
        with self._lock:
            return self._result_by_job_id.pop(job_id, None)

    @mitogen.service.expose(mitogen.service.AllowAny())
    @mitogen.service.arg_spec({
        'job_id': basestring,
        'result': (mitogen.core.CallError, dict)
    })
    def push(self, job_id, result):
        """
        Deliver a job's result from a child context, notifying any listener
        registered via :meth:`listen` of the result.

        :param str job_id:
            Job ID whose result is being pushed.
        :param dict result:
            Job result dictionary.
        :raises Error:
            A stored result already exists for `job_id`.
        """
        LOG.debug('%r.push(job_id=%r, result=%r)', self, job_id, result)
        with self._lock:
            if job_id in self._result_by_job_id:
                raise Error('Result already exists for job: %s' % (job_id,))
            # A registered listener consumes the result immediately; otherwise
            # it is parked until get() collects it.
            sender = self._sender_by_job_id.pop(job_id, None)
            if sender:
                sender.send(result)
            else:
                self._result_by_job_id[job_id] = result

@ -48,19 +48,9 @@ def wrap_action_loader__get(name, *args, **kwargs):
helper methods inherited from ActionBase with implementations that avoid helper methods inherited from ActionBase with implementations that avoid
the use of shell fragments wherever possible. the use of shell fragments wherever possible.
Additionally catch attempts to instantiate the "normal" action with a task
argument whose action is "async_status", and redirect it to a special
implementation that fetches the task result via RPC.
This is used instead of static subclassing as it generalizes to third party This is used instead of static subclassing as it generalizes to third party
action modules outside the Ansible tree. action modules outside the Ansible tree.
""" """
# Necessary since async_status execution strategy is hard-wired in
# executor/task_executor.py::_poll_async_result().
if ( name == 'normal' and 'task' in kwargs and
kwargs['task'].action == 'async_status'):
name = 'mitogen_async_status'
klass = action_loader__get(name, class_only=True) klass = action_loader__get(name, class_only=True)
if klass: if klass:
wrapped_name = 'MitogenActionModule_' + name wrapped_name = 'MitogenActionModule_' + name
@ -172,7 +162,6 @@ class StrategyMixin(object):
""" """
base_dir = os.path.join(os.path.dirname(__file__), 'plugins') base_dir = os.path.join(os.path.dirname(__file__), 'plugins')
connection_loader.add_directory(os.path.join(base_dir, 'connection')) connection_loader.add_directory(os.path.join(base_dir, 'connection'))
action_loader.add_directory(os.path.join(base_dir, 'actions'))
def run(self, iterator, play_context, result=0): def run(self, iterator, play_context, result=0):
""" """

@ -42,9 +42,10 @@ import re
import stat import stat
import subprocess import subprocess
import tempfile import tempfile
import threading import traceback
import zlib import zlib
import ansible.module_utils.json_utils
import ansible_mitogen.runner import ansible_mitogen.runner
import ansible_mitogen.services import ansible_mitogen.services
import mitogen.core import mitogen.core
@ -112,11 +113,26 @@ def start_fork_parent(econtext):
@mitogen.core.takes_econtext @mitogen.core.takes_econtext
def start_fork_child(kwargs, econtext): def start_fork_child(wrap_async, kwargs, econtext):
mitogen.parent.upgrade_router(econtext) mitogen.parent.upgrade_router(econtext)
context = econtext.router.fork() context = econtext.router.fork()
kwargs['shutdown_on_exit'] = True if not wrap_async:
context.call_async(run_module, kwargs) try:
return context.call(run_module, kwargs)
finally:
context.shutdown()
job_id = '%016x' % random.randint(0, 2**64)
context.call_async(run_module_async, job_id, kwargs)
return {
'stdout': json.dumps({
# modules/utilities/logic/async_wrapper.py::_run_module().
'changed': True,
'started': 1,
'finished': 0,
'ansible_job_id': job_id,
})
}
@mitogen.core.takes_econtext @mitogen.core.takes_econtext
@ -128,16 +144,91 @@ def run_module(kwargs, econtext):
from reading sys.stdin. from reading sys.stdin.
""" """
should_fork = kwargs.pop('should_fork', False) should_fork = kwargs.pop('should_fork', False)
shutdown_on_exit = kwargs.pop('shutdown_on_exit', False) wrap_async = kwargs.pop('wrap_async', False)
if should_fork: if should_fork:
_fork_parent.call(start_fork_child, kwargs) return _fork_parent.call(start_fork_child, wrap_async, kwargs)
return
runner_name = kwargs.pop('runner_name') runner_name = kwargs.pop('runner_name')
klass = getattr(ansible_mitogen.runner, runner_name) klass = getattr(ansible_mitogen.runner, runner_name)
impl = klass(**kwargs) impl = klass(**kwargs)
impl.run() return impl.run()
if shutdown_on_exit:
def _get_async_dir():
    """
    Return the directory holding async job status files: the value of the
    ANSIBLE_ASYNC_DIR environment variable when set, else ``~/.ansible_async``,
    with any leading ``~`` expanded.
    """
    configured = os.environ.get('ANSIBLE_ASYNC_DIR', '~/.ansible_async')
    return os.path.expanduser(configured)
def _write_job_status(job_id, dct):
    """
    Update an async job status file.

    The status is serialized as JSON to ``<async dir>/<job_id>.tmp`` and then
    renamed over ``<async dir>/<job_id>``, so a reader never observes a
    partially written file.

    :param str job_id:
        Job ID; used as the status file name.
    :param dict dct:
        Status dictionary. Modified in place: ``ansible_job_id`` and ``data``
        are filled in when absent.
    """
    import errno

    LOG.info('_write_job_status(%r, %r)', job_id, dct)
    dct.setdefault('ansible_job_id', job_id)
    dct.setdefault('data', '')

    async_dir = _get_async_dir()
    # Create unconditionally and tolerate EEXIST: checking for existence
    # first races with another job creating the directory at the same time.
    try:
        os.makedirs(async_dir)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise

    path = os.path.join(async_dir, job_id)
    tmp_path = path + '.tmp'
    with open(tmp_path, 'w') as fp:
        fp.write(json.dumps(dct))
    os.rename(tmp_path, path)
def _run_module_async(job_id, kwargs, econtext):
    """
    Body of run_module_async().

    1. Immediately updates the status file to mark the job as started.
    2. Runs the module as with run_module(), with TTY emulation disabled.
    3. Strips non-JSON lines from the module's stdout, parses the remainder
       as JSON and writes it to the status file. If the module cannot be run
       or its output cannot be parsed, a failure record containing the
       traceback is written instead.

    No time limit is implemented: nothing here installs a timer or signal
    handler.

    :param str job_id:
        Job ID naming the status file to update.
    :param dict kwargs:
        Arguments for run_module(); ``emulate_tty`` is overwritten.
    :param econtext:
        Passed through to run_module() unchanged.
    """
    _write_job_status(job_id, {
        'started': 1,
        'finished': 0
    })

    kwargs['emulate_tty'] = False
    try:
        dct = run_module(kwargs, econtext)
        if mitogen.core.PY3:
            for key in 'stdout', 'stderr':
                dct[key] = dct[key].decode('utf-8', 'surrogateescape')
    except Exception:
        # Without this the status file would keep claiming the job is still
        # running. Record the failure, then re-raise so the caller still
        # observes the original exception.
        _write_job_status(job_id, {
            "failed": 1,
            "msg": traceback.format_exc(),
            "stderr": "",
        })
        raise

    try:
        filtered, warnings = (
            ansible.module_utils.json_utils.
            _filter_non_json_lines(dct['stdout'])
        )
        result = json.loads(filtered)
        result.setdefault('warnings', []).extend(warnings)
        result['stderr'] = dct['stderr']
        _write_job_status(job_id, result)
    except Exception:
        _write_job_status(job_id, {
            "failed": 1,
            "msg": traceback.format_exc(),
            "data": dct['stdout'],  # temporary notice only
            "stderr": dct['stderr']
        })
@mitogen.core.takes_econtext
def run_module_async(job_id, kwargs, econtext):
    """
    Entry point for an async job, intended to be started with .call_async().

    Nothing reads the Receiver for that call, so any exception raised by
    _run_module_async() is logged here instead of being returned. The
    context's broker is shut down once the job body has finished, whether it
    succeeded or crashed.
    """
    try:
        _run_module_async(job_id, kwargs, econtext)
    except Exception:
        LOG.exception('_run_module_async crashed')
    finally:
        econtext.broker.shutdown()

@ -1,3 +1,5 @@
- import_playbook: runner_job_timeout.yml - import_playbook: result_binary_producing_json.yml
- import_playbook: result_binary_producing_junk.yml
- import_playbook: result_shell_echo_hi.yml
- import_playbook: runner_one_job.yml - import_playbook: runner_one_job.yml
- import_playbook: runner_two_simultaneous_jobs.yml - import_playbook: runner_two_simultaneous_jobs.yml

@ -0,0 +1,30 @@
- name: integration/async/result_binary_producing_json.yml
gather_facts: true
hosts: all
any_errors_fatal: true
tasks:
- custom_binary_producing_json:
async: 100
poll: 0
register: job
- shell: sleep 1
- slurp:
src: "{{ansible_user_dir}}/.ansible_async/{{job.ansible_job_id}}"
register: result
- debug: msg={{async_out}}
vars:
async_out: "{{result.content|b64decode|from_json}}"
- assert:
that:
- async_out.changed == True
- async_out.failed == False
- async_out.msg == "Hello, world."
- 'async_out.stderr == "binary_producing_json: oh noes\n"'
vars:
async_out: "{{result.content|b64decode|from_json}}"

@ -0,0 +1,32 @@
- name: integration/async/result_binary_producing_junk.yml
gather_facts: true
hosts: all
any_errors_fatal: true
tasks:
- custom_binary_producing_junk:
async: 100
poll: 0
register: job
- shell: sleep 1
- slurp:
src: "{{ansible_user_dir}}/.ansible_async/{{job.ansible_job_id}}"
register: result
- debug: msg={{async_out}}
vars:
async_out: "{{result.content|b64decode|from_json}}"
- assert:
that:
- async_out.ansible_job_id == job.ansible_job_id
- async_out.data == "Hello, world.\n"
- async_out.failed == 1
- async_out.msg.startswith("Traceback")
- '"ValueError: No start of json char found\n" in async_out.msg'
- 'async_out.stderr == "binary_producing_junk: oh noes\n"'
vars:
async_out: "{{result.content|b64decode|from_json}}"

@ -0,0 +1,42 @@
- name: integration/async/result_shell_echo_hi.yml
gather_facts: true
hosts: all
any_errors_fatal: true
tasks:
- shell: echo hi
async: 100
poll: 0
register: job
- shell: sleep 1
- slurp:
src: "{{ansible_user_dir}}/.ansible_async/{{job.ansible_job_id}}"
register: result
- debug: msg={{async_out}}
vars:
async_out: "{{result.content|b64decode|from_json}}"
- assert:
that:
- async_out.changed == True
- async_out.cmd == "echo hi"
- 'async_out.delta.startswith("0:00:00")'
- async_out.end.startswith("20")
- async_out.invocation.module_args._raw_params == "echo hi"
- async_out.invocation.module_args._uses_shell == True
- async_out.invocation.module_args.chdir == None
- async_out.invocation.module_args.creates == None
- async_out.invocation.module_args.executable == None
- async_out.invocation.module_args.removes == None
- async_out.invocation.module_args.stdin == None
- async_out.invocation.module_args.warn == True
- async_out.rc == 0
- async_out.start.startswith("20")
- async_out.stderr == ""
- async_out.stdout == "hi"
vars:
async_out: "{{result.content|b64decode|from_json}}"

@ -1,6 +1,3 @@
- import_playbook: async_job_timeout.yml
- import_playbook: async_one_job.yml
- import_playbook: async_two_simultaneous_jobs.yml
- import_playbook: builtin_command_module.yml - import_playbook: builtin_command_module.yml
- import_playbook: custom_bash_old_style_module.yml - import_playbook: custom_bash_old_style_module.yml
- import_playbook: custom_bash_want_json_module.yml - import_playbook: custom_bash_want_json_module.yml

Loading…
Cancel
Save