Move processing includes into StrategyBase

ci_complete
pull/84040/head
Martin Krizek 2 months ago
parent c6e166319a
commit f24a6a93cb

@ -28,6 +28,7 @@ from ansible.playbook.task_include import TaskInclude
from ansible.playbook.role_include import IncludeRole
from ansible.template import Templar
from ansible.utils.display import Display
from ansible.utils.vars import get_loop_item_vars
display = Display()
@ -65,22 +66,19 @@ class IncludedFile:
task_vars_cache = {}
for res in results:
original_host = res._host
original_task = res._task
if original_task.action in C._ACTION_ALL_INCLUDES:
if original_task.action not in C._ACTION_ALL_INCLUDES:
continue
if original_task.loop:
if 'results' not in res._result:
continue
include_results = res._result['results']
include_results = res._result.get('results', [])
else:
include_results = [res._result]
for include_result in include_results:
# if the task result was skipped or failed, continue
if 'skipped' in include_result and include_result['skipped'] or 'failed' in include_result and include_result['failed']:
if include_result.get('skipped') or include_result.get('failed'):
continue
cache_key = (iterator._play, original_host, original_task)
@ -89,20 +87,10 @@ class IncludedFile:
except KeyError:
task_vars = task_vars_cache[cache_key] = variable_manager.get_vars(play=iterator._play, host=original_host, task=original_task)
special_vars = get_loop_item_vars(include_result, original_task)
task_vars.update(special_vars)
include_args = include_result.pop('include_args', dict())
special_vars = {}
loop_var = include_result.get('ansible_loop_var', 'item')
index_var = include_result.get('ansible_index_var')
if loop_var in include_result:
task_vars[loop_var] = special_vars[loop_var] = include_result[loop_var]
task_vars['ansible_loop_var'] = special_vars['ansible_loop_var'] = loop_var
if index_var and index_var in include_result:
task_vars[index_var] = special_vars[index_var] = include_result[index_var]
task_vars['ansible_index_var'] = special_vars['ansible_index_var'] = index_var
if '_ansible_item_label' in include_result:
task_vars['_ansible_item_label'] = special_vars['_ansible_item_label'] = include_result['_ansible_item_label']
if 'ansible_loop' in include_result:
task_vars['ansible_loop'] = special_vars['ansible_loop'] = include_result['ansible_loop']
if original_task.no_log and '_ansible_no_log' not in include_args:
task_vars['_ansible_no_log'] = special_vars['_ansible_no_log'] = original_task.no_log
@ -195,8 +183,7 @@ class IncludedFile:
from_key = from_arg.removesuffix('_from')
new_task._from_files[from_key] = templar.template(include_args.pop(from_arg))
omit_token = task_vars.get('omit')
if omit_token:
if omit_token := task_vars.get('omit'):
new_task._from_files = remove_omit(new_task._from_files, omit_token)
inc_file = IncludedFile(role_name, include_args, special_vars, new_task, is_role=True)

@ -18,6 +18,7 @@
from __future__ import annotations
import cmd
import collections
import functools
import os
import pprint
@ -36,7 +37,7 @@ from ansible import constants as C
from ansible import context
from ansible.errors import AnsibleError, AnsibleFileNotFound, AnsibleUndefinedVariable, AnsibleParserError
from ansible.executor import action_write_locks
from ansible.executor.play_iterator import IteratingStates, PlayIterator
from ansible.executor.play_iterator import IteratingStates
from ansible.executor.process.worker import WorkerProcess
from ansible.executor.task_result import TaskResult
from ansible.executor.task_queue_manager import CallbackSend, DisplaySend, PromptSend
@ -47,6 +48,7 @@ from ansible.module_utils.connection import Connection, ConnectionError
from ansible.playbook.conditional import Conditional
from ansible.playbook.handler import Handler
from ansible.playbook.helpers import load_list_of_blocks
from ansible.playbook.included_file import IncludedFile
from ansible.playbook.task import Task
from ansible.playbook.task_include import TaskInclude
from ansible.plugins import loader as plugin_loader
@ -54,9 +56,12 @@ from ansible.template import Templar
from ansible.utils.display import Display
from ansible.utils.fqcn import add_internal_fqcns
from ansible.utils.unsafe_proxy import wrap_var
from ansible.utils.vars import combine_vars
from ansible.utils.vars import combine_vars, get_loop_item_vars
from ansible.vars.clean import strip_internal_keys, module_response_deepcopy
if t.TYPE_CHECKING:
from ansible.executor.play_iterator import PlayIterator
display = Display()
__all__ = ['StrategyBase']
@ -92,22 +97,6 @@ def post_process_whens(result, task, templar, task_vars):
result['failed_when_result'] = result['failed'] = failed_when_result
def _get_item_vars(result, task):
item_vars = {}
if task.loop or task.loop_with:
loop_var = result.get('ansible_loop_var', 'item')
index_var = result.get('ansible_index_var')
if loop_var in result:
item_vars[loop_var] = result[loop_var]
if index_var and index_var in result:
item_vars[index_var] = result[index_var]
if '_ansible_item_label' in result:
item_vars['_ansible_item_label'] = result['_ansible_item_label']
if 'ansible_loop' in result:
item_vars['ansible_loop'] = result['ansible_loop']
return item_vars
def results_thread_main(strategy):
while True:
try:
@ -685,7 +674,7 @@ class StrategyBase:
self._inventory.add_dynamic_group(original_host, result_item)
if 'add_host' in result_item or 'add_group' in result_item:
item_vars = _get_item_vars(result_item, original_task)
item_vars = get_loop_item_vars(result_item, original_task)
found_task_vars = self._queued_task_cache.get((original_host.name, task_result._task._uuid))['task_vars']
if item_vars:
all_task_vars = combine_vars(found_task_vars, item_vars)
@ -796,6 +785,103 @@ class StrategyBase:
return ret_results
def _process_includes(self, results: list[TaskResult], iterator: PlayIterator) -> None:
if not (
included_files := IncludedFile.process_include_results(
results=results,
iterator=iterator,
loader=self._loader,
variable_manager=self._variable_manager,
)
):
return
display.debug("we have included files to process")
hosts_left = self.get_hosts_left(iterator)
all_blocks = collections.defaultdict(list)
included_tasks = []
failed_includes_hosts = set()
for included_file in included_files:
display.debug("processing included file: %s" % included_file._filename)
is_handler = False
try:
if included_file._is_role:
new_ir = self._copy_included_file(included_file)
new_blocks, unused_var = new_ir.get_block_list(
play=iterator._play,
variable_manager=self._variable_manager,
loader=self._loader,
)
else:
is_handler = isinstance(included_file._task, Handler)
new_blocks = self._load_included_file(
included_file,
iterator=iterator,
is_handler=is_handler,
handle_stats_and_callbacks=False,
)
# let PlayIterator know about any new handlers included via include_role or
# import_role within include_role/include_taks
iterator.handlers = [h for b in iterator._play.handlers for h in b.block]
display.debug("iterating over new_blocks loaded from include file")
for new_block in new_blocks:
if is_handler:
for task in new_block.block:
task.notified_hosts = included_file._hosts[:]
final_block = new_block
else:
task_vars = self._variable_manager.get_vars(
play=iterator._play,
task=new_block.get_first_parent_include(),
_hosts=self._hosts_cache,
_hosts_all=self._hosts_cache_all,
)
display.debug("filtering new block on tags")
final_block = new_block.filter_tagged_tasks(task_vars)
display.debug("done filtering new block on tags")
included_tasks.extend(final_block.get_tasks())
for host in hosts_left:
if host in included_file._hosts:
all_blocks[host].append(final_block)
display.debug("done iterating over new_blocks loaded from include file")
except AnsibleParserError:
raise
except AnsibleError as e:
display.error(to_text(e), wrap_text=False)
for r in included_file._results:
r._result['failed'] = True
r._result['reason'] = str(e)
self._tqm._stats.increment('failures', r._host.name)
self._tqm.send_callback('v2_runner_on_failed', r)
failed_includes_hosts.add(r._host)
else:
# since we skip incrementing the stats when the task result is
# first processed, we do so now for each host in the list
for host in included_file._hosts:
self._tqm._stats.increment('ok', host.name)
self._tqm.send_callback('v2_playbook_on_include', included_file)
for host in failed_includes_hosts:
self._tqm._failed_hosts[host.name] = True
iterator.mark_host_failed(host)
# finally go through all of the hosts and append the
# accumulated blocks to their list of tasks
display.debug("extending task lists for all hosts with included blocks")
for host in hosts_left:
iterator.add_tasks(host, all_blocks[host])
iterator.all_tasks[iterator.cur_task:iterator.cur_task] = included_tasks
display.debug("done extending task lists")
display.debug("done processing included files")
def _wait_on_pending_results(self, iterator):
"""
Wait for the shared counter to drop to zero, using a short sleep

@ -32,9 +32,8 @@ DOCUMENTATION = """
import time
from ansible import constants as C
from ansible.errors import AnsibleError, AnsibleParserError
from ansible.errors import AnsibleError
from ansible.playbook.handler import Handler
from ansible.playbook.included_file import IncludedFile
from ansible.plugins.loader import action_loader
from ansible.plugins.strategy import StrategyBase
from ansible.template import Templar
@ -102,7 +101,7 @@ class StrategyModule(StrategyBase):
host_name = host.get_name()
# peek at the next task for the host, to see if there's
# anything to do do for this host
# anything to do for this host
(state, task) = iterator.get_next_task_for_host(host, peek=True)
display.debug("free host state: %s" % state, host=host_name)
display.debug("free host task: %s" % task, host=host_name)
@ -179,7 +178,7 @@ class StrategyModule(StrategyBase):
self._execute_meta(task, play_context, iterator, target_host=host)
self._blocked_hosts[host_name] = False
else:
# handle step if needed, skip meta actions as they are used internally
# handle step if needed
if not self._step or self._take_step(task, host_name):
if task.any_errors_fatal:
display.warning("Using any_errors_fatal with the free strategy is not supported, "
@ -219,91 +218,13 @@ class StrategyModule(StrategyBase):
self.update_active_connections(results)
included_files = IncludedFile.process_include_results(
host_results,
iterator=iterator,
loader=self._loader,
variable_manager=self._variable_manager
)
if len(included_files) > 0:
all_blocks = dict((host, []) for host in hosts_left)
failed_includes_hosts = set()
for included_file in included_files:
display.debug("collecting new blocks for %s" % included_file)
is_handler = False
try:
if included_file._is_role:
new_ir = self._copy_included_file(included_file)
new_blocks, handler_blocks = new_ir.get_block_list(
play=iterator._play,
variable_manager=self._variable_manager,
loader=self._loader,
)
else:
is_handler = isinstance(included_file._task, Handler)
new_blocks = self._load_included_file(
included_file,
iterator=iterator,
is_handler=is_handler,
handle_stats_and_callbacks=False,
)
# let PlayIterator know about any new handlers included via include_role or
# import_role within include_role/include_taks
iterator.handlers = [h for b in iterator._play.handlers for h in b.block]
except AnsibleParserError:
raise
except AnsibleError as e:
display.error(to_text(e), wrap_text=False)
for r in included_file._results:
r._result['failed'] = True
r._result['reason'] = str(e)
self._tqm._stats.increment('failures', r._host.name)
self._tqm.send_callback('v2_runner_on_failed', r)
failed_includes_hosts.add(r._host)
continue
else:
# since we skip incrementing the stats when the task result is
# first processed, we do so now for each host in the list
for host in included_file._hosts:
self._tqm._stats.increment('ok', host.name)
self._tqm.send_callback('v2_playbook_on_include', included_file)
for new_block in new_blocks:
if is_handler:
for task in new_block.block:
task.notified_hosts = included_file._hosts[:]
final_block = new_block
else:
task_vars = self._variable_manager.get_vars(
play=iterator._play,
task=new_block.get_first_parent_include(),
_hosts=self._hosts_cache,
_hosts_all=self._hosts_cache_all,
)
final_block = new_block.filter_tagged_tasks(task_vars)
for host in hosts_left:
if host in included_file._hosts:
all_blocks[host].append(final_block)
display.debug("done collecting new blocks for %s" % included_file)
for host in failed_includes_hosts:
self._tqm._failed_hosts[host.name] = True
iterator.mark_host_failed(host)
display.debug("adding all collected blocks from %d included file(s) to iterator" % len(included_files))
for host in hosts_left:
iterator.add_tasks(host, all_blocks[host])
display.debug("done adding collected blocks to iterator")
self._process_includes(results, iterator)
# pause briefly so we don't spin lock
time.sleep(C.DEFAULT_INTERNAL_POLL_INTERVAL)
# collect all the final results
results = self._wait_on_pending_results(iterator)
self._wait_on_pending_results(iterator)
# run the base class run() method, which executes the cleanup function
# and runs any outstanding handlers which have been triggered
return super(StrategyModule, self).run(iterator, play_context, result)

@ -30,10 +30,9 @@ DOCUMENTATION = """
"""
from ansible import constants as C
from ansible.errors import AnsibleError, AnsibleAssertionError, AnsibleParserError
from ansible.errors import AnsibleAssertionError
from ansible.module_utils.common.text.converters import to_text
from ansible.playbook.handler import Handler
from ansible.playbook.included_file import IncludedFile
from ansible.plugins.loader import action_loader
from ansible.plugins.strategy import StrategyBase
from ansible.template import Templar
@ -137,10 +136,6 @@ class StrategyModule(StrategyBase):
templar = Templar(loader=self._loader, variables=task_vars)
display.debug("done getting variables")
# test to see if the task across all hosts points to an action plugin which
# sets BYPASS_HOST_LOOP to true, or if it has run_once enabled. If so, we
# will only send this task to the first host in the list.
task_action = templar.template(task.action)
try:
@ -159,7 +154,7 @@ class StrategyModule(StrategyBase):
if (task.any_errors_fatal or run_once) and not task.ignore_errors:
any_errors_fatal = True
else:
# handle step if needed, skip meta actions as they are used internally
# handle step if needed
if self._step and choose_step:
if self._take_step(task):
choose_step = False
@ -167,11 +162,14 @@ class StrategyModule(StrategyBase):
skip_rest = True
break
# test to see if the task across all hosts points to an action plugin which
# sets BYPASS_HOST_LOOP to true, or if it has run_once enabled. If so, we
# will only send this task to the first host in the list.
run_once = action and getattr(action, 'BYPASS_HOST_LOOP', False) or templar.template(task.run_once)
try:
task.name = to_text(templar.template(task.name, fail_on_undefined=False), nonstring='empty')
except Exception as e:
display.debug(f"Failed to templalte task name ({task.name}), ignoring error and continuing: {e}")
display.debug(f"Failed to template task name ({task.name}), ignoring error and continuing: {e}")
if (task.any_errors_fatal or run_once) and not task.ignore_errors:
any_errors_fatal = True
@ -206,108 +204,11 @@ class StrategyModule(StrategyBase):
display.debug("done queuing things up, now waiting for results queue to drain")
if self._pending_results > 0:
results.extend(self._wait_on_pending_results(iterator))
self.update_active_connections(results)
included_files = IncludedFile.process_include_results(
results,
iterator=iterator,
loader=self._loader,
variable_manager=self._variable_manager
)
if len(included_files) > 0:
display.debug("we have included files to process")
display.debug("generating all_blocks data")
all_blocks = dict((host, []) for host in hosts_left)
display.debug("done generating all_blocks data")
included_tasks = []
failed_includes_hosts = set()
for included_file in included_files:
display.debug("processing included file: %s" % included_file._filename)
is_handler = False
try:
if included_file._is_role:
new_ir = self._copy_included_file(included_file)
new_blocks, handler_blocks = new_ir.get_block_list(
play=iterator._play,
variable_manager=self._variable_manager,
loader=self._loader,
)
else:
is_handler = isinstance(included_file._task, Handler)
new_blocks = self._load_included_file(
included_file,
iterator=iterator,
is_handler=is_handler,
handle_stats_and_callbacks=False,
)
# let PlayIterator know about any new handlers included via include_role or
# import_role within include_role/include_taks
iterator.handlers = [h for b in iterator._play.handlers for h in b.block]
display.debug("iterating over new_blocks loaded from include file")
for new_block in new_blocks:
if is_handler:
for task in new_block.block:
task.notified_hosts = included_file._hosts[:]
final_block = new_block
else:
task_vars = self._variable_manager.get_vars(
play=iterator._play,
task=new_block.get_first_parent_include(),
_hosts=self._hosts_cache,
_hosts_all=self._hosts_cache_all,
)
display.debug("filtering new block on tags")
final_block = new_block.filter_tagged_tasks(task_vars)
display.debug("done filtering new block on tags")
included_tasks.extend(final_block.get_tasks())
for host in hosts_left:
if host in included_file._hosts:
all_blocks[host].append(final_block)
display.debug("done iterating over new_blocks loaded from include file")
except AnsibleParserError:
raise
except AnsibleError as e:
display.error(to_text(e), wrap_text=False)
for r in included_file._results:
r._result['failed'] = True
r._result['reason'] = str(e)
self._tqm._stats.increment('failures', r._host.name)
self._tqm.send_callback('v2_runner_on_failed', r)
failed_includes_hosts.add(r._host)
else:
# since we skip incrementing the stats when the task result is
# first processed, we do so now for each host in the list
for host in included_file._hosts:
self._tqm._stats.increment('ok', host.name)
self._tqm.send_callback('v2_playbook_on_include', included_file)
for host in failed_includes_hosts:
self._tqm._failed_hosts[host.name] = True
iterator.mark_host_failed(host)
# finally go through all of the hosts and append the
# accumulated blocks to their list of tasks
display.debug("extending task lists for all hosts with included blocks")
for host in hosts_left:
iterator.add_tasks(host, all_blocks[host])
iterator.all_tasks[iterator.cur_task:iterator.cur_task] = included_tasks
display.debug("done extending task lists")
display.debug("done processing included files")
display.debug("results queue empty")
self._process_includes(results, iterator)
display.debug("checking for any_errors_fatal")
failed_hosts = []
unreachable_hosts = []
@ -353,6 +254,4 @@ class StrategyModule(StrategyBase):
return self._tqm.RUN_UNKNOWN_ERROR
# run the base class run() method, which executes the cleanup function
# and runs any outstanding handlers which have been triggered
return super(StrategyModule, self).run(iterator, play_context, result)

@ -264,3 +264,21 @@ def isidentifier(ident):
return False
return True
def get_loop_item_vars(result, task):
item_vars = {}
if task.loop or task.loop_with:
loop_var = result.get('ansible_loop_var', 'item')
index_var = result.get('ansible_index_var')
if loop_var in result:
item_vars[loop_var] = result[loop_var]
item_vars['ansible_loop_var'] = loop_var
if index_var and index_var in result:
item_vars[index_var] = result[index_var]
item_vars['ansible_index_var'] = index_var
if '_ansible_item_label' in result:
item_vars['_ansible_item_label'] = result['_ansible_item_label']
if 'ansible_loop' in result:
item_vars['ansible_loop'] = result['ansible_loop']
return item_vars

Loading…
Cancel
Save