From 6fcbed0f3f483324dfec76e9eb5b7297dc7a6993 Mon Sep 17 00:00:00 2001 From: Martin Krizek Date: Wed, 7 Jun 2023 18:14:07 +0200 Subject: [PATCH] Simplify conditional.py (#80584) * Simplify conditional.py --- .../CleansingNodeVisitor-removal.yml | 2 + lib/ansible/playbook/conditional.py | 147 ++++-------------- 2 files changed, 36 insertions(+), 113 deletions(-) create mode 100644 changelogs/fragments/CleansingNodeVisitor-removal.yml diff --git a/changelogs/fragments/CleansingNodeVisitor-removal.yml b/changelogs/fragments/CleansingNodeVisitor-removal.yml new file mode 100644 index 00000000000..5214e572e97 --- /dev/null +++ b/changelogs/fragments/CleansingNodeVisitor-removal.yml @@ -0,0 +1,2 @@ +minor_changes: + - Remove the ``CleansingNodeVisitor`` class and its usage due to the templating changes that made it superfluous. Also simplify the ``Conditional`` class. diff --git a/lib/ansible/playbook/conditional.py b/lib/ansible/playbook/conditional.py index 163f9129c94..bf2419e4b0f 100644 --- a/lib/ansible/playbook/conditional.py +++ b/lib/ansible/playbook/conditional.py @@ -19,14 +19,9 @@ from __future__ import (absolute_import, division, print_function) __metaclass__ = type -import ast import typing as t -from jinja2.compiler import generate -from jinja2.exceptions import UndefinedError - from ansible.errors import AnsibleError, AnsibleUndefinedVariable -from ansible.module_utils.six import text_type from ansible.module_utils.common.text.converters import to_native from ansible.playbook.attribute import FieldAttribute from ansible.template import Templar @@ -36,7 +31,6 @@ display = Display() class Conditional: - ''' This is a mix-in class, to be used with Base to allow the object to be run conditionally when a condition is met or skipped. @@ -53,7 +47,7 @@ class Conditional: raise AnsibleError("a loader must be specified when using Conditional() directly") else: self._loader = loader - super(Conditional, self).__init__() + super().__init__() def _validate_when(self, attr, name, value): if not isinstance(value, list): @@ -67,123 +61,50 @@ class Conditional: return self.evaluate_conditional_with_result(templar, all_vars)[0] def evaluate_conditional_with_result(self, templar: Templar, all_vars: dict[str, t.Any]) -> tuple[bool, t.Optional[str]]: - """ - Loops through the conditionals set on this object, returning + """Loops through the conditionals set on this object, returning False if any of them evaluate as such as well as the condition that was false. """ - - # since this is a mix-in, it may not have an underlying datastructure - # associated with it, so we pull it out now in case we need it for - # error reporting below - ds = None - if hasattr(self, '_ds'): - ds = getattr(self, '_ds') - - result = True - false_condition: t.Optional[str] = None - try: - for conditional in self.when: - - # do evaluation - if conditional is None or conditional == '': - res = True - elif isinstance(conditional, bool): - res = conditional - else: + for conditional in self.when: + if conditional is None or conditional == "": + res = True + elif isinstance(conditional, bool): + res = conditional + else: + try: res = self._check_conditional(conditional, templar, all_vars) + except AnsibleError as e: + raise AnsibleError( + "The conditional check '%s' failed. The error was: %s" % (to_native(conditional), to_native(e)), + obj=getattr(self, '_ds', None) + ) - # only update if still true, preserve false - if result: - result = res - - display.debug("Evaluated conditional (%s): %s" % (conditional, res)) - if not result: - false_condition = conditional - break - - except Exception as e: - raise AnsibleError("The conditional check '%s' failed. The error was: %s" % (to_native(conditional), to_native(e)), obj=ds) + display.debug("Evaluated conditional (%s): %s" % (conditional, res)) + if not res: + return res, conditional - return result, false_condition - - def _check_conditional(self, conditional, templar, all_vars): - ''' - This method does the low-level evaluation of each conditional - set on this object, using jinja2 to wrap the conditionals for - evaluation. - ''' + return True, None + def _check_conditional(self, conditional: str, templar: Templar, all_vars: dict[str, t.Any]) -> bool: original = conditional - - if templar.is_template(conditional): - display.warning('conditional statements should not include jinja2 ' - 'templating delimiters such as {{ }} or {%% %%}. ' - 'Found: %s' % conditional) - - # make sure the templar is using the variables specified with this method templar.available_variables = all_vars - try: - # if the conditional is "unsafe", disable lookups - disable_lookups = hasattr(conditional, '__UNSAFE__') - conditional = templar.template(conditional, disable_lookups=disable_lookups) + if templar.is_template(conditional): + display.warning( + "conditional statements should not include jinja2 " + "templating delimiters such as {{ }} or {%% %%}. " + "Found: %s" % conditional + ) + conditional = templar.template(conditional) + if isinstance(conditional, bool): + return conditional + elif conditional == "": + return False - if not isinstance(conditional, text_type) or conditional == "": - return conditional - - # update the lookups flag, as the string returned above may now be unsafe - # and we don't want future templating calls to do unsafe things - disable_lookups |= hasattr(conditional, '__UNSAFE__') - - # First, we do some low-level jinja2 parsing involving the AST format of the - # statement to ensure we don't do anything unsafe (using the disable_lookup flag above) - class CleansingNodeVisitor(ast.NodeVisitor): - def generic_visit(self, node, inside_call=False, inside_yield=False): - if isinstance(node, ast.Call): - inside_call = True - elif isinstance(node, ast.Yield): - inside_yield = True - elif isinstance(node, ast.Constant) and isinstance(node.value, text_type): - if disable_lookups: - if inside_call and node.value.startswith("__"): - # calling things with a dunder is generally bad at this point... - raise AnsibleError( - "Invalid access found in the conditional: '%s'" % conditional - ) - elif inside_yield: - # we're inside a yield, so recursively parse and traverse the AST - # of the result to catch forbidden syntax from executing - parsed = ast.parse(node.value, mode='exec') - cnv = CleansingNodeVisitor() - cnv.visit(parsed) - # iterate over all child nodes - for child_node in ast.iter_child_nodes(node): - self.generic_visit( - child_node, - inside_call=inside_call, - inside_yield=inside_yield - ) - try: - res = templar.environment.parse(conditional, None, None) - res = generate(res, templar.environment, None, None) - parsed = ast.parse(res, mode='exec') - - cnv = CleansingNodeVisitor() - cnv.visit(parsed) - except Exception as e: - raise AnsibleError("Invalid conditional detected: %s" % to_native(e)) - - # and finally we generate and template the presented string and look at the resulting string # NOTE The spaces around True and False are intentional to short-circuit literal_eval for # jinja2_native=False and avoid its expensive calls. - presented = "{%% if %s %%} True {%% else %%} False {%% endif %%}" % conditional - val = templar.template(presented, disable_lookups=disable_lookups).strip() - if val == "True": - return True - elif val == "False": - return False - else: - raise AnsibleError("unable to evaluate conditional: %s" % original) - except (AnsibleUndefinedVariable, UndefinedError) as e: + return templar.template( + "{%% if %s %%} True {%% else %%} False {%% endif %%}" % conditional + ).strip() == "True" + except AnsibleUndefinedVariable as e: raise AnsibleUndefinedVariable("error while evaluating conditional (%s): %s" % (original, e))