diff --git a/lib/ansible/executor/play_iterator.py b/lib/ansible/executor/play_iterator.py index 99b7285182a..3f61cc9e03e 100644 --- a/lib/ansible/executor/play_iterator.py +++ b/lib/ansible/executor/play_iterator.py @@ -242,16 +242,10 @@ class PlayIterator: return self._host_states[host.name].copy() def cache_block_tasks(self, block): - def _cache_portion(p): - for t in p: - if isinstance(t, Block): - self.cache_block_tasks(t) - elif t._uuid not in self._task_uuid_cache: - self._task_uuid_cache[t._uuid] = t - - for portion in (block.block, block.rescue, block.always): - if portion is not None: - _cache_portion(portion) + # now a noop, we've changed the way we do caching and finding of + # original task entries, but just in case any 3rd party strategies + # are using this we're leaving it here for now + return def get_next_task_for_host(self, host, peek=False): @@ -520,19 +514,8 @@ class PlayIterator: return self._check_failed_state(s) def get_original_task(self, host, task): - ''' - Finds the task in the task list which matches the UUID of the given task. - The executor engine serializes/deserializes objects as they are passed through - the different processes, and not all data structures are preserved. This method - allows us to find the original task passed into the executor engine. - ''' - - if isinstance(task, Task): - the_uuid = task._uuid - else: - the_uuid = task - - return self._task_uuid_cache.get(the_uuid, None) + # now a noop because we've changed the way we do caching + return (None, None) def _insert_tasks_into_state(self, state, task_list): # if we've failed at all, or if the task list is empty, just return the current state @@ -569,6 +552,4 @@ class PlayIterator: return state def add_tasks(self, host, task_list): - for b in task_list: - self.cache_block_tasks(b) self._host_states[host.name] = self._insert_tasks_into_state(self.get_host_state(host), task_list) diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py index 4eb08901b80..7b5d3348a8e 100644 --- a/lib/ansible/plugins/strategy/__init__.py +++ b/lib/ansible/plugins/strategy/__init__.py @@ -120,9 +120,9 @@ def debug_closure(func): for result in results: task = result._task host = result._host - _queue_task_args = self._queue_task_args.pop('%s%s' % (host.name, task._uuid)) - task_vars = _queue_task_args['task_vars'] - play_context = _queue_task_args['play_context'] + _queued_task_args = self._queued_task_cache.pop((host.name, task._uuid), None) + task_vars = _queued_task_args['task_vars'] + play_context = _queued_task_args['play_context'] # Try to grab the previous host state, if it doesn't exist use get_host_state to generate an empty state try: prev_host_state = prev_host_states[host.name] @@ -179,7 +179,11 @@ class StrategyBase: self._final_q = tqm._final_q self._step = getattr(tqm._options, 'step', False) self._diff = getattr(tqm._options, 'diff', False) - self._queue_task_args = {} + + # the task cache is a dictionary of tuples of (host.name, task._uuid) + # used to find the original task object of in-flight tasks and to store + # the task args/vars and play context info used to queue the task. + self._queued_task_cache = {} # Backwards compat: self._display isn't really needed, just import the global display and use that. self._display = display @@ -270,13 +274,6 @@ class StrategyBase: def _queue_task(self, host, task, task_vars, play_context): ''' handles queueing the task up to be sent to a worker ''' - self._queue_task_args['%s%s' % (host.name, task._uuid)] = { - 'host': host, - 'task': task, - 'task_vars': task_vars, - 'play_context': play_context - } - display.debug("entering _queue_task() for %s/%s" % (host.name, task.action)) # Add a write lock for tasks. @@ -306,6 +303,13 @@ class StrategyBase: while True: (worker_prc, rslt_q) = self._workers[self._cur_worker] if worker_prc is None or not worker_prc.is_alive(): + self._queued_task_cache[(host.name, task._uuid)] = { + 'host': host, + 'task': task, + 'task_vars': task_vars, + 'play_context': play_context + } + worker_prc = WorkerProcess(self._final_q, task_vars, host, task, play_context, self._loader, self._variable_manager, shared_loader_obj) self._workers[self._cur_worker][0] = worker_prc worker_prc.start() @@ -425,7 +429,8 @@ class StrategyBase: # get the original host and task. We then assign them to the TaskResult for use in callbacks/etc. original_host = get_original_host(task_result._host) - found_task = iterator.get_original_task(original_host, task_result._task) + queue_cache_entry = (original_host.name, task_result._task) + found_task = self._queued_task_cache.get(queue_cache_entry)['task'] original_task = found_task.copy(exclude_parent=True, exclude_tasks=True) original_task._parent = found_task._parent original_task.from_attrs(task_result._task_fields) @@ -854,8 +859,6 @@ class StrategyBase: host_results = [] for host in notified_hosts: if not handler.has_triggered(host) and (not iterator.is_failed(host) or play_context.force_handlers): - if handler._uuid not in iterator._task_uuid_cache: - iterator._task_uuid_cache[handler._uuid] = handler task_vars = self._variable_manager.get_vars(play=iterator._play, host=host, task=handler) self.add_tqm_variables(task_vars, play=iterator._play) self._queue_task(host, handler, task_vars, play_context)