Cache tasks as they are queued instead of en masse (#34752)

* Cache tasks as they are queued instead of en masse

This also moves the task caching from the PlayIterator to the
StrategyBase class, where it makes more sense (and makes it easier
to not have to change the strategy class methods leading to an API
change).

Fixes #31673

* Cleaning up unit tests due to 502ca780
pull/26105/head
James Cammarata 7 years ago committed by GitHub
parent b10d5f34ea
commit b107e397cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -153,8 +153,6 @@ class PlayIterator:
self._blocks = [] self._blocks = []
self._variable_manager = variable_manager self._variable_manager = variable_manager
self._task_uuid_cache = dict()
# Default options to gather # Default options to gather
gather_subset = play_context.gather_subset gather_subset = play_context.gather_subset
gather_timeout = play_context.gather_timeout gather_timeout = play_context.gather_timeout
@ -242,16 +240,10 @@ class PlayIterator:
return self._host_states[host.name].copy() return self._host_states[host.name].copy()
def cache_block_tasks(self, block): def cache_block_tasks(self, block):
def _cache_portion(p): # now a noop, we've changed the way we do caching and finding of
for t in p: # original task entries, but just in case any 3rd party strategies
if isinstance(t, Block): # are using this we're leaving it here for now
self.cache_block_tasks(t) return
elif t._uuid not in self._task_uuid_cache:
self._task_uuid_cache[t._uuid] = t
for portion in (block.block, block.rescue, block.always):
if portion is not None:
_cache_portion(portion)
def get_next_task_for_host(self, host, peek=False): def get_next_task_for_host(self, host, peek=False):
@ -520,19 +512,8 @@ class PlayIterator:
return self._check_failed_state(s) return self._check_failed_state(s)
def get_original_task(self, host, task): def get_original_task(self, host, task):
''' # now a noop because we've changed the way we do caching
Finds the task in the task list which matches the UUID of the given task. return (None, None)
The executor engine serializes/deserializes objects as they are passed through
the different processes, and not all data structures are preserved. This method
allows us to find the original task passed into the executor engine.
'''
if isinstance(task, Task):
the_uuid = task._uuid
else:
the_uuid = task
return self._task_uuid_cache.get(the_uuid, None)
def _insert_tasks_into_state(self, state, task_list): def _insert_tasks_into_state(self, state, task_list):
# if we've failed at all, or if the task list is empty, just return the current state # if we've failed at all, or if the task list is empty, just return the current state
@ -569,6 +550,4 @@ class PlayIterator:
return state return state
def add_tasks(self, host, task_list): def add_tasks(self, host, task_list):
for b in task_list:
self.cache_block_tasks(b)
self._host_states[host.name] = self._insert_tasks_into_state(self.get_host_state(host), task_list) self._host_states[host.name] = self._insert_tasks_into_state(self.get_host_state(host), task_list)

@ -120,9 +120,9 @@ def debug_closure(func):
for result in results: for result in results:
task = result._task task = result._task
host = result._host host = result._host
_queue_task_args = self._queue_task_args.pop('%s%s' % (host.name, task._uuid)) _queued_task_args = self._queued_task_cache.pop((host.name, task._uuid), None)
task_vars = _queue_task_args['task_vars'] task_vars = _queued_task_args['task_vars']
play_context = _queue_task_args['play_context'] play_context = _queued_task_args['play_context']
# Try to grab the previous host state, if it doesn't exist use get_host_state to generate an empty state # Try to grab the previous host state, if it doesn't exist use get_host_state to generate an empty state
try: try:
prev_host_state = prev_host_states[host.name] prev_host_state = prev_host_states[host.name]
@ -179,7 +179,11 @@ class StrategyBase:
self._final_q = tqm._final_q self._final_q = tqm._final_q
self._step = getattr(tqm._options, 'step', False) self._step = getattr(tqm._options, 'step', False)
self._diff = getattr(tqm._options, 'diff', False) self._diff = getattr(tqm._options, 'diff', False)
self._queue_task_args = {}
# the task cache is a dictionary of tuples of (host.name, task._uuid)
# used to find the original task object of in-flight tasks and to store
# the task args/vars and play context info used to queue the task.
self._queued_task_cache = {}
# Backwards compat: self._display isn't really needed, just import the global display and use that. # Backwards compat: self._display isn't really needed, just import the global display and use that.
self._display = display self._display = display
@ -270,13 +274,6 @@ class StrategyBase:
def _queue_task(self, host, task, task_vars, play_context): def _queue_task(self, host, task, task_vars, play_context):
''' handles queueing the task up to be sent to a worker ''' ''' handles queueing the task up to be sent to a worker '''
self._queue_task_args['%s%s' % (host.name, task._uuid)] = {
'host': host,
'task': task,
'task_vars': task_vars,
'play_context': play_context
}
display.debug("entering _queue_task() for %s/%s" % (host.name, task.action)) display.debug("entering _queue_task() for %s/%s" % (host.name, task.action))
# Add a write lock for tasks. # Add a write lock for tasks.
@ -306,6 +303,13 @@ class StrategyBase:
while True: while True:
(worker_prc, rslt_q) = self._workers[self._cur_worker] (worker_prc, rslt_q) = self._workers[self._cur_worker]
if worker_prc is None or not worker_prc.is_alive(): if worker_prc is None or not worker_prc.is_alive():
self._queued_task_cache[(host.name, task._uuid)] = {
'host': host,
'task': task,
'task_vars': task_vars,
'play_context': play_context
}
worker_prc = WorkerProcess(self._final_q, task_vars, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj) worker_prc = WorkerProcess(self._final_q, task_vars, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj)
self._workers[self._cur_worker][0] = worker_prc self._workers[self._cur_worker][0] = worker_prc
worker_prc.start() worker_prc.start()
@ -425,7 +429,8 @@ class StrategyBase:
# get the original host and task. We then assign them to the TaskResult for use in callbacks/etc. # get the original host and task. We then assign them to the TaskResult for use in callbacks/etc.
original_host = get_original_host(task_result._host) original_host = get_original_host(task_result._host)
found_task = iterator.get_original_task(original_host, task_result._task) queue_cache_entry = (original_host.name, task_result._task)
found_task = self._queued_task_cache.get(queue_cache_entry)['task']
original_task = found_task.copy(exclude_parent=True, exclude_tasks=True) original_task = found_task.copy(exclude_parent=True, exclude_tasks=True)
original_task._parent = found_task._parent original_task._parent = found_task._parent
original_task.from_attrs(task_result._task_fields) original_task.from_attrs(task_result._task_fields)
@ -854,8 +859,6 @@ class StrategyBase:
host_results = [] host_results = []
for host in notified_hosts: for host in notified_hosts:
if not handler.has_triggered(host) and (not iterator.is_failed(host) or play_context.force_handlers): if not handler.has_triggered(host) and (not iterator.is_failed(host) or play_context.force_handlers):
if handler._uuid not in iterator._task_uuid_cache:
iterator._task_uuid_cache[handler._uuid] = handler
task_vars = self._variable_manager.get_vars(play=iterator._play, host=host, task=handler) task_vars = self._variable_manager.get_vars(play=iterator._play, host=host, task=handler)
self.add_tqm_variables(task_vars, play=iterator._play) self.add_tqm_variables(task_vars, play=iterator._play)
self._queue_task(host, handler, task_vars, play_context) self._queue_task(host, handler, task_vars, play_context)

@ -155,16 +155,6 @@ class TestPlayIterator(unittest.TestCase):
all_vars=dict(), all_vars=dict(),
) )
# lookup up an original task
target_task = p._entries[0].tasks[0].block[0]
task_copy = target_task.copy(exclude_parent=True)
found_task = itr.get_original_task(hosts[0], task_copy)
self.assertEqual(target_task, found_task)
bad_task = Task()
found_task = itr.get_original_task(hosts[0], bad_task)
self.assertIsNone(found_task)
# pre task # pre task
(host_state, task) = itr.get_next_task_for_host(hosts[0]) (host_state, task) = itr.get_next_task_for_host(hosts[0])
self.assertIsNotNone(task) self.assertIsNotNone(task)

@ -20,6 +20,7 @@ from __future__ import (absolute_import, division, print_function)
__metaclass__ = type __metaclass__ = type
from units.mock.loader import DictDataLoader from units.mock.loader import DictDataLoader
from copy import deepcopy
import uuid import uuid
from ansible.compat.tests import unittest from ansible.compat.tests import unittest
@ -207,15 +208,18 @@ class TestStrategyBase(unittest.TestCase):
tqm._initialize_processes(3) tqm._initialize_processes(3)
tqm.hostvars = dict() tqm.hostvars = dict()
mock_task = MagicMock()
mock_task._uuid = 'abcd'
try: try:
strategy_base = StrategyBase(tqm=tqm) strategy_base = StrategyBase(tqm=tqm)
strategy_base._queue_task(host=mock_host, task=MagicMock(), task_vars=dict(), play_context=MagicMock()) strategy_base._queue_task(host=mock_host, task=mock_task, task_vars=dict(), play_context=MagicMock())
self.assertEqual(strategy_base._cur_worker, 1) self.assertEqual(strategy_base._cur_worker, 1)
self.assertEqual(strategy_base._pending_results, 1) self.assertEqual(strategy_base._pending_results, 1)
strategy_base._queue_task(host=mock_host, task=MagicMock(), task_vars=dict(), play_context=MagicMock()) strategy_base._queue_task(host=mock_host, task=mock_task, task_vars=dict(), play_context=MagicMock())
self.assertEqual(strategy_base._cur_worker, 2) self.assertEqual(strategy_base._cur_worker, 2)
self.assertEqual(strategy_base._pending_results, 2) self.assertEqual(strategy_base._pending_results, 2)
strategy_base._queue_task(host=mock_host, task=MagicMock(), task_vars=dict(), play_context=MagicMock()) strategy_base._queue_task(host=mock_host, task=mock_task, task_vars=dict(), play_context=MagicMock())
self.assertEqual(strategy_base._cur_worker, 0) self.assertEqual(strategy_base._cur_worker, 0)
self.assertEqual(strategy_base._pending_results, 3) self.assertEqual(strategy_base._pending_results, 3)
finally: finally:
@ -282,7 +286,6 @@ class TestStrategyBase(unittest.TestCase):
mock_iterator._play = mock_play mock_iterator._play = mock_play
mock_iterator.mark_host_failed.return_value = None mock_iterator.mark_host_failed.return_value = None
mock_iterator.get_next_task_for_host.return_value = (None, None) mock_iterator.get_next_task_for_host.return_value = (None, None)
mock_iterator.get_original_task.return_value = mock_task
mock_handler_block = MagicMock() mock_handler_block = MagicMock()
mock_handler_block.block = [mock_handler_task] mock_handler_block.block = [mock_handler_task]
@ -337,12 +340,16 @@ class TestStrategyBase(unittest.TestCase):
strategy_base._blocked_hosts['test01'] = True strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1 strategy_base._pending_results = 1
strategy_base._queue_task_args = MagicMock() mock_queued_task_cache = {
strategy_base._queue_task_args.pop.return_value = { (mock_host.name, mock_task._uuid): {
'task_vars': {}, 'task': mock_task,
'play_context': {}, 'host': mock_host,
'task_vars': {},
'play_context': {},
}
} }
strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache)
results = strategy_base._wait_on_pending_results(iterator=mock_iterator) results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1) self.assertEqual(len(results), 1)
self.assertEqual(results[0], task_result) self.assertEqual(results[0], task_result)
@ -354,6 +361,7 @@ class TestStrategyBase(unittest.TestCase):
strategy_base._blocked_hosts['test01'] = True strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1 strategy_base._pending_results = 1
mock_iterator.is_failed.return_value = True mock_iterator.is_failed.return_value = True
strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache)
results = strategy_base._wait_on_pending_results(iterator=mock_iterator) results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1) self.assertEqual(len(results), 1)
self.assertEqual(results[0], task_result) self.assertEqual(results[0], task_result)
@ -367,6 +375,7 @@ class TestStrategyBase(unittest.TestCase):
queue_items.append(task_result) queue_items.append(task_result)
strategy_base._blocked_hosts['test01'] = True strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1 strategy_base._pending_results = 1
strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache)
results = strategy_base._wait_on_pending_results(iterator=mock_iterator) results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1) self.assertEqual(len(results), 1)
self.assertEqual(results[0], task_result) self.assertEqual(results[0], task_result)
@ -379,6 +388,7 @@ class TestStrategyBase(unittest.TestCase):
queue_items.append(task_result) queue_items.append(task_result)
strategy_base._blocked_hosts['test01'] = True strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1 strategy_base._pending_results = 1
strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache)
results = strategy_base._wait_on_pending_results(iterator=mock_iterator) results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1) self.assertEqual(len(results), 1)
self.assertEqual(results[0], task_result) self.assertEqual(results[0], task_result)
@ -388,6 +398,7 @@ class TestStrategyBase(unittest.TestCase):
queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(add_host=dict(host_name='newhost01', new_groups=['foo'])))) queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(add_host=dict(host_name='newhost01', new_groups=['foo']))))
strategy_base._blocked_hosts['test01'] = True strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1 strategy_base._pending_results = 1
strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache)
results = strategy_base._wait_on_pending_results(iterator=mock_iterator) results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1) self.assertEqual(len(results), 1)
self.assertEqual(strategy_base._pending_results, 0) self.assertEqual(strategy_base._pending_results, 0)
@ -396,6 +407,7 @@ class TestStrategyBase(unittest.TestCase):
queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(add_group=dict(group_name='foo')))) queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(add_group=dict(group_name='foo'))))
strategy_base._blocked_hosts['test01'] = True strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1 strategy_base._pending_results = 1
strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache)
results = strategy_base._wait_on_pending_results(iterator=mock_iterator) results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1) self.assertEqual(len(results), 1)
self.assertEqual(strategy_base._pending_results, 0) self.assertEqual(strategy_base._pending_results, 0)
@ -404,6 +416,7 @@ class TestStrategyBase(unittest.TestCase):
queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(changed=True, _ansible_notify=['test handler']))) queue_items.append(TaskResult(host=mock_host.name, task=mock_task._uuid, return_data=dict(changed=True, _ansible_notify=['test handler'])))
strategy_base._blocked_hosts['test01'] = True strategy_base._blocked_hosts['test01'] = True
strategy_base._pending_results = 1 strategy_base._pending_results = 1
strategy_base._queued_task_cache = deepcopy(mock_queued_task_cache)
results = strategy_base._wait_on_pending_results(iterator=mock_iterator) results = strategy_base._wait_on_pending_results(iterator=mock_iterator)
self.assertEqual(len(results), 1) self.assertEqual(len(results), 1)
self.assertEqual(strategy_base._pending_results, 0) self.assertEqual(strategy_base._pending_results, 0)
@ -501,6 +514,7 @@ class TestStrategyBase(unittest.TestCase):
mock_handler_task._role = None mock_handler_task._role = None
mock_handler_task._parent = None mock_handler_task._parent = None
mock_handler_task._uuid = 'xxxxxxxxxxxxxxxx' mock_handler_task._uuid = 'xxxxxxxxxxxxxxxx'
mock_handler_task.copy.return_value = mock_handler_task
mock_handler = MagicMock() mock_handler = MagicMock()
mock_handler.block = [mock_handler_task] mock_handler.block = [mock_handler_task]
@ -516,13 +530,13 @@ class TestStrategyBase(unittest.TestCase):
mock_inventory = MagicMock() mock_inventory = MagicMock()
mock_inventory.get_hosts.return_value = [mock_host] mock_inventory.get_hosts.return_value = [mock_host]
mock_inventory.get.return_value = mock_host mock_inventory.get.return_value = mock_host
mock_inventory.get_host.return_value = mock_host
mock_var_mgr = MagicMock() mock_var_mgr = MagicMock()
mock_var_mgr.get_vars.return_value = dict() mock_var_mgr.get_vars.return_value = dict()
mock_iterator = MagicMock() mock_iterator = MagicMock()
mock_iterator._play = mock_play mock_iterator._play = mock_play
mock_iterator.get_original_task.return_value = mock_handler_task
fake_loader = DictDataLoader() fake_loader = DictDataLoader()
mock_options = MagicMock() mock_options = MagicMock()
@ -545,9 +559,11 @@ class TestStrategyBase(unittest.TestCase):
strategy_base._inventory = mock_inventory strategy_base._inventory = mock_inventory
strategy_base._notified_handlers = {mock_handler_task._uuid: [mock_host]} strategy_base._notified_handlers = {mock_handler_task._uuid: [mock_host]}
task_result = TaskResult(Host('host01'), Handler(), dict(changed=False)) task_result = TaskResult(mock_host.name, mock_handler_task._uuid, dict(changed=False))
strategy_base._queue_task_args = MagicMock() strategy_base._queued_task_cache = dict()
strategy_base._queue_task_args.pop.return_value = { strategy_base._queued_task_cache[(mock_host.name, mock_handler_task._uuid)] = {
'task': mock_handler_task,
'host': mock_host,
'task_vars': {}, 'task_vars': {},
'play_context': mock_play_context 'play_context': mock_play_context
} }

Loading…
Cancel
Save