Send callbacks directly from the TaskExecutor instead of TaskResults masquerading as callbacks (#73927)

pull/73954/head
Matt Martz 4 years ago committed by GitHub
parent 561cdf3ace
commit 78f34786dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,5 @@
minor_changes:
- Callbacks - Migrate more places in the ``TaskExecutor`` to sending callbacks directly
over the queue, instead of sending them as ``TaskResult`` and short circuiting in the
Strategy to send the callback. This enables closer to real time callbacks of retries
and loop results (https://github.com/ansible/ansible/issues/73899)

@ -382,12 +382,21 @@ class TaskExecutor:
'msg': 'Failed to template loop_control.label: %s' % to_text(e)
})
self._final_q.send_task_result(
tr = TaskResult(
self._host.name,
self._task._uuid,
res,
task_fields=task_fields,
)
if tr.is_failed() or tr.is_unreachable():
self._final_q.send_callback('v2_runner_item_on_failed', tr)
elif tr.is_skipped():
self._final_q.send_callback('v2_runner_item_on_skipped', tr)
else:
if getattr(self._task, 'diff', False):
self._final_q.send_callback('v2_on_file_diff', tr)
self._final_q.send_callback('v2_runner_item_on_ok', tr)
results.append(res)
del task_vars[loop_var]
@ -673,7 +682,15 @@ class TaskExecutor:
result['_ansible_retry'] = True
result['retries'] = retries
display.debug('Retrying task, attempt %d of %d' % (attempt, retries))
self._final_q.send_task_result(self._host.name, self._task._uuid, result, task_fields=self._task.dump_attrs())
self._final_q.send_callback(
'v2_runner_retry',
TaskResult(
self._host.name,
self._task._uuid,
result,
task_fields=self._task.dump_attrs()
)
)
time.sleep(delay)
self._handler = self._get_action_handler(connection=self._connection, templar=templar)
else:
@ -782,8 +799,8 @@ class TaskExecutor:
self._final_q.send_callback(
'v2_runner_on_async_poll',
TaskResult(
self._host,
async_task,
self._host.name,
async_task, # We send the full task here, because the controller knows nothing about it, the TE created it
async_result,
task_fields=self._task.dump_attrs(),
),

@ -94,8 +94,13 @@ def results_thread_main(strategy):
if isinstance(result, StrategySentinel):
break
elif isinstance(result, CallbackSend):
for arg in result.args:
if isinstance(arg, TaskResult):
strategy.normalize_task_result(arg)
break
strategy._tqm.send_callback(result.method_name, *result.args, **result.kwargs)
elif isinstance(result, TaskResult):
strategy.normalize_task_result(result)
with strategy._results_lock:
# only handlers have the listen attr, so this must be a handler
# we split up the results into two queues here to make sure
@ -446,6 +451,31 @@ class StrategyBase:
for target_host in host_list:
_set_host_facts(target_host, always_facts)
def normalize_task_result(self, task_result):
"""Normalize a TaskResult to reference actual Host and Task objects
when only given the ``Host.name``, or the ``Task._uuid``
Only the ``Host.name`` and ``Task._uuid`` are commonly sent back from
the ``TaskExecutor`` or ``WorkerProcess`` due to performance concerns
Mutates the original object
"""
if isinstance(task_result._host, string_types):
# If the value is a string, it is ``Host.name``
task_result._host = self._inventory.get_host(to_text(task_result._host))
if isinstance(task_result._task, string_types):
# If the value is a string, it is ``Task._uuid``
queue_cache_entry = (task_result._host.name, task_result._task)
found_task = self._queued_task_cache.get(queue_cache_entry)['task']
original_task = found_task.copy(exclude_parent=True, exclude_tasks=True)
original_task._parent = found_task._parent
original_task.from_attrs(task_result._task_fields)
task_result._task = original_task
return task_result
@debug_closure
def _process_pending_results(self, iterator, one_pass=False, max_passes=None, do_handlers=False):
'''
@ -456,14 +486,6 @@ class StrategyBase:
ret_results = []
handler_templar = Templar(self._loader)
def get_original_host(host_name):
# FIXME: this should not need x2 _inventory
host_name = to_text(host_name)
if host_name in self._inventory.hosts:
return self._inventory.hosts[host_name]
else:
return self._inventory.get_host(host_name)
def search_handler_blocks_by_name(handler_name, handler_blocks):
# iterate in reversed order since last handler loaded with the same name wins
for handler_block in reversed(handler_blocks):
@ -512,32 +534,8 @@ class StrategyBase:
finally:
self._results_lock.release()
# get the original host and task. We then assign them to the TaskResult for use in callbacks/etc.
original_host = get_original_host(task_result._host)
queue_cache_entry = (original_host.name, task_result._task)
found_task = self._queued_task_cache.get(queue_cache_entry)['task']
original_task = found_task.copy(exclude_parent=True, exclude_tasks=True)
original_task._parent = found_task._parent
original_task.from_attrs(task_result._task_fields)
task_result._host = original_host
task_result._task = original_task
# send callbacks for 'non final' results
if '_ansible_retry' in task_result._result:
self._tqm.send_callback('v2_runner_retry', task_result)
continue
elif '_ansible_item_result' in task_result._result:
if task_result.is_failed() or task_result.is_unreachable():
self._tqm.send_callback('v2_runner_item_on_failed', task_result)
elif task_result.is_skipped():
self._tqm.send_callback('v2_runner_item_on_skipped', task_result)
else:
if 'diff' in task_result._result:
if self._diff or getattr(original_task, 'diff', False):
self._tqm.send_callback('v2_on_file_diff', task_result)
self._tqm.send_callback('v2_runner_item_on_ok', task_result)
continue
original_host = task_result._host
original_task = task_result._task
# all host status messages contain 2 entries: (msg, task_result)
role_ran = False

@ -20,7 +20,6 @@ from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from units.mock.loader import DictDataLoader
from copy import deepcopy
import uuid
from units.compat import unittest
@ -254,7 +253,7 @@ class TestStrategyBase(unittest.TestCase):
mock_task._parent = None
mock_task.ignore_errors = False
mock_task.ignore_unreachable = False
mock_task._uuid = uuid.uuid4()
mock_task._uuid = str(uuid.uuid4())
mock_task.loop = None
mock_task.copy.return_value = mock_task
@ -319,16 +318,17 @@ class TestStrategyBase(unittest.TestCase):
strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
mock_queued_task_cache = {
(mock_host.name, mock_task._uuid): {
'task': mock_task,
'host': mock_host,
'task_vars': {},
'play_context': {},
def mock_queued_task_cache():
return {
(mock_host.name, mock_task._uuid): {
'task': mock_task,
'host': mock_host,
'task_vars': {},
'play_context': {},
}
}
}
strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache)
strategy_base._queued_task_cache = mock_queued_task_cache()
results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1)
self.assertEqual(results[0], task_result)
@ -340,7 +340,7 @@ class TestStrategyBase(unittest.TestCase):
strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
mock_iterator.is_failed.return_value = True
strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache)
strategy_base._queued_task_cache = mock_queued_task_cache()
results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1)
self.assertEqual(results[0], task_result)
@ -354,7 +354,7 @@ class TestStrategyBase(unittest.TestCase):
queue_items.append(task_result)
strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache)
strategy_base._queued_task_cache = mock_queued_task_cache()
results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1)
self.assertEqual(results[0], task_result)
@ -367,7 +367,7 @@ class TestStrategyBase(unittest.TestCase):
queue_items.append(task_result)
strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache)
strategy_base._queued_task_cache = mock_queued_task_cache()
results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1)
self.assertEqual(results[0], task_result)
@ -377,7 +377,7 @@ class TestStrategyBase(unittest.TestCase):
queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(add_host=dict(host_name='newhost01', new_groups=['foo']))))
strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache)
strategy_base._queued_task_cache = mock_queued_task_cache()
results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1)
self.assertEqual(strategy_base._pending_results, 0)
@ -386,7 +386,7 @@ class TestStrategyBase(unittest.TestCase):
queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(add_group=dict(group_name='foo'))))
strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache)
strategy_base._queued_task_cache = mock_queued_task_cache()
results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1)
self.assertEqual(strategy_base._pending_results, 0)
@ -395,7 +395,7 @@ class TestStrategyBase(unittest.TestCase):
queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(changed=True, _ansible_notify=['test handler'])))
strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1
strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache)
strategy_base._queued_task_cache = mock_queued_task_cache()
results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1)
self.assertEqual(strategy_base._pending_results, 0)

Loading…
Cancel
Save