ansible: implement async job time limit.

pull/262/head
David Wilson 8 years ago
parent d2accbce53
commit 3994f1b30a

@ -288,6 +288,15 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
# ~root/.ansible -> /root/.ansible # ~root/.ansible -> /root/.ansible
return self.call(os.path.expanduser, mitogen.utils.cast(path)) return self.call(os.path.expanduser, mitogen.utils.cast(path))
def get_task_timeout_secs(self):
    """
    Look up the "async:" value of the current task.

    The ``async_val`` attribute is tried first; when the task object lacks
    it, the attribute literally named ``async`` is read instead, so the
    lookup works across Ansible 2.4-2.5.
    """
    task = self._task
    try:
        timeout = task.async_val
    except AttributeError:
        # Spelled via getattr() because ``async`` is a reserved word in
        # newer Python versions and cannot be written as ``task.async``.
        timeout = getattr(task, 'async')
    return timeout
def _execute_module(self, module_name=None, module_args=None, tmp=None, def _execute_module(self, module_name=None, module_args=None, tmp=None,
task_vars=None, persist_files=False, task_vars=None, persist_files=False,
delete_remote_tmp=True, wrap_async=False): delete_remote_tmp=True, wrap_async=False):
@ -318,6 +327,7 @@ class ActionModuleMixin(ansible.plugins.action.ActionBase):
templar=self._templar, templar=self._templar,
env=mitogen.utils.cast(env), env=mitogen.utils.cast(env),
wrap_async=wrap_async, wrap_async=wrap_async,
timeout_secs=self.get_task_timeout_secs(),
) )
) )

@ -94,7 +94,7 @@ class Invocation(object):
target.run_module() or helpers.run_module_async() in the target context. target.run_module() or helpers.run_module_async() in the target context.
""" """
def __init__(self, action, connection, module_name, module_args, def __init__(self, action, connection, module_name, module_args,
task_vars, templar, env, wrap_async): task_vars, templar, env, wrap_async, timeout_secs):
#: ActionBase instance invoking the module. Required to access some #: ActionBase instance invoking the module. Required to access some
#: output postprocessing methods that don't belong in ActionBase at #: output postprocessing methods that don't belong in ActionBase at
#: all. #: all.
@ -114,7 +114,8 @@ class Invocation(object):
self.env = env self.env = env
#: Boolean, if :py:data:`True`, launch the module asynchronously. #: Boolean, if :py:data:`True`, launch the module asynchronously.
self.wrap_async = wrap_async self.wrap_async = wrap_async
#: Integer, if >0, limit the time an asynchronous job may run for.
self.timeout_secs = timeout_secs
#: Initially ``None``, but set by :func:`invoke`. The path on the #: Initially ``None``, but set by :func:`invoke`. The path on the
#: master to the module's implementation file. #: master to the module's implementation file.
self.module_path = None self.module_path = None
@ -403,6 +404,7 @@ def _invoke_async_task(invocation, planner):
context.call_no_reply( context.call_no_reply(
ansible_mitogen.target.run_module_async, ansible_mitogen.target.run_module_async,
job_id=job_id, job_id=job_id,
timeout_secs=invocation.timeout_secs,
kwargs=planner.get_kwargs(), kwargs=planner.get_kwargs(),
) )

@ -40,6 +40,7 @@ import operator
import os import os
import pwd import pwd
import re import re
import signal
import stat import stat
import subprocess import subprocess
import tempfile import tempfile
@ -283,7 +284,27 @@ def _write_job_status(job_id, dct):
os.rename(path + '.tmp', path) os.rename(path + '.tmp', path)
def _sigalrm(broker, timeout_secs, job_id):
    """
    Handle SIGALRM (job timeout): mark the job as failed and finished in its
    status file, then shut down the broker.

    :param broker:
        Object whose ``shutdown()`` method is invoked once the status file
        has been written.
    :param int timeout_secs:
        The time limit that was exceeded; only used to build the message.
    :param str job_id:
        String job ID whose status file is updated.
    """
    status = {
        "failed": 1,
        "finished": 1,
        "msg": "Job reached maximum time limit of %d seconds." % (
            timeout_secs,
        ),
    }
    # Write the final status before shutting down, so the failure is on
    # disk by the time the broker stops.
    _write_job_status(job_id, status)
    broker.shutdown()
def _install_alarm(broker, timeout_secs, job_id):
    """
    Arm the job time limit: register a SIGALRM handler that forwards to
    :func:`_sigalrm`, then schedule the alarm `timeout_secs` seconds from
    now.

    :param broker:
        Passed through unchanged to :func:`_sigalrm`.
    :param int timeout_secs:
        Delay in seconds handed to :func:`signal.alarm`.
    :param str job_id:
        String job ID, passed through to :func:`_sigalrm`.
    """
    def on_alarm(*_unused):
        # Signal handlers receive (signum, frame); neither is needed here.
        return _sigalrm(broker, timeout_secs, job_id)

    signal.signal(signal.SIGALRM, on_alarm)
    signal.alarm(timeout_secs)
def _run_module_async(kwargs, job_id, timeout_secs, econtext):
""" """
1. Immediately updates the status file to mark the job as started. 1. Immediately updates the status file to mark the job as started.
2. Installs a timer/signal handler to implement the time limit. 2. Installs a timer/signal handler to implement the time limit.
@ -293,6 +314,8 @@ def _run_module_async(kwargs, job_id, econtext):
Runner keyword arguments. Runner keyword arguments.
:param str job_id: :param str job_id:
String job ID. String job ID.
:param int timeout_secs:
If >0, limit the task's maximum run time.
""" """
_write_job_status(job_id, { _write_job_status(job_id, {
'started': 1, 'started': 1,
@ -300,6 +323,9 @@ def _run_module_async(kwargs, job_id, econtext):
'pid': os.getpid() 'pid': os.getpid()
}) })
if timeout_secs > 0:
_install_alarm(econtext.broker, timeout_secs, job_id)
kwargs['detach'] = True kwargs['detach'] = True
kwargs['econtext'] = econtext kwargs['econtext'] = econtext
kwargs['emulate_tty'] = False kwargs['emulate_tty'] = False
@ -327,7 +353,7 @@ def _run_module_async(kwargs, job_id, econtext):
@mitogen.core.takes_econtext @mitogen.core.takes_econtext
def run_module_async(kwargs, job_id, econtext): def run_module_async(kwargs, job_id, timeout_secs, econtext):
""" """
Arrange for a module to be executed with its run status and result Arrange for a module to be executed with its run status and result
serialized to a disk file. This function expects to run in a child forked serialized to a disk file. This function expects to run in a child forked
@ -335,7 +361,7 @@ def run_module_async(kwargs, job_id, econtext):
""" """
try: try:
try: try:
_run_module_async(kwargs, job_id, econtext) _run_module_async(kwargs, job_id, timeout_secs, econtext)
except Exception: except Exception:
# Catch any (ansible_mitogen) bugs and write them to the job file. # Catch any (ansible_mitogen) bugs and write them to the job file.
_write_job_status(job_id, { _write_job_status(job_id, {

@ -140,8 +140,6 @@ Noteworthy Differences
artificial serialization, causing slowdown equivalent to `task_duration * artificial serialization, causing slowdown equivalent to `task_duration *
num_targets`. This will be fixed soon. num_targets`. This will be fixed soon.
* Asynchronous job time limits are not implemented.
* "Module Replacer" style modules are not supported. These rarely appear in * "Module Replacer" style modules are not supported. These rarely appear in
practice, and light web searches failed to reveal many examples of them. practice, and light web searches failed to reveal many examples of them.

@ -3,4 +3,6 @@
- import_playbook: result_shell_echo_hi.yml - import_playbook: result_shell_echo_hi.yml
- import_playbook: runner_new_process.yml - import_playbook: runner_new_process.yml
- import_playbook: runner_one_job.yml - import_playbook: runner_one_job.yml
- import_playbook: runner_timeout_then_polling.yml
- import_playbook: runner_with_polling_and_timeout.yml
- import_playbook: runner_two_simultaneous_jobs.yml - import_playbook: runner_two_simultaneous_jobs.yml

@ -1,52 +0,0 @@
# Verify 'async: <timeout>' functions as desired.
- name: integration/async/runner_job_timeout.yml
hosts: test-targets
any_errors_fatal: true
tasks:
# Verify async-with-polling-and-timeout behaviour.
- name: sleep for 7 seconds, but timeout after 1 second.
ignore_errors: true
shell: sleep 7
async: 1
poll: 1
register: job1
- assert:
that:
- job1.changed == False
- job1.failed == True
- job1.msg == "async task did not complete within the requested time"
- job1.keys()|sort == ['changed', 'failed', 'msg']
# Verify async-with-timeout-then-poll behaviour.
# This is broken in upstream Ansible, so disable the tests there.
#
# TODO: the tests below are totally broken, not clear what Ansible is
# supposed to do here, so can't emulate it in Mitogen.
- name: sleep for 7 seconds, but timeout after 1 second.
ignore_errors: true
shell: sleep 7
async: 1
poll: 0
register: job2
when: false # is_mitogen
- name: poll up to 10 times.
async_status:
jid: "{{job2.ansible_job_id}}"
register: result2
until: result2.finished
retries: 10
delay: 1
when: false # is_mitogen
- assert:
that:
- result1.rc == 0
- result2.rc == 0
- result2.stdout == 'im_alive'
when: false # is_mitogen

@ -0,0 +1,34 @@
# Verify 'async: <timeout>' functions as desired.
- name: integration/async/runner_timeout_then_polling.yml
hosts: test-targets
any_errors_fatal: true
tasks:
# Verify async-with-timeout-then-poll behaviour.
# This is semi-broken in upstream Ansible: it does not update the job file
# on failure, so this is only tested on Mitogen.
- name: sleep for 10 seconds, but timeout after 1 second.
shell: sleep 10
async: 1
poll: 0
register: job
when: is_mitogen
- name: busy-poll up to 500 times
async_status:
jid: "{{job.ansible_job_id}}"
register: result
until: result.finished
retries: 500
delay: 0
when: is_mitogen
ignore_errors: true
- assert:
that:
- result.failed == 1
- result.finished == 1
- result.msg == "Job reached maximum time limit of 1 seconds."
when: is_mitogen

@ -0,0 +1,24 @@
# Verify 'async: <timeout>' functions as desired.
- name: integration/async/runner_with_polling_and_timeout.yml
hosts: test-targets
any_errors_fatal: true
tasks:
# Verify async-with-polling-and-timeout behaviour.
- name: sleep for 7 seconds, but timeout after 1 second.
ignore_errors: true
shell: sleep 7
async: 1
poll: 1
register: job1
- assert:
that:
- job1.changed == False
- job1.failed == True
- |
job1.msg == "async task did not complete within the requested time" or
job1.msg == "Job reached maximum time limit of 1 seconds."
Loading…
Cancel
Save