From e02b98274b60cdbc12ef4a4c74ae0f74207384e8 Mon Sep 17 00:00:00 2001 From: James Cammarata Date: Tue, 23 Feb 2016 15:07:06 -0500 Subject: [PATCH] issue callbacks per item and retry fails - now workers passes queue to task_executor so it can send back events per item and on retry attempt - updated result class to pass along events to strategy - base strategy updated to forward new events to callback - callbacks now remove 'items' on final result but process them directly when invoked per item - new callback method to deal with retry attempt messages (also now obeys nolog) - updated tests to match new signature of task_executor fixes #14558 fixes #14072 --- lib/ansible/executor/process/result.py | 13 +++++++++ lib/ansible/executor/process/worker.py | 1 + lib/ansible/executor/task_executor.py | 28 +++++++++++-------- lib/ansible/plugins/callback/__init__.py | 25 +++++------------ lib/ansible/plugins/callback/default.py | 33 ++++++++++++++--------- lib/ansible/plugins/strategy/__init__.py | 3 ++- test/units/executor/test_task_executor.py | 16 ++++++++++- 7 files changed, 74 insertions(+), 45 deletions(-) diff --git a/lib/ansible/executor/process/result.py b/lib/ansible/executor/process/result.py index bb4c0dd0a39..7c75bbdfc21 100644 --- a/lib/ansible/executor/process/result.py +++ b/lib/ansible/executor/process/result.py @@ -104,6 +104,19 @@ class ResultProcess(multiprocessing.Process): time.sleep(0.0001) continue + # send callbacks for 'non final' results + if '_ansible_retry' in result._result: + self._send_result(('v2_playbook_retry', result)) + continue + elif '_ansible_item_result' in result._result: + if result.is_failed() or result.is_unreachable(): + self._send_result(('v2_playbook_item_on_failed', result)) + elif result.is_skipped(): + self._send_result(('v2_playbook_item_on_skipped', result)) + else: + self._send_result(('v2_playbook_item_on_ok', result)) + continue + clean_copy = strip_internal_keys(result._result) if 'invocation' in clean_copy: del clean_copy['invocation'] diff --git a/lib/ansible/executor/process/worker.py b/lib/ansible/executor/process/worker.py index 24b9b3e5e03..7aa355aab8c 100644 --- a/lib/ansible/executor/process/worker.py +++ b/lib/ansible/executor/process/worker.py @@ -113,6 +113,7 @@ class WorkerProcess(multiprocessing.Process): self._new_stdin, self._loader, self._shared_loader_obj, + self._rslt_q ).run() debug("done running TaskExecutor() for %s/%s" % (self._host, self._task)) diff --git a/lib/ansible/executor/task_executor.py b/lib/ansible/executor/task_executor.py index f96eb578808..162c70a2851 100644 --- a/lib/ansible/executor/task_executor.py +++ b/lib/ansible/executor/task_executor.py @@ -30,6 +30,7 @@ from ansible.compat.six import iteritems, string_types from ansible import constants as C from ansible.errors import AnsibleError, AnsibleParserError, AnsibleUndefinedVariable, AnsibleConnectionFailure +from ansible.executor.task_result import TaskResult from ansible.playbook.conditional import Conditional from ansible.playbook.task import Task from ansible.template import Templar @@ -60,7 +61,7 @@ class TaskExecutor: # the module SQUASH_ACTIONS = frozenset(C.DEFAULT_SQUASH_ACTIONS) - def __init__(self, host, task, job_vars, play_context, new_stdin, loader, shared_loader_obj): + def __init__(self, host, task, job_vars, play_context, new_stdin, loader, shared_loader_obj, rslt_q): self._host = host self._task = task self._job_vars = job_vars @@ -69,6 +70,7 @@ class TaskExecutor: self._loader = loader self._shared_loader_obj = shared_loader_obj self._connection = None + self._rslt_q = rslt_q def run(self): ''' @@ -242,7 +244,9 @@ class TaskExecutor: # now update the result with the item info, and append the result # to the list of results res['item'] = item - #TODO: send item results to callback here, instead of all at the end + res['_ansible_item_result'] = True + + self._rslt_q.put(TaskResult(self._host, self._task, res), block=False) results.append(res) return results @@ -416,6 +420,9 @@ class TaskExecutor: return dict(unreachable=True, msg=to_unicode(e)) display.debug("handler run complete") + # preserve no log + result["_ansible_no_log"] = self._play_context.no_log + # update the local copy of vars with the registered value, if specified, # or any facts which may have been generated by the module execution if self._task.register: @@ -465,16 +472,18 @@ class TaskExecutor: _evaluate_failed_when_result(result) if attempt < retries - 1: - if retries > 1: - result['attempts'] = attempt + 1 cond = Conditional(loader=self._loader) cond.when = [ self._task.until ] if cond.evaluate_conditional(templar, vars_copy): break - - # no conditional check, or it failed, so sleep for the specified time - display.display("FAILED - RETRYING: %s (%d retries left). Result was: %s" % (self._task, retries-(attempt+1), result), color=C.COLOR_DEBUG) - time.sleep(delay) + else: + # no conditional check, or it failed, so sleep for the specified time + result['attempts'] = attempt + 1 + result['retries'] = retries + result['_ansible_retry'] = True + display.debug('Retrying task, attempt %d of %d' % (attempt + 1, retries)) + self._rslt_q.put(TaskResult(self._host, self._task, result), block=False) + time.sleep(delay) else: if retries > 1: # we ran out of attempts, so mark the result as failed @@ -506,9 +515,6 @@ class TaskExecutor: for k in ('ansible_host', ): result["_ansible_delegated_vars"][k] = delegated_vars.get(k) - # preserve no_log setting - result["_ansible_no_log"] = self._play_context.no_log - # and return display.debug("attempt loop complete, returning result") return result diff --git a/lib/ansible/plugins/callback/__init__.py b/lib/ansible/plugins/callback/__init__.py index 19a861c89b9..3a33ddbc4f1 100644 --- a/lib/ansible/plugins/callback/__init__.py +++ b/lib/ansible/plugins/callback/__init__.py @@ -172,16 +172,8 @@ class CallbackBase: return item def _process_items(self, result): - for res in result._result['results']: - newres = self._copy_result_exclude(result, ['_result']) - res['item'] = self._get_item(res) - newres._result = res - if 'failed' in res and res['failed']: - self.v2_playbook_item_on_failed(newres) - elif 'skipped' in res and res['skipped']: - self.v2_playbook_item_on_skipped(newres) - else: - self.v2_playbook_item_on_ok(newres) + # just remove them as now they get handled by individual callbacks + del result._result['results'] def _clean_results(self, result, task_name): if 'changed' in result and task_name in ['debug']: @@ -346,15 +338,6 @@ class CallbackBase: if 'diff' in result._result: self.on_file_diff(host, result._result['diff']) - def v2_playbook_on_item_ok(self, result): - pass # no v1 - - def v2_playbook_on_item_failed(self, result): - pass # no v1 - - def v2_playbook_on_item_skipped(self, result): - pass # no v1 - def v2_playbook_on_include(self, included_file): pass #no v1 correspondance @@ -366,3 +349,7 @@ class CallbackBase: def v2_playbook_item_on_skipped(self, result): pass + + def v2_playbook_retry(self, result): + pass + diff --git a/lib/ansible/plugins/callback/default.py b/lib/ansible/plugins/callback/default.py index 6ef3352a1c4..072eb5f4d25 100644 --- a/lib/ansible/plugins/callback/default.py +++ b/lib/ansible/plugins/callback/default.py @@ -51,6 +51,7 @@ class CallbackModule(CallbackBase): if result._task.loop and 'results' in result._result: self._process_items(result) + else: if delegated_vars: self._display.display("fatal: [%s -> %s]: FAILED! => %s" % (result._host.get_name(), delegated_vars['ansible_host'], self._dump_results(result._result)), color=C.COLOR_ERROR) @@ -159,24 +160,22 @@ class CallbackModule(CallbackBase): self._display.display(diff) def v2_playbook_item_on_ok(self, result): - delegated_vars = result._result.get('_ansible_delegated_vars', None) if result._task.action == 'include': return elif result._result.get('changed', False): - if delegated_vars: - msg = "changed: [%s -> %s]" % (result._host.get_name(), delegated_vars['ansible_host']) - else: - msg = "changed: [%s]" % result._host.get_name() + msg = 'changed' color = C.COLOR_CHANGED else: - if delegated_vars: - msg = "ok: [%s -> %s]" % (result._host.get_name(), delegated_vars['ansible_host']) - else: - msg = "ok: [%s]" % result._host.get_name() + msg = 'ok' color = C.COLOR_OK - msg += " => (item=%s)" % (result._result['item'],) + if delegated_vars: + msg += ": [%s -> %s]" % (result._host.get_name(), delegated_vars['ansible_host']) + else: + msg += ": [%s]" % result._host.get_name() + + msg += " => (item=%s)" % (self._get_item(result._result)) if (self._display.verbosity > 0 or '_ansible_verbose_always' in result._result) and not '_ansible_verbose_override' in result._result: msg += " => %s" % self._dump_results(result._result) @@ -197,15 +196,17 @@ class CallbackModule(CallbackBase): # finally, remove the exception from the result so it's not shown every time del result._result['exception'] + msg = "failed: " if delegated_vars: - self._display.display("failed: [%s -> %s] => (item=%s) => %s" % (result._host.get_name(), delegated_vars['ansible_host'], result._result['item'], self._dump_results(result._result)), color=C.COLOR_ERROR) + msg += "[%s -> %s]" % (result._host.get_name(), delegated_vars['ansible_host']) else: - self._display.display("failed: [%s] => (item=%s) => %s" % (result._host.get_name(), result._result['item'], self._dump_results(result._result)), color=C.COLOR_ERROR) + msg += "[%s]" % (result._host.get_name()) + self._display.display(msg + " (item=%s) => %s" % (self._get_item(result._result), self._dump_results(result._result)), color=C.COLOR_ERROR) self._handle_warnings(result._result) def v2_playbook_item_on_skipped(self, result): - msg = "skipping: [%s] => (item=%s) " % (result._host.get_name(), result._result['item']) + msg = "skipping: [%s] => (item=%s) " % (result._host.get_name(), self._get_item(result._result)) if (self._display.verbosity > 0 or '_ansible_verbose_always' in result._result) and not '_ansible_verbose_override' in result._result: msg += " => %s" % self._dump_results(result._result) self._display.display(msg, color=C.COLOR_SKIP) @@ -254,3 +255,9 @@ class CallbackModule(CallbackBase): val = getattr(self._options,option) if val: self._display.vvvv('%s: %s' % (option,val)) + + def v2_playbook_retry(self, result): + msg = "FAILED - RETRYING: %s (%d retries left)." % (result._task, result._result['retries'] - result._result['attempts']) + if (self._display.verbosity > 2 or '_ansible_verbose_always' in result._result) and not '_ansible_verbose_override' in result._result: + msg += "Result was: %s" % self._dump_results(result._result) + self._display.display(msg, color=C.COLOR_DEBUG) diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py index 29d67808765..8d40aaaefeb 100644 --- a/lib/ansible/plugins/strategy/__init__.py +++ b/lib/ansible/plugins/strategy/__init__.py @@ -329,7 +329,8 @@ class StrategyBase: self._variable_manager.set_nonpersistent_facts(target_host, facts) else: self._variable_manager.set_host_facts(target_host, facts) - + elif result[0].startswith('v2_playbook_item') or result[0] == 'v2_playbook_retry': + self._tqm.send_callback(result[0], result[1]) else: raise AnsibleError("unknown result message received: %s" % result[0]) diff --git a/test/units/executor/test_task_executor.py b/test/units/executor/test_task_executor.py index 7135a39ae2a..b029f87114c 100644 --- a/test/units/executor/test_task_executor.py +++ b/test/units/executor/test_task_executor.py @@ -45,6 +45,7 @@ class TestTaskExecutor(unittest.TestCase): mock_shared_loader = MagicMock() new_stdin = None job_vars = dict() + mock_queue = MagicMock() te = TaskExecutor( host = mock_host, task = mock_task, @@ -53,6 +54,7 @@ class TestTaskExecutor(unittest.TestCase): new_stdin = new_stdin, loader = fake_loader, shared_loader_obj = mock_shared_loader, + rslt_q = mock_queue, ) def test_task_executor_run(self): @@ -66,6 +68,7 @@ class TestTaskExecutor(unittest.TestCase): mock_play_context = MagicMock() mock_shared_loader = MagicMock() + mock_queue = MagicMock() new_stdin = None job_vars = dict() @@ -78,6 +81,7 @@ class TestTaskExecutor(unittest.TestCase): new_stdin = new_stdin, loader = fake_loader, shared_loader_obj = mock_shared_loader, + rslt_q = mock_queue, ) te._get_loop_items = MagicMock(return_value=None) @@ -97,7 +101,7 @@ class TestTaskExecutor(unittest.TestCase): def test_task_executor_get_loop_items(self): fake_loader = DictDataLoader({}) - + mock_host = MagicMock() mock_task = MagicMock() @@ -111,6 +115,7 @@ class TestTaskExecutor(unittest.TestCase): new_stdin = None job_vars = dict() + mock_queue = MagicMock() te = TaskExecutor( host = mock_host, @@ -120,6 +125,7 @@ class TestTaskExecutor(unittest.TestCase): new_stdin = new_stdin, loader = fake_loader, shared_loader_obj = mock_shared_loader, + rslt_q = mock_queue, ) items = te._get_loop_items() @@ -142,6 +148,7 @@ class TestTaskExecutor(unittest.TestCase): mock_play_context = MagicMock() mock_shared_loader = MagicMock() + mock_queue = MagicMock() new_stdin = None job_vars = dict() @@ -154,6 +161,7 @@ class TestTaskExecutor(unittest.TestCase): new_stdin = new_stdin, loader = fake_loader, shared_loader_obj = mock_shared_loader, + rslt_q = mock_queue, ) def _execute(variables): @@ -184,6 +192,7 @@ class TestTaskExecutor(unittest.TestCase): mock_play_context = MagicMock() mock_shared_loader = None + mock_queue = MagicMock() new_stdin = None job_vars = dict(pkg_mgr='yum') @@ -196,6 +205,7 @@ class TestTaskExecutor(unittest.TestCase): new_stdin = new_stdin, loader = fake_loader, shared_loader_obj = mock_shared_loader, + rslt_q = mock_queue, ) # @@ -279,6 +289,7 @@ class TestTaskExecutor(unittest.TestCase): mock_connection._connect.return_value = None mock_action = MagicMock() + mock_queue = MagicMock() shared_loader = None new_stdin = None @@ -292,6 +303,7 @@ class TestTaskExecutor(unittest.TestCase): new_stdin = new_stdin, loader = fake_loader, shared_loader_obj = shared_loader, + rslt_q = mock_queue, ) te._get_connection = MagicMock(return_value=mock_connection) @@ -330,6 +342,7 @@ class TestTaskExecutor(unittest.TestCase): mock_connection = MagicMock() mock_action = MagicMock() + mock_queue = MagicMock() shared_loader = MagicMock() shared_loader.action_loader = action_loader @@ -345,6 +358,7 @@ class TestTaskExecutor(unittest.TestCase): new_stdin = new_stdin, loader = fake_loader, shared_loader_obj = shared_loader, + rslt_q = mock_queue, ) te._connection = MagicMock()