Async status rewrite

- remove need for module (at least for the POSIX side)
  - adds retry with backoff when fetching the job file, since
    the race window is larger now that we don't spend time on a module
  - now gives more info on failure
  - also made AnsibleActionFail/Skip handle results if given
pull/74903/head
Brian Coca committed by GitHub
parent e201b542be
commit 9c718ccc42

@@ -0,0 +1,3 @@
minor_changes:
- async_status no longer requires a module for non windows targets.
- task_executor, Actions using AnsibleActionFail/Skip will now propagate 'results' if given.

@@ -586,10 +586,8 @@ class TaskExecutor:
old_sig = signal.signal(signal.SIGALRM, task_timeout)
signal.alarm(self._task.timeout)
result = self._handler.run(task_vars=variables)
except AnsibleActionSkip as e:
return dict(skipped=True, msg=to_text(e))
except AnsibleActionFail as e:
return dict(failed=True, msg=to_text(e))
except (AnsibleActionFail, AnsibleActionSkip) as e:
return e.result
except AnsibleConnectionFailure as e:
return dict(unreachable=True, msg=to_text(e))
except TaskTimeoutError as e:
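With the executor now returning e.result directly, an action plugin can attach whatever partial results it has already gathered to the exception and they will reach the user alongside the failed/skipped status and message. A minimal sketch of the pattern (hypothetical plugin; it assumes, as the async_status hunk further below does, that AnsibleActionFail/AnsibleActionSkip accept a result= keyword and fold the failure/skip status and msg into their .result):

from ansible.errors import AnsibleActionFail, AnsibleActionSkip
from ansible.plugins.action import ActionBase


class ActionModule(ActionBase):
    # hypothetical action, only to illustrate the new result propagation

    def run(self, tmp=None, task_vars=None):
        results = super(ActionModule, self).run(tmp, task_vars)
        del tmp  # tmp no longer has any effect

        # partial state gathered before anything can go wrong
        results['started'] = 0
        results['ansible_job_id'] = self._task.args.get('jid')

        if 'jid' not in self._task.args:
            # before this change the executor returned only
            # dict(failed=True, msg=...); with `return e.result` the keys
            # already placed in `results` survive the failure as well
            raise AnsibleActionFail("jid is required", result=results)

        if self._task.args.get('mode') not in ('status', 'cleanup'):
            raise AnsibleActionSkip("nothing to do for this mode", result=results)

        return results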

@@ -66,75 +66,24 @@ ansible_job_id:
sample: '360874038559.4169'
finished:
description: Whether the asynchronous job has finished (C(1)) or not (C(0))
returned: success
returned: always
type: int
sample: 1
started:
description: Whether the asynchronous job has started (C(1)) or not (C(0))
returned: success
returned: always
type: int
sample: 1
stdout:
description: Any output returned by async_wrapper
returned: always
type: str
stderr:
description: Any errors returned by async_wrapper
returned: always
type: str
erased:
description: Path to erased job file
returned: when file is erased
type: str
'''
import json
import os
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.six import iteritems
from ansible.module_utils._text import to_native
def main():
module = AnsibleModule(argument_spec=dict(
jid=dict(type='str', required=True),
mode=dict(type='str', default='status', choices=['cleanup', 'status']),
# passed in from the async_status action plugin
_async_dir=dict(type='path', required=True),
))
mode = module.params['mode']
jid = module.params['jid']
async_dir = module.params['_async_dir']
# setup logging directory
logdir = os.path.expanduser(async_dir)
log_path = os.path.join(logdir, jid)
if not os.path.exists(log_path):
module.fail_json(msg="could not find job", ansible_job_id=jid, started=1, finished=1)
if mode == 'cleanup':
os.unlink(log_path)
module.exit_json(ansible_job_id=jid, erased=log_path)
# NOT in cleanup mode, assume regular status mode
# no remote kill mode currently exists, but probably should
# consider log_path + ".pid" file and also unlink that above
data = None
try:
with open(log_path) as f:
data = json.loads(f.read())
except Exception:
if not data:
# file not written yet? That means it is running
module.exit_json(results_file=log_path, ansible_job_id=jid, started=1, finished=0)
else:
module.fail_json(ansible_job_id=jid, results_file=log_path,
msg="Could not parse job output: %s" % data, started=1, finished=1)
if 'started' not in data:
data['finished'] = 1
data['ansible_job_id'] = jid
elif 'finished' not in data:
data['finished'] = 0
# Fix error: TypeError: exit_json() keywords must be strings
data = dict([(to_native(k), v) for k, v in iteritems(data)])
module.exit_json(**data)
if __name__ == '__main__':
main()

@@ -440,16 +440,16 @@ class ActionBase(with_metaclass(ABCMeta, object)):
'''Determine if temporary path should be deleted or kept by user request/config'''
return tmp_path and self._cleanup_remote_tmp and not C.DEFAULT_KEEP_REMOTE_FILES and "-tmp-" in tmp_path
def _remove_tmp_path(self, tmp_path):
def _remove_tmp_path(self, tmp_path, force=False):
'''Remove a temporary path we created. '''
if tmp_path is None and self._connection._shell.tmpdir:
tmp_path = self._connection._shell.tmpdir
if self._should_remove_tmp_path(tmp_path):
if force or self._should_remove_tmp_path(tmp_path):
cmd = self._connection._shell.remove(tmp_path, recurse=True)
# If we have gotten here we have a working ssh configuration.
# If ssh breaks we could leave tmp directories out on the remote system.
# If we have gotten here we have a working connection configuration.
# If the connection breaks we could leave tmp directories out on the remote system.
tmp_rm_res = self._low_level_execute_command(cmd, sudoable=False)
if tmp_rm_res.get('rc', 0) != 0:
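The new force flag lets a caller delete a specific remote path even when the usual checks (the keep-remote-files setting and the "-tmp-" name requirement) would veto it; the async_status cleanup branch below relies on this to remove the job results file. A standalone sketch of just the decision logic, with stand-ins for C.DEFAULT_KEEP_REMOTE_FILES and the real remote removal command:

KEEP_REMOTE_FILES = False  # stand-in for C.DEFAULT_KEEP_REMOTE_FILES


def should_remove_tmp_path(tmp_path):
    # mirrors _should_remove_tmp_path: only auto-created "-tmp-" dirs qualify
    return bool(tmp_path) and not KEEP_REMOTE_FILES and "-tmp-" in tmp_path


def remove_tmp_path(tmp_path, force=False):
    # force=True bypasses the check above, e.g. for a job results file such as
    # ~/.ansible_async/<jid>, which is not a "-tmp-" directory and would
    # otherwise never be removed here
    if force or should_remove_tmp_path(tmp_path):
        print("would run: rm -rf %s" % tmp_path)  # placeholder for the remote rm


remove_tmp_path("/home/user/.ansible_async/360874038559.4169")              # kept
remove_tmp_path("/home/user/.ansible_async/360874038559.4169", force=True)  # removed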

@@ -4,7 +4,14 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from ansible.errors import AnsibleError
import json
import tempfile
import time
from ansible.constants import config
from ansible.errors import AnsibleError, AnsibleActionFail, AnsibleConnectionFailure, AnsibleFileNotFound
from ansible.module_utils._text import to_native
from ansible.module_utils.six import iteritems
from ansible.plugins.action import ActionBase
from ansible.utils.vars import merge_hash
@@ -13,34 +20,103 @@ class ActionModule(ActionBase):
_VALID_ARGS = frozenset(('jid', 'mode'))
def run(self, tmp=None, task_vars=None):
results = super(ActionModule, self).run(tmp, task_vars)
del tmp # tmp no longer has any effect
def _get_async_dir(self):
if "jid" not in self._task.args:
raise AnsibleError("jid is required")
jid = self._task.args["jid"]
mode = self._task.args.get("mode", "status")
# async directory based on the shell option
async_dir = self.get_shell_option('async_dir', default="~/.ansible_async")
env_async_dir = [e for e in self._task.environment if
"ANSIBLE_ASYNC_DIR" in e]
if len(env_async_dir) > 0:
# for backwards compatibility we need to get the dir from
# ANSIBLE_ASYNC_DIR that is defined in the environment. This is
# deprecated and will be removed in favour of shell options
env_async_dir = [e for e in self._task.environment if "ANSIBLE_ASYNC_DIR" in e]
if len(env_async_dir) > 0:
async_dir = env_async_dir[0]['ANSIBLE_ASYNC_DIR']
msg = "Setting the async dir from the environment keyword " \
"ANSIBLE_ASYNC_DIR is deprecated. Set the async_dir " \
"shell option instead"
self._display.deprecated(msg, "2.12", collection_name='ansible.builtin')
return self._remote_expand_user(async_dir)
def _update_results_with_job_file(self, jid, log_path, results):
# local tempfile to copy job file to, using local tmp which is auto cleaned on exit
fd, tmpfile = tempfile.mkstemp(prefix='_async_%s' % jid, dir=config.get_config_value('DEFAULT_LOCAL_TMP'))
attempts = 0
while True:
try:
self._connection.fetch_file(log_path, tmpfile)
except AnsibleConnectionFailure:
raise
except AnsibleFileNotFound as e:
if attempts > 3:
raise AnsibleActionFail("Could not find job file on remote: %s" % to_native(e), orig_exc=e, result=results)
except AnsibleError as e:
if attempts > 3:
raise AnsibleActionFail("Could not fetch the job file from remote: %s" % to_native(e), orig_exc=e, result=results)
try:
with open(tmpfile) as f:
file_data = f.read()
except (IOError, OSError):
pass
if file_data:
break
elif attempts > 3:
raise AnsibleActionFail("Unable to fetch a usable job file", result=results)
attempts += 1
time.sleep(attempts * 0.2)
try:
data = json.loads(file_data)
except Exception:
results['finished'] = 1
results['failed'] = True
results['msg'] = "Could not parse job output: %s" % to_native(file_data, errors='surrogate_or_strict')
if 'started' not in data:
data['finished'] = 1
data['ansible_job_id'] = jid
results.update(dict([(to_native(k), v) for k, v in iteritems(data)]))
def run(self, tmp=None, task_vars=None):
results = super(ActionModule, self).run(tmp, task_vars)
# initialize response
results['started'] = results['finished'] = 0
results['stdout'] = results['stderr'] = ''
results['stdout_lines'] = results['stderr_lines'] = []
# read params
try:
jid = self._task.args["jid"]
except KeyError:
raise AnsibleActionFail("jid is required", result=results)
mode = self._task.args.get("mode", "status")
results['ansible_job_id'] = jid
async_dir = self._get_async_dir()
log_path = self._connection._shell.join_path(async_dir, jid)
if mode == 'cleanup':
self._remove_tmp_path(log_path, force=True)
results['erased'] = log_path
else:
# inject the async directory based on the shell option into the
# module args
async_dir = self.get_shell_option('async_dir', default="~/.ansible_async")
results['results_file'] = log_path
results['started'] = 1
if getattr(self._connection._shell, '_IS_WINDOWS', False):
# TODO: eventually fix so we can get remote user (%USERPROFILE%) like we get ~/ for posix
module_args = dict(jid=jid, mode=mode, _async_dir=async_dir)
status = self._execute_module(module_name='ansible.legacy.async_status', task_vars=task_vars,
module_args=module_args)
results = merge_hash(results, status)
results = merge_hash(results, self._execute_module(module_name='ansible.legacy.async_status', task_vars=task_vars, module_args=module_args))
else:
# fetch remote file and read locally
self._update_results_with_job_file(jid, log_path, results)
return results
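The fetch loop in _update_results_with_job_file is the part the commit message calls "retry with backoff"; stripped of the Ansible plumbing it is a linear backoff around a fetch that can race with async_wrapper still writing the job file. A self-contained sketch of the same pattern (fetch is a hypothetical callable standing in for connection.fetch_file, and the retry count and step only mirror the attempts * 0.2 sleep above, not an exact restatement of it):

import time


def fetch_job_file(fetch, remote_path, local_path, max_attempts=4, step=0.2):
    # the remote job file may be missing or empty for a short while after the
    # async task starts, so an empty read is treated as "try again"
    attempts = 0
    while True:
        data = ''
        try:
            fetch(remote_path, local_path)
            with open(local_path) as f:
                data = f.read()
        except (IOError, OSError):
            pass  # not written yet (or fetch failed); retry below

        if data:
            return data
        if attempts >= max_attempts:
            raise RuntimeError("Unable to fetch a usable job file")

        attempts += 1
        time.sleep(attempts * step)  # linear backoff: 0.2s, 0.4s, 0.6s, ...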

@@ -302,7 +302,7 @@
that:
- async_fandf_custom_dir is successful
- async_fandf_custom_dir_fail is failed
- async_fandf_custom_dir_fail.msg == "could not find job"
- async_fandf_custom_dir_fail.msg.startswith("Could not find job file on remote")
- async_fandf_custom_dir_result is successful
- async_fandf_custom_dir_dep_result is successful

@@ -58,8 +58,6 @@ lib/ansible/modules/apt.py validate-modules:parameter-invalid
lib/ansible/modules/apt_key.py validate-modules:parameter-type-not-in-doc
lib/ansible/modules/apt_repository.py validate-modules:parameter-invalid
lib/ansible/modules/assemble.py validate-modules:nonexistent-parameter-documented
lib/ansible/modules/async_status.py use-argspec-type-path
lib/ansible/modules/async_status.py validate-modules!skip
lib/ansible/modules/async_wrapper.py ansible-doc!skip # not an actual module
lib/ansible/modules/async_wrapper.py pylint:ansible-bad-function # ignore, required
lib/ansible/modules/async_wrapper.py use-argspec-type-path
