From f24a6a93cb4a6cf3ed7c5abdbdd5d8c5a97ad9c7 Mon Sep 17 00:00:00 2001 From: Martin Krizek Date: Wed, 2 Oct 2024 12:25:15 +0200 Subject: [PATCH] Move processing includes into StrategyBase ci_complete --- lib/ansible/playbook/included_file.py | 283 +++++++++++------------ lib/ansible/plugins/strategy/__init__.py | 124 ++++++++-- lib/ansible/plugins/strategy/free.py | 89 +------ lib/ansible/plugins/strategy/linear.py | 117 +--------- lib/ansible/utils/vars.py | 18 ++ 5 files changed, 271 insertions(+), 360 deletions(-) diff --git a/lib/ansible/playbook/included_file.py b/lib/ansible/playbook/included_file.py index d2fdb76364d..e3fafa7aacf 100644 --- a/lib/ansible/playbook/included_file.py +++ b/lib/ansible/playbook/included_file.py @@ -28,6 +28,7 @@ from ansible.playbook.task_include import TaskInclude from ansible.playbook.role_include import IncludeRole from ansible.template import Templar from ansible.utils.display import Display +from ansible.utils.vars import get_loop_item_vars display = Display() @@ -65,161 +66,147 @@ class IncludedFile: task_vars_cache = {} for res in results: - original_host = res._host original_task = res._task - if original_task.action in C._ACTION_ALL_INCLUDES: - - if original_task.loop: - if 'results' not in res._result: - continue - include_results = res._result['results'] - else: - include_results = [res._result] + if original_task.action not in C._ACTION_ALL_INCLUDES: + continue + + if original_task.loop: + include_results = res._result.get('results', []) + else: + include_results = [res._result] + + for include_result in include_results: + if include_result.get('skipped') or include_result.get('failed'): + continue + + cache_key = (iterator._play, original_host, original_task) + try: + task_vars = task_vars_cache[cache_key] + except KeyError: + task_vars = task_vars_cache[cache_key] = variable_manager.get_vars(play=iterator._play, host=original_host, task=original_task) + + special_vars = get_loop_item_vars(include_result, original_task) + task_vars.update(special_vars) + + include_args = include_result.pop('include_args', dict()) + if original_task.no_log and '_ansible_no_log' not in include_args: + task_vars['_ansible_no_log'] = special_vars['_ansible_no_log'] = original_task.no_log + + # get search path for this task to pass to lookup plugins that may be used in pathing to + # the included file + task_vars['ansible_search_path'] = original_task.get_search_path() + + # ensure basedir is always in (dwim already searches here but we need to display it) + if loader.get_basedir() not in task_vars['ansible_search_path']: + task_vars['ansible_search_path'].append(loader.get_basedir()) + + templar = Templar(loader=loader, variables=task_vars) + + if original_task.action in C._ACTION_INCLUDE_TASKS: + include_file = None + + if original_task._parent: + # handle relative includes by walking up the list of parent include + # tasks and checking the relative result to see if it exists + parent_include = original_task._parent + cumulative_path = None + while parent_include is not None: + if not isinstance(parent_include, TaskInclude): + parent_include = parent_include._parent + continue + if isinstance(parent_include, IncludeRole): + parent_include_dir = parent_include._role_path + else: + try: + parent_include_dir = os.path.dirname(templar.template(parent_include.args.get('_raw_params'))) + except AnsibleError as e: + parent_include_dir = '' + display.warning( + 'Templating the path of the parent %s failed. The path to the ' + 'included file may not be found. ' + 'The error was: %s.' % (original_task.action, to_text(e)) + ) + if cumulative_path is not None and not os.path.isabs(cumulative_path): + cumulative_path = os.path.join(parent_include_dir, cumulative_path) + else: + cumulative_path = parent_include_dir + include_target = templar.template(include_result['include']) + if original_task._role: + dirname = 'handlers' if isinstance(original_task, Handler) else 'tasks' + new_basedir = os.path.join(original_task._role._role_path, dirname, cumulative_path) + candidates = [ + loader.path_dwim_relative(original_task._role._role_path, dirname, include_target, is_role=True), + loader.path_dwim_relative(new_basedir, dirname, include_target, is_role=True) + ] + for include_file in candidates: + try: + # may throw OSError + os.stat(include_file) + # or select the task file if it exists + break + except OSError: + pass + else: + include_file = loader.path_dwim_relative(loader.get_basedir(), cumulative_path, include_target) - for include_result in include_results: - # if the task result was skipped or failed, continue - if 'skipped' in include_result and include_result['skipped'] or 'failed' in include_result and include_result['failed']: - continue + if os.path.exists(include_file): + break + else: + parent_include = parent_include._parent + + if include_file is None: + if original_task._role: + include_target = templar.template(include_result['include']) + include_file = loader.path_dwim_relative( + original_task._role._role_path, + 'handlers' if isinstance(original_task, Handler) else 'tasks', + include_target, + is_role=True) + else: + include_file = loader.path_dwim(include_result['include']) - cache_key = (iterator._play, original_host, original_task) + include_file = templar.template(include_file) + inc_file = IncludedFile(include_file, include_args, special_vars, original_task) + else: + # template the included role's name here + role_name = include_args.pop('name', include_args.pop('role', None)) + if role_name is not None: + role_name = templar.template(role_name) + + new_task = original_task.copy() + new_task.post_validate(templar=templar) + new_task._role_name = role_name + for from_arg in new_task.FROM_ARGS: + if from_arg in include_args: + from_key = from_arg.removesuffix('_from') + new_task._from_files[from_key] = templar.template(include_args.pop(from_arg)) + + if omit_token := task_vars.get('omit'): + new_task._from_files = remove_omit(new_task._from_files, omit_token) + + inc_file = IncludedFile(role_name, include_args, special_vars, new_task, is_role=True) + + idx = 0 + orig_inc_file = inc_file + while 1: try: - task_vars = task_vars_cache[cache_key] - except KeyError: - task_vars = task_vars_cache[cache_key] = variable_manager.get_vars(play=iterator._play, host=original_host, task=original_task) - - include_args = include_result.pop('include_args', dict()) - special_vars = {} - loop_var = include_result.get('ansible_loop_var', 'item') - index_var = include_result.get('ansible_index_var') - if loop_var in include_result: - task_vars[loop_var] = special_vars[loop_var] = include_result[loop_var] - task_vars['ansible_loop_var'] = special_vars['ansible_loop_var'] = loop_var - if index_var and index_var in include_result: - task_vars[index_var] = special_vars[index_var] = include_result[index_var] - task_vars['ansible_index_var'] = special_vars['ansible_index_var'] = index_var - if '_ansible_item_label' in include_result: - task_vars['_ansible_item_label'] = special_vars['_ansible_item_label'] = include_result['_ansible_item_label'] - if 'ansible_loop' in include_result: - task_vars['ansible_loop'] = special_vars['ansible_loop'] = include_result['ansible_loop'] - if original_task.no_log and '_ansible_no_log' not in include_args: - task_vars['_ansible_no_log'] = special_vars['_ansible_no_log'] = original_task.no_log - - # get search path for this task to pass to lookup plugins that may be used in pathing to - # the included file - task_vars['ansible_search_path'] = original_task.get_search_path() - - # ensure basedir is always in (dwim already searches here but we need to display it) - if loader.get_basedir() not in task_vars['ansible_search_path']: - task_vars['ansible_search_path'].append(loader.get_basedir()) - - templar = Templar(loader=loader, variables=task_vars) - - if original_task.action in C._ACTION_INCLUDE_TASKS: - include_file = None - - if original_task._parent: - # handle relative includes by walking up the list of parent include - # tasks and checking the relative result to see if it exists - parent_include = original_task._parent - cumulative_path = None - while parent_include is not None: - if not isinstance(parent_include, TaskInclude): - parent_include = parent_include._parent - continue - if isinstance(parent_include, IncludeRole): - parent_include_dir = parent_include._role_path - else: - try: - parent_include_dir = os.path.dirname(templar.template(parent_include.args.get('_raw_params'))) - except AnsibleError as e: - parent_include_dir = '' - display.warning( - 'Templating the path of the parent %s failed. The path to the ' - 'included file may not be found. ' - 'The error was: %s.' % (original_task.action, to_text(e)) - ) - if cumulative_path is not None and not os.path.isabs(cumulative_path): - cumulative_path = os.path.join(parent_include_dir, cumulative_path) - else: - cumulative_path = parent_include_dir - include_target = templar.template(include_result['include']) - if original_task._role: - dirname = 'handlers' if isinstance(original_task, Handler) else 'tasks' - new_basedir = os.path.join(original_task._role._role_path, dirname, cumulative_path) - candidates = [ - loader.path_dwim_relative(original_task._role._role_path, dirname, include_target, is_role=True), - loader.path_dwim_relative(new_basedir, dirname, include_target, is_role=True) - ] - for include_file in candidates: - try: - # may throw OSError - os.stat(include_file) - # or select the task file if it exists - break - except OSError: - pass - else: - include_file = loader.path_dwim_relative(loader.get_basedir(), cumulative_path, include_target) - - if os.path.exists(include_file): - break - else: - parent_include = parent_include._parent - - if include_file is None: - if original_task._role: - include_target = templar.template(include_result['include']) - include_file = loader.path_dwim_relative( - original_task._role._role_path, - 'handlers' if isinstance(original_task, Handler) else 'tasks', - include_target, - is_role=True) - else: - include_file = loader.path_dwim(include_result['include']) + pos = included_files[idx:].index(orig_inc_file) + # pos is relative to idx since we are slicing + # use idx + pos due to relative indexing + inc_file = included_files[idx + pos] + except ValueError: + included_files.append(orig_inc_file) + inc_file = orig_inc_file - include_file = templar.template(include_file) - inc_file = IncludedFile(include_file, include_args, special_vars, original_task) + try: + inc_file.add_host(original_host) + inc_file._results.append(res) + except ValueError: + # The host already exists for this include, advance forward, this is a new include + idx += pos + 1 else: - # template the included role's name here - role_name = include_args.pop('name', include_args.pop('role', None)) - if role_name is not None: - role_name = templar.template(role_name) - - new_task = original_task.copy() - new_task.post_validate(templar=templar) - new_task._role_name = role_name - for from_arg in new_task.FROM_ARGS: - if from_arg in include_args: - from_key = from_arg.removesuffix('_from') - new_task._from_files[from_key] = templar.template(include_args.pop(from_arg)) - - omit_token = task_vars.get('omit') - if omit_token: - new_task._from_files = remove_omit(new_task._from_files, omit_token) - - inc_file = IncludedFile(role_name, include_args, special_vars, new_task, is_role=True) - - idx = 0 - orig_inc_file = inc_file - while 1: - try: - pos = included_files[idx:].index(orig_inc_file) - # pos is relative to idx since we are slicing - # use idx + pos due to relative indexing - inc_file = included_files[idx + pos] - except ValueError: - included_files.append(orig_inc_file) - inc_file = orig_inc_file - - try: - inc_file.add_host(original_host) - inc_file._results.append(res) - except ValueError: - # The host already exists for this include, advance forward, this is a new include - idx += pos + 1 - else: - break + break return included_files diff --git a/lib/ansible/plugins/strategy/__init__.py b/lib/ansible/plugins/strategy/__init__.py index 3eb5538b96d..1b027888b58 100644 --- a/lib/ansible/plugins/strategy/__init__.py +++ b/lib/ansible/plugins/strategy/__init__.py @@ -18,6 +18,7 @@ from __future__ import annotations import cmd +import collections import functools import os import pprint @@ -36,7 +37,7 @@ from ansible import constants as C from ansible import context from ansible.errors import AnsibleError, AnsibleFileNotFound, AnsibleUndefinedVariable, AnsibleParserError from ansible.executor import action_write_locks -from ansible.executor.play_iterator import IteratingStates, PlayIterator +from ansible.executor.play_iterator import IteratingStates from ansible.executor.process.worker import WorkerProcess from ansible.executor.task_result import TaskResult from ansible.executor.task_queue_manager import CallbackSend, DisplaySend, PromptSend @@ -47,6 +48,7 @@ from ansible.module_utils.connection import Connection, ConnectionError from ansible.playbook.conditional import Conditional from ansible.playbook.handler import Handler from ansible.playbook.helpers import load_list_of_blocks +from ansible.playbook.included_file import IncludedFile from ansible.playbook.task import Task from ansible.playbook.task_include import TaskInclude from ansible.plugins import loader as plugin_loader @@ -54,9 +56,12 @@ from ansible.template import Templar from ansible.utils.display import Display from ansible.utils.fqcn import add_internal_fqcns from ansible.utils.unsafe_proxy import wrap_var -from ansible.utils.vars import combine_vars +from ansible.utils.vars import combine_vars, get_loop_item_vars from ansible.vars.clean import strip_internal_keys, module_response_deepcopy +if t.TYPE_CHECKING: + from ansible.executor.play_iterator import PlayIterator + display = Display() __all__ = ['StrategyBase'] @@ -92,22 +97,6 @@ def post_process_whens(result, task, templar, task_vars): result['failed_when_result'] = result['failed'] = failed_when_result -def _get_item_vars(result, task): - item_vars = {} - if task.loop or task.loop_with: - loop_var = result.get('ansible_loop_var', 'item') - index_var = result.get('ansible_index_var') - if loop_var in result: - item_vars[loop_var] = result[loop_var] - if index_var and index_var in result: - item_vars[index_var] = result[index_var] - if '_ansible_item_label' in result: - item_vars['_ansible_item_label'] = result['_ansible_item_label'] - if 'ansible_loop' in result: - item_vars['ansible_loop'] = result['ansible_loop'] - return item_vars - - def results_thread_main(strategy): while True: try: @@ -685,7 +674,7 @@ class StrategyBase: self._inventory.add_dynamic_group(original_host, result_item) if 'add_host' in result_item or 'add_group' in result_item: - item_vars = _get_item_vars(result_item, original_task) + item_vars = get_loop_item_vars(result_item, original_task) found_task_vars = self._queued_task_cache.get((original_host.name, task_result._task._uuid))['task_vars'] if item_vars: all_task_vars = combine_vars(found_task_vars, item_vars) @@ -796,6 +785,103 @@ class StrategyBase: return ret_results + def _process_includes(self, results: list[TaskResult], iterator: PlayIterator) -> None: + if not ( + included_files := IncludedFile.process_include_results( + results=results, + iterator=iterator, + loader=self._loader, + variable_manager=self._variable_manager, + ) + ): + return + + display.debug("we have included files to process") + hosts_left = self.get_hosts_left(iterator) + all_blocks = collections.defaultdict(list) + included_tasks = [] + failed_includes_hosts = set() + for included_file in included_files: + display.debug("processing included file: %s" % included_file._filename) + is_handler = False + try: + if included_file._is_role: + new_ir = self._copy_included_file(included_file) + new_blocks, unused_var = new_ir.get_block_list( + play=iterator._play, + variable_manager=self._variable_manager, + loader=self._loader, + ) + else: + is_handler = isinstance(included_file._task, Handler) + new_blocks = self._load_included_file( + included_file, + iterator=iterator, + is_handler=is_handler, + handle_stats_and_callbacks=False, + ) + + # let PlayIterator know about any new handlers included via include_role or + # import_role within include_role/include_taks + iterator.handlers = [h for b in iterator._play.handlers for h in b.block] + + display.debug("iterating over new_blocks loaded from include file") + for new_block in new_blocks: + if is_handler: + for task in new_block.block: + task.notified_hosts = included_file._hosts[:] + final_block = new_block + else: + task_vars = self._variable_manager.get_vars( + play=iterator._play, + task=new_block.get_first_parent_include(), + _hosts=self._hosts_cache, + _hosts_all=self._hosts_cache_all, + ) + display.debug("filtering new block on tags") + final_block = new_block.filter_tagged_tasks(task_vars) + display.debug("done filtering new block on tags") + + included_tasks.extend(final_block.get_tasks()) + + for host in hosts_left: + if host in included_file._hosts: + all_blocks[host].append(final_block) + + display.debug("done iterating over new_blocks loaded from include file") + except AnsibleParserError: + raise + except AnsibleError as e: + display.error(to_text(e), wrap_text=False) + for r in included_file._results: + r._result['failed'] = True + r._result['reason'] = str(e) + self._tqm._stats.increment('failures', r._host.name) + self._tqm.send_callback('v2_runner_on_failed', r) + failed_includes_hosts.add(r._host) + else: + # since we skip incrementing the stats when the task result is + # first processed, we do so now for each host in the list + for host in included_file._hosts: + self._tqm._stats.increment('ok', host.name) + self._tqm.send_callback('v2_playbook_on_include', included_file) + + for host in failed_includes_hosts: + self._tqm._failed_hosts[host.name] = True + iterator.mark_host_failed(host) + + # finally go through all of the hosts and append the + # accumulated blocks to their list of tasks + display.debug("extending task lists for all hosts with included blocks") + + for host in hosts_left: + iterator.add_tasks(host, all_blocks[host]) + + iterator.all_tasks[iterator.cur_task:iterator.cur_task] = included_tasks + + display.debug("done extending task lists") + display.debug("done processing included files") + def _wait_on_pending_results(self, iterator): """ Wait for the shared counter to drop to zero, using a short sleep diff --git a/lib/ansible/plugins/strategy/free.py b/lib/ansible/plugins/strategy/free.py index 6fca97301bd..80dc7ea3207 100644 --- a/lib/ansible/plugins/strategy/free.py +++ b/lib/ansible/plugins/strategy/free.py @@ -32,9 +32,8 @@ DOCUMENTATION = """ import time from ansible import constants as C -from ansible.errors import AnsibleError, AnsibleParserError +from ansible.errors import AnsibleError from ansible.playbook.handler import Handler -from ansible.playbook.included_file import IncludedFile from ansible.plugins.loader import action_loader from ansible.plugins.strategy import StrategyBase from ansible.template import Templar @@ -102,7 +101,7 @@ class StrategyModule(StrategyBase): host_name = host.get_name() # peek at the next task for the host, to see if there's - # anything to do do for this host + # anything to do for this host (state, task) = iterator.get_next_task_for_host(host, peek=True) display.debug("free host state: %s" % state, host=host_name) display.debug("free host task: %s" % task, host=host_name) @@ -179,7 +178,7 @@ class StrategyModule(StrategyBase): self._execute_meta(task, play_context, iterator, target_host=host) self._blocked_hosts[host_name] = False else: - # handle step if needed, skip meta actions as they are used internally + # handle step if needed if not self._step or self._take_step(task, host_name): if task.any_errors_fatal: display.warning("Using any_errors_fatal with the free strategy is not supported, " @@ -219,91 +218,13 @@ class StrategyModule(StrategyBase): self.update_active_connections(results) - included_files = IncludedFile.process_include_results( - host_results, - iterator=iterator, - loader=self._loader, - variable_manager=self._variable_manager - ) - - if len(included_files) > 0: - all_blocks = dict((host, []) for host in hosts_left) - failed_includes_hosts = set() - for included_file in included_files: - display.debug("collecting new blocks for %s" % included_file) - is_handler = False - try: - if included_file._is_role: - new_ir = self._copy_included_file(included_file) - - new_blocks, handler_blocks = new_ir.get_block_list( - play=iterator._play, - variable_manager=self._variable_manager, - loader=self._loader, - ) - else: - is_handler = isinstance(included_file._task, Handler) - new_blocks = self._load_included_file( - included_file, - iterator=iterator, - is_handler=is_handler, - handle_stats_and_callbacks=False, - ) - - # let PlayIterator know about any new handlers included via include_role or - # import_role within include_role/include_taks - iterator.handlers = [h for b in iterator._play.handlers for h in b.block] - except AnsibleParserError: - raise - except AnsibleError as e: - display.error(to_text(e), wrap_text=False) - for r in included_file._results: - r._result['failed'] = True - r._result['reason'] = str(e) - self._tqm._stats.increment('failures', r._host.name) - self._tqm.send_callback('v2_runner_on_failed', r) - failed_includes_hosts.add(r._host) - continue - else: - # since we skip incrementing the stats when the task result is - # first processed, we do so now for each host in the list - for host in included_file._hosts: - self._tqm._stats.increment('ok', host.name) - self._tqm.send_callback('v2_playbook_on_include', included_file) - - for new_block in new_blocks: - if is_handler: - for task in new_block.block: - task.notified_hosts = included_file._hosts[:] - final_block = new_block - else: - task_vars = self._variable_manager.get_vars( - play=iterator._play, - task=new_block.get_first_parent_include(), - _hosts=self._hosts_cache, - _hosts_all=self._hosts_cache_all, - ) - final_block = new_block.filter_tagged_tasks(task_vars) - for host in hosts_left: - if host in included_file._hosts: - all_blocks[host].append(final_block) - display.debug("done collecting new blocks for %s" % included_file) - - for host in failed_includes_hosts: - self._tqm._failed_hosts[host.name] = True - iterator.mark_host_failed(host) - - display.debug("adding all collected blocks from %d included file(s) to iterator" % len(included_files)) - for host in hosts_left: - iterator.add_tasks(host, all_blocks[host]) - display.debug("done adding collected blocks to iterator") + self._process_includes(results, iterator) # pause briefly so we don't spin lock time.sleep(C.DEFAULT_INTERNAL_POLL_INTERVAL) # collect all the final results - results = self._wait_on_pending_results(iterator) + self._wait_on_pending_results(iterator) # run the base class run() method, which executes the cleanup function - # and runs any outstanding handlers which have been triggered return super(StrategyModule, self).run(iterator, play_context, result) diff --git a/lib/ansible/plugins/strategy/linear.py b/lib/ansible/plugins/strategy/linear.py index 372a05f0e1a..43b2742f1ca 100644 --- a/lib/ansible/plugins/strategy/linear.py +++ b/lib/ansible/plugins/strategy/linear.py @@ -30,10 +30,9 @@ DOCUMENTATION = """ """ from ansible import constants as C -from ansible.errors import AnsibleError, AnsibleAssertionError, AnsibleParserError +from ansible.errors import AnsibleAssertionError from ansible.module_utils.common.text.converters import to_text from ansible.playbook.handler import Handler -from ansible.playbook.included_file import IncludedFile from ansible.plugins.loader import action_loader from ansible.plugins.strategy import StrategyBase from ansible.template import Templar @@ -137,10 +136,6 @@ class StrategyModule(StrategyBase): templar = Templar(loader=self._loader, variables=task_vars) display.debug("done getting variables") - # test to see if the task across all hosts points to an action plugin which - # sets BYPASS_HOST_LOOP to true, or if it has run_once enabled. If so, we - # will only send this task to the first host in the list. - task_action = templar.template(task.action) try: @@ -159,7 +154,7 @@ class StrategyModule(StrategyBase): if (task.any_errors_fatal or run_once) and not task.ignore_errors: any_errors_fatal = True else: - # handle step if needed, skip meta actions as they are used internally + # handle step if needed if self._step and choose_step: if self._take_step(task): choose_step = False @@ -167,11 +162,14 @@ class StrategyModule(StrategyBase): skip_rest = True break + # test to see if the task across all hosts points to an action plugin which + # sets BYPASS_HOST_LOOP to true, or if it has run_once enabled. If so, we + # will only send this task to the first host in the list. run_once = action and getattr(action, 'BYPASS_HOST_LOOP', False) or templar.template(task.run_once) try: task.name = to_text(templar.template(task.name, fail_on_undefined=False), nonstring='empty') except Exception as e: - display.debug(f"Failed to templalte task name ({task.name}), ignoring error and continuing: {e}") + display.debug(f"Failed to template task name ({task.name}), ignoring error and continuing: {e}") if (task.any_errors_fatal or run_once) and not task.ignore_errors: any_errors_fatal = True @@ -206,108 +204,11 @@ class StrategyModule(StrategyBase): display.debug("done queuing things up, now waiting for results queue to drain") if self._pending_results > 0: results.extend(self._wait_on_pending_results(iterator)) - self.update_active_connections(results) - - included_files = IncludedFile.process_include_results( - results, - iterator=iterator, - loader=self._loader, - variable_manager=self._variable_manager - ) - - if len(included_files) > 0: - display.debug("we have included files to process") - - display.debug("generating all_blocks data") - all_blocks = dict((host, []) for host in hosts_left) - display.debug("done generating all_blocks data") - included_tasks = [] - failed_includes_hosts = set() - for included_file in included_files: - display.debug("processing included file: %s" % included_file._filename) - is_handler = False - try: - if included_file._is_role: - new_ir = self._copy_included_file(included_file) - - new_blocks, handler_blocks = new_ir.get_block_list( - play=iterator._play, - variable_manager=self._variable_manager, - loader=self._loader, - ) - else: - is_handler = isinstance(included_file._task, Handler) - new_blocks = self._load_included_file( - included_file, - iterator=iterator, - is_handler=is_handler, - handle_stats_and_callbacks=False, - ) - - # let PlayIterator know about any new handlers included via include_role or - # import_role within include_role/include_taks - iterator.handlers = [h for b in iterator._play.handlers for h in b.block] - - display.debug("iterating over new_blocks loaded from include file") - for new_block in new_blocks: - if is_handler: - for task in new_block.block: - task.notified_hosts = included_file._hosts[:] - final_block = new_block - else: - task_vars = self._variable_manager.get_vars( - play=iterator._play, - task=new_block.get_first_parent_include(), - _hosts=self._hosts_cache, - _hosts_all=self._hosts_cache_all, - ) - display.debug("filtering new block on tags") - final_block = new_block.filter_tagged_tasks(task_vars) - display.debug("done filtering new block on tags") - - included_tasks.extend(final_block.get_tasks()) - - for host in hosts_left: - if host in included_file._hosts: - all_blocks[host].append(final_block) - - display.debug("done iterating over new_blocks loaded from include file") - except AnsibleParserError: - raise - except AnsibleError as e: - display.error(to_text(e), wrap_text=False) - for r in included_file._results: - r._result['failed'] = True - r._result['reason'] = str(e) - self._tqm._stats.increment('failures', r._host.name) - self._tqm.send_callback('v2_runner_on_failed', r) - failed_includes_hosts.add(r._host) - else: - # since we skip incrementing the stats when the task result is - # first processed, we do so now for each host in the list - for host in included_file._hosts: - self._tqm._stats.increment('ok', host.name) - self._tqm.send_callback('v2_playbook_on_include', included_file) - - for host in failed_includes_hosts: - self._tqm._failed_hosts[host.name] = True - iterator.mark_host_failed(host) - - # finally go through all of the hosts and append the - # accumulated blocks to their list of tasks - display.debug("extending task lists for all hosts with included blocks") - - for host in hosts_left: - iterator.add_tasks(host, all_blocks[host]) - - iterator.all_tasks[iterator.cur_task:iterator.cur_task] = included_tasks - - display.debug("done extending task lists") - display.debug("done processing included files") - display.debug("results queue empty") + self._process_includes(results, iterator) + display.debug("checking for any_errors_fatal") failed_hosts = [] unreachable_hosts = [] @@ -353,6 +254,4 @@ class StrategyModule(StrategyBase): return self._tqm.RUN_UNKNOWN_ERROR # run the base class run() method, which executes the cleanup function - # and runs any outstanding handlers which have been triggered - return super(StrategyModule, self).run(iterator, play_context, result) diff --git a/lib/ansible/utils/vars.py b/lib/ansible/utils/vars.py index 50ddf42e7f8..b207bb216df 100644 --- a/lib/ansible/utils/vars.py +++ b/lib/ansible/utils/vars.py @@ -264,3 +264,21 @@ def isidentifier(ident): return False return True + + +def get_loop_item_vars(result, task): + item_vars = {} + if task.loop or task.loop_with: + loop_var = result.get('ansible_loop_var', 'item') + index_var = result.get('ansible_index_var') + if loop_var in result: + item_vars[loop_var] = result[loop_var] + item_vars['ansible_loop_var'] = loop_var + if index_var and index_var in result: + item_vars[index_var] = result[index_var] + item_vars['ansible_index_var'] = index_var + if '_ansible_item_label' in result: + item_vars['_ansible_item_label'] = result['_ansible_item_label'] + if 'ansible_loop' in result: + item_vars['ansible_loop'] = result['ansible_loop'] + return item_vars