You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
ansible/test/units/_internal/templating/test_templar.py

1083 lines
46 KiB
Python

# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations
import itertools
import pathlib
import sys
import unittest.mock
import typing as t
import pytest_mock
from jinja2.runtime import Context
import unittest
from ansible._internal._templating._datatag import _JinjaConstTemplate
from ansible.errors import (
AnsibleError, AnsibleUndefinedVariable, AnsibleTemplateSyntaxError, AnsibleBrokenConditionalError, AnsibleTemplateError, AnsibleTemplateTransformLimitError,
TemplateTrustCheckFailedError,
)
from ansible._internal._templating._errors import AnsibleTemplatePluginRuntimeError, AnsibleTemplatePluginLoadError, AnsibleTemplatePluginNotFoundError
from ansible._internal._errors._handler import ErrorAction, ErrorHandler
from ansible.module_utils._internal._datatag import AnsibleTagHelper, AnsibleDatatagBase
from ansible.module_utils._internal._datatag._tags import Deprecated
from ansible._internal._templating import _transform
from ansible.utils.collection_loader._collection_finder import _AnsibleCollectionFinder
from ansible._internal._datatag._tags import Origin, TrustedAsTemplate
from ansible.plugins.loader import init_plugin_loader
from ansible._internal._templating._jinja_common import _TemplateConfig, _SandboxMode
from ansible._internal._templating._jinja_plugins import _lookup
from ansible._internal._templating import _jinja_plugins
from ansible._internal._templating._engine import TemplateEngine, TemplateOptions
from ansible._internal._templating._jinja_bits import AnsibleEnvironment, AnsibleContext, is_possibly_template, is_possibly_all_template
from ansible._internal._templating._marker_behaviors import ReplacingMarkerBehavior
from ansible._internal._templating._utils import TemplateContext
from ansible.module_utils._internal import _event_utils
from ansible.utils.display import Display, _DeferredWarningContext
from units.mock.loader import DictDataLoader
from units.test_utils.controller.display import emits_warnings
import pytest
# Shared trust tag instance; tests call `TRUST.tag(...)` to mark template/expression inputs as trusted.
TRUST = TrustedAsTemplate()
# Sample origin tag (fixed path, line and column) applied to keys/values by the tagged dict round-trip test.
origin = Origin(path="/some/path/for/testing", line_num=1, col_num=2)
class BaseTemplar:
    """Common fixture for the templar test cases: builds a `TemplateEngine` over a fixed set of variables."""

    def setUp(self):
        """Initialize the plugin loader, then create the variables, fake loader, engine and context used by the tests."""
        init_plugin_loader()

        def trusted(text):
            # each trusted entry is tagged with its own freshly created tag instance
            return TrustedAsTemplate().tag(text)

        self.test_vars = {
            "foo": "bar",
            "bam": trusted("{{foo}}"),
            "num": 1,
            "var_true": True,
            "var_false": False,
            "var_dict": {"a": "b"},
            "bad_dict": "{a='b'",
            "var_list": [1],
            "recursive": trusted("{{recursive}}"),
            "some_var": "blip",
            "some_keyword": trusted("{{ foo }}"),
            "some_unsafe_var": "unsafe_blip",
            "some_unsafe_keyword": trusted("{{ foo }}"),
            "str_with_error": trusted("{{ 'str' | from_json }}"),
            "template_dict": {trusted("{{ a_keyword }}"): trusted("{{ some_var }}")},
            "template_var": trusted('{{ some_var }}'),
        }

        self.fake_loader = DictDataLoader({"/path/to/my_file.txt": "foo\n"})
        self.templar = TemplateEngine(loader=self.fake_loader, variables=self.test_vars)
        self._ansible_context = AnsibleContext(self.templar.environment, {}, {}, {})

    def tearDown(self):
        """Undo global state: remove the collection finder and invoke `nuke_module_prefix` for `ansible_collections`."""
        _AnsibleCollectionFinder._remove()
        nuke_module_prefix('ansible_collections')
class TestTemplarTemplate(BaseTemplar, unittest.TestCase):
    """Exercises `TemplateEngine.template` and `is_template` with trusted and untrusted inputs."""

    def test_trust_fail_raises_in_tests(self):
        """Ensure template trust check failures default to fatal for unit tests (set in units/conftest.py)"""
        from ansible._internal._templating._engine import TemplateTrustCheckFailedError

        handler = _TemplateConfig.untrusted_template_handler
        assert handler.action is ErrorAction.ERROR

        with pytest.raises(TemplateTrustCheckFailedError):
            self.templar.template("{{ i_am_not_trusted }}")

    def test_trust_fail_warning_behavior(self):
        """Validate that trust checks are non-fatal when TemplateConfig.untrusted_template_handler is set to `ErrorAction.WARNING`."""
        untrusted_template = "{{ i_am_not_trusted }}"

        assert hasattr(_TemplateConfig, 'untrusted_template_handler')

        handler_patch = unittest.mock.patch.object(_TemplateConfig, 'untrusted_template_handler', ErrorHandler(ErrorAction.WARNING))
        display_patch = unittest.mock.patch.object(Display, 'error_as_warning', return_value=None)

        with handler_patch, display_patch as mock_warning:
            # the untrusted input must come back as the very same object
            assert self.templar.template(untrusted_template) is untrusted_template

        assert mock_warning.call_count > 0

        reported = mock_warning.call_args.kwargs['exception']

        assert isinstance(reported, TemplateTrustCheckFailedError)
        assert "Encountered untrusted template or expression" in reported.message
        assert reported.obj == untrusted_template

    def test_is_possible_template(self):
        """This test ensures that a broken template still gets templated"""
        # Purposefully invalid jinja
        broken = TrustedAsTemplate().tag('{{ foo|default(False)) }}')

        with self.assertRaises(AnsibleError):
            self.templar.template(broken)

    def test_is_template_raw_string(self):
        """A plain string is not reported as a template."""
        self.assertFalse(self.templar.is_template('foo'))

    def test_is_template_none(self):
        """`None` is not reported as a template."""
        self.assertFalse(self.templar.is_template(None))

    def test_template(self):
        """A trusted variable reference renders to the variable's value."""
        rendered = self.templar.template(TrustedAsTemplate().tag('{{foo}}'))

        self.assertTrue(rendered)
        self.assertEqual(rendered, 'bar')

    def test_template_in_data(self):
        """A variable whose value is itself a trusted template is rendered through to the final value."""
        rendered = self.templar.template(TrustedAsTemplate().tag('{{bam}}'))

        self.assertTrue(rendered)
        self.assertEqual(rendered, 'bar')

    def test_template_bare(self):
        """An untagged bare word is returned unchanged."""
        rendered = self.templar.template('bam')

        self.assertTrue(rendered)
        self.assertEqual(rendered, 'bam')

    def test_template_to_json(self):
        """The `to_json` filter is applied to the rendered variable."""
        rendered = self.templar.template(TrustedAsTemplate().tag('{{bam|to_json}}'))

        self.assertTrue(rendered)
        self.assertEqual(rendered, '"bar"')

    def test_template_untagged_string(self):
        """Templating an untagged string yields a result without the trust tag."""
        plain_text = "Hello"
        rendered = self.templar.template(plain_text)

        assert not TrustedAsTemplate.is_tagged_on(rendered)

    def test_weird(self):
        """A trusted string full of mismatched Jinja delimiters is reported as a template syntax error."""
        data = TrustedAsTemplate().tag(u"""1 2 #}huh{# %}ddfg{% }}dfdfg{{ {%what%} {{#foo#}} {%{bar}%} {#%blip%#} {{asdfsd%} 3 4 {{foo}} 5 6 7""")

        with self.assertRaisesRegex(AnsibleError, 'Syntax error in template'):
            self.templar.template(data)

    def test_template_with_error(self):
        """Check that AnsibleError is raised, fail if an unhandled exception is raised"""
        failing = TrustedAsTemplate().tag("{{ str_with_error }}")

        with self.assertRaises(AnsibleError):
            self.templar.template(failing)
class TestTemplarMisc(BaseTemplar, unittest.TestCase):
    """Table-driven checks of basic templating results, expected failures and backslash handling."""

    def test_templar_simple(self):
        """Template basic values, check the expected failures, then replace the available variables."""
        templar = self.templar

        def render(text, **kwargs):
            return templar.template(TrustedAsTemplate().tag(text), **kwargs)

        # test some basic templating; rows are (template, extra template() kwargs, expected result)
        success_cases = (
            ("{{foo}}", {}, "bar"),
            ("{{foo}}\n", {}, "bar\n"),
            ("{{foo}}\n", dict(options=TemplateOptions(preserve_trailing_newlines=True)), "bar\n"),
            ("{{foo}}\n", dict(options=TemplateOptions(preserve_trailing_newlines=False)), "bar"),
            ("{{bam}}", {}, "bar"),
            ("{{num}}", {}, 1),
            ("{{var_true}}", {}, True),
            ("{{var_false}}", {}, False),
            ("{{var_dict}}", {}, dict(a="b")),
            ("{{bad_dict}}", {}, "{a='b'"),
            ("{{var_list}}", {}, [1]),
        )

        for text, extra, expected in success_cases:
            self.assertEqual(render(text, **extra), expected)

        # force errors; rows are (expected exception type, template)
        failure_cases = (
            (AnsibleUndefinedVariable, "{{bad_var}}"),
            (AnsibleUndefinedVariable, "{{lookup('file', bad_var)}}"),
            (AnsibleError, "{{lookup('bad_lookup')}}"),
            (AnsibleError, "{{recursive}}"),
            (AnsibleUndefinedVariable, "{{foo-bar}}"),
        )

        for error_type, text in failure_cases:
            # tag outside the assertion context so only the template() call is guarded
            trusted = TrustedAsTemplate().tag(text)

            with self.assertRaises(error_type):
                templar.template(trusted)

        replacing_templar = templar.extend(marker_behavior=ReplacingMarkerBehavior())
        result = replacing_templar.template(TrustedAsTemplate().tag("{{bad_var}}"))
        assert "<< error 1 - 'bad_var' is undefined >>" in result

        # test setting available_variables
        templar.available_variables = dict(foo="bam")
        self.assertEqual(render("{{foo}}"), "bam")

    def test_templar_escape_backslashes(self):
        """Check rendered output for both `escape_backslashes` settings across tab and backslash inputs."""
        # Rule of thumb: If escape backslashes is True you should end up with
        # the same number of backslashes as when you started.
        cases = (
            # (template, escape_backslashes, expected result)
            ("\t{{foo}}", True, "\tbar"),
            ("\t{{foo}}", False, "\tbar"),
            ("\\{{foo}}", True, "\\bar"),
            ("\\{{foo}}", False, "\\bar"),
            ("\\{{foo + '\t' }}", True, "\\bar\t"),
            ("\\{{foo + '\t' }}", False, "\\bar\t"),
            ("\\{{foo + '\\t' }}", True, "\\bar\\t"),
            ("\\{{foo + '\\t' }}", False, "\\bar\t"),
            ("\\{{foo + '\\\\t' }}", True, "\\bar\\\\t"),
            ("\\{{foo + '\\\\t' }}", False, "\\bar\\t"),
        )

        for text, escape, expected in cases:
            rendered = self.templar.template(TrustedAsTemplate().tag(text), options=TemplateOptions(escape_backslashes=escape))
            self.assertEqual(rendered, expected)
class TestTemplarLookup(BaseTemplar, unittest.TestCase):
    """Checks of lookup plugin invocation, both directly via `_lookup` and through templates."""

    @staticmethod
    def lookup(name: str, /, *args, **kwargs) -> t.Any:
        """Invoke `_lookup` for plugin `name` inside a fresh `TemplateContext`."""
        template_context = TemplateContext(template_value=None, templar=TemplateEngine(), options=TemplateOptions(), stop_on_template=False)

        with template_context:
            return _lookup(name, *args, **kwargs)

    def test_lookup_missing_plugin(self):
        """An unknown lookup plugin name raises `AnsibleTemplatePluginNotFoundError`."""
        with self.assertRaisesRegex(AnsibleTemplatePluginNotFoundError, "The lookup plugin 'not_a_real_lookup_plugin' was not found."):
            self.lookup('not_a_real_lookup_plugin', 'an_arg', a_keyword_arg='a_keyword_arg_value')

    def test_lookup_list(self):
        """The `list` lookup with two terms returns them joined by a comma."""
        joined = self.lookup('list', 'an_arg', 'another_arg')

        self.assertEqual(joined, 'an_arg,another_arg')

    def test_lookup_jinja_undefined(self):
        """An undefined variable passed as a lookup term raises `AnsibleUndefinedVariable`."""
        template = TrustedAsTemplate().tag('{{ lookup("list", an_undefined_jinja_var) }}')

        with self.assertRaisesRegex(AnsibleUndefinedVariable, "'an_undefined_jinja_var' is undefined"):
            self.templar.template(template)

    def test_lookup_jinja_defined(self):
        """The result of a direct `list` lookup is not trust-tagged."""
        outcome = self.lookup('list', 'x')

        assert not TrustedAsTemplate.is_tagged_on(outcome)

    def test_lookup_jinja_dict_string_passed(self):
        """The `dict` lookup rejects a string term."""
        with self.assertRaisesRegex(AnsibleError, "lookup plugin expects a dictionary"):
            self.lookup('dict', 'x')

    def test_lookup_jinja_dict_list_passed(self):
        """The `dict` lookup rejects a list term."""
        with self.assertRaisesRegex(AnsibleError, "lookup plugin expects a dictionary"):
            self.lookup('dict', ['foo', 'bar'])

    def test_lookup_jinja_kwargs(self):
        """A `list` lookup given an extra keyword argument returns an untagged result."""
        outcome = self.lookup('list', 'blip', random_keyword='12345')

        assert not TrustedAsTemplate.is_tagged_on(outcome)

    def test_lookup_jinja_list_wantlist(self):
        """With `wantlist=True` the templated term is returned inside a list."""
        rendered = self.templar.template(TrustedAsTemplate().tag("{{ lookup('list', template_var, wantlist=True) }}"))

        self.assertEqual(rendered, ["blip"])

    def test_lookup_jinja_list_wantlist_undefined(self):
        """An undefined term still raises `AnsibleUndefinedVariable` when `wantlist=True`."""
        template = TrustedAsTemplate().tag('{{ lookup("list", some_undefined_var, wantlist=True) }}')

        with self.assertRaisesRegex(AnsibleUndefinedVariable, "'some_undefined_var' is undefined"):
            self.templar.template(template)

    def test_lookup_jinja_list_wantlist_unsafe(self):
        """Neither the `wantlist` result nor any of its items is trust-tagged."""
        outcome = self.lookup('list', 'x', wantlist=True)

        assert not any(TrustedAsTemplate.is_tagged_on(item) for item in outcome)
        assert not TrustedAsTemplate.is_tagged_on(outcome)

    def test_lookup_jinja_dict(self):
        """A dict term is returned with its key left as the literal template text and its value rendered."""
        rendered = self.templar.template(TrustedAsTemplate().tag('{{ lookup("list", template_dict) }}'))

        self.assertEqual(rendered['{{ a_keyword }}'], "blip")
        assert not TrustedAsTemplate.is_tagged_on(rendered)

    def test_lookup_jinja_dict_unsafe(self):
        """A dict passed directly to the `list` lookup comes back without trust tags."""
        outcome = self.lookup('list', {'x': 'x'})

        assert not TrustedAsTemplate.is_tagged_on(outcome['x'])
        assert not TrustedAsTemplate.is_tagged_on(outcome)

    def test_lookup_jinja_dict_unsafe_value(self):
        """The value inside a dict passed to the `list` lookup comes back without a trust tag."""
        outcome = self.lookup('list', {'x': 'x'})

        assert not TrustedAsTemplate.is_tagged_on(outcome['x'])
        assert not TrustedAsTemplate.is_tagged_on(outcome)

    def test_lookup_jinja_none(self):
        """A `None` term yields `None`."""
        self.assertIsNone(self.lookup('list', None))
class TestAnsibleContext(BaseTemplar, unittest.TestCase):
    """Checks of `AnsibleContext` construction and variable resolution."""

    def _context(self, variables=None):
        """Build an `AnsibleContext` named 'some_context' and copy `variables` into its `vars`."""
        environment = AnsibleEnvironment()
        context = AnsibleContext(environment, parent={}, name='some_context', blocks={})

        for name, value in (variables or {}).items():
            context.vars[name] = value

        return context

    def test(self):
        """The helper produces an `AnsibleContext`, which is also a Jinja `Context`."""
        context = self._context()

        self.assertIsInstance(context, AnsibleContext)
        self.assertIsInstance(context, Context)

    def test_resolve_unsafe(self):
        """Resolving a plain string variable yields an untagged value."""
        context = self._context(variables={'some_unsafe_key': 'some_unsafe_string'})
        resolved = context.resolve('some_unsafe_key')

        assert not TrustedAsTemplate.is_tagged_on(resolved)

    def test_resolve_unsafe_list(self):
        """Resolving a list variable yields an untagged list with an untagged first item."""
        context = self._context(variables={'some_unsafe_key': ['some unsafe string 1']})
        resolved = context.resolve('some_unsafe_key')

        assert not TrustedAsTemplate.is_tagged_on(resolved[0])
        assert not TrustedAsTemplate.is_tagged_on(resolved)

    def test_resolve_unsafe_dict(self):
        """Resolving a dict variable yields an untagged value under its key."""
        nested = {'an_unsafe_dict': 'some unsafe string 1'}
        context = self._context(variables={'some_unsafe_key': nested})
        resolved = context.resolve('some_unsafe_key')

        assert not TrustedAsTemplate.is_tagged_on(resolved['an_unsafe_dict'])

    def test_resolve(self):
        """Resolving a known key returns the stored string."""
        context = self._context(variables={'some_key': 'some_string'})

        self.assertEqual(context.resolve('some_key'), 'some_string')

    def test_resolve_none(self):
        """Resolving a key stored as `None` returns a value equal to `None`."""
        context = self._context(variables={'some_key': None})

        self.assertEqual(context.resolve('some_key'), None)
def test_unsafe_lookup():
    """The first item returned by a `list` lookup over a templated variable must not carry the trust tag."""
    variables = {
        'var0': TrustedAsTemplate().tag('{{ var1 }}'),
        'var1': ['unsafe'],
    }
    engine = TemplateEngine(None, variables=variables)

    res = engine.template(TrustedAsTemplate().tag('{{ lookup("list", var0) }}'))

    assert not TrustedAsTemplate.is_tagged_on(res[0])
def test_unsafe_lookup_no_conversion():
    """The overall result of a `list` lookup over a templated variable must not carry the trust tag."""
    variables = {
        'var0': TrustedAsTemplate().tag('{{ var1 }}'),
        'var1': ['unsafe'],
    }
    engine = TemplateEngine(None, variables=variables)
    template = TrustedAsTemplate().tag('{{ lookup("list", var0) }}')

    res = engine.template(template)

    assert not TrustedAsTemplate.is_tagged_on(res)
@pytest.mark.parametrize("tagged", (
    False,
    True,
))
def test_dict_template(tagged: bool) -> None:
    """Verify that templar.template can round-trip both tagged and untagged values in a dict."""
    key1, val1 = "key1", "val1"

    if tagged:
        # apply the module-level origin tag to both the key and the value
        key1, val1 = origin.tag(key1), origin.tag(val1)

    test1 = {key1: val1}
    templar = TemplateEngine(variables={"test1": test1})

    result = templar.template(TrustedAsTemplate().tag('{{test1}}'))

    assert result == test1
    assert AnsibleTagHelper.tags(result) == AnsibleTagHelper.tags(test1)
@pytest.mark.parametrize("expr,expected,variables", [
    ("'constant'", "constant", None),
    ("a - b", 42, dict(a=100, b=58)),
])
def test_evaluate_expression(expr: str, expected: t.Any, variables: dict[str, t.Any] | None):
    """Evaluate a trusted expression against the given variables and compare with the expected value."""
    engine = TemplateEngine(variables=variables)
    outcome = engine.evaluate_expression(TRUST.tag(expr))

    assert outcome == expected
@pytest.mark.parametrize("expr,error_type", [
    ("fhdgsfk#$76&@#$&", AnsibleTemplateSyntaxError),
    ("bogusvar", AnsibleUndefinedVariable),
    ("untrusted expression", TemplateTrustCheckFailedError),
    (dict(hi="{{'mom'}}"), TypeError),
])
def test_evaluate_expression_errors(expr: t.Any, error_type: type[Exception]):
    """
    Verify that `evaluate_expression` raises the expected error type for each invalid input.

    `expr` is annotated `t.Any` because the cases include a non-string (dict) input in addition to strings.
    Every input except the trust-check case is tagged as trusted before evaluation.
    """
    if error_type is not TemplateTrustCheckFailedError:
        # the trust failure case must stay untagged; everything else needs trust to reach its own error
        expr = TRUST.tag(expr)

    with pytest.raises(error_type):
        TemplateEngine().evaluate_expression(expr)
@pytest.mark.parametrize("conditional,expected,variables", [
    ("1 == 2", False, None),
    ("test2_name | default(True)", True, None),
    # DTFIX5: more success cases?
])
def test_evaluate_conditional(conditional: str, expected: t.Any, variables: dict[str, t.Any] | None):
    """
    Verify that a trusted conditional evaluates to the expected result.

    The parametrized `variables` are handed to the engine, mirroring `test_evaluate_expression`,
    so any case that supplies variables is evaluated against them rather than silently ignoring them.
    """
    engine = TemplateEngine(variables=variables)

    assert engine.evaluate_conditional(TRUST.tag(conditional)) == expected
@pytest.mark.parametrize("conditional,error_type", [
    ("fkjhs$#@^%$*& ldfkjds", AnsibleTemplateSyntaxError),
    ("#jinja2:variable_start_string:2\n{{blah}}", AnsibleTemplateSyntaxError),
    ("#jinja2:bogus_key:'val'\n{{blah}}", AnsibleTemplateSyntaxError),
    ("bogusvar", AnsibleUndefinedVariable),
    ("not trusted", TemplateTrustCheckFailedError),
])
def test_evaluate_conditional_errors(conditional: t.Any, error_type: type[Exception], mocker: pytest_mock.MockerFixture):
    """Verify that `evaluate_conditional` raises the expected error type for each invalid conditional."""
    mocker.patch.object(_TemplateConfig, 'allow_embedded_templates', True)  # force this on since a number of cases need it

    # only the trust failure case is evaluated without the trust tag
    needs_trust = error_type is not TemplateTrustCheckFailedError
    candidate = TRUST.tag(conditional) if needs_trust else conditional

    with pytest.raises(error_type):
        TemplateEngine().evaluate_conditional(candidate)
@pytest.mark.parametrize("value", (
    '{{ foo }}',
    '{% foo %}',
    '{# foo #}',
    '{# {{ foo }} #}',
    '{# {{ nothing }} {# #}',
    '{# {{ nothing }} {# #} #}',
    '{% raw %}{{ foo }}{% endraw %}',
    # in 2.16 and earlier these were not considered templates due to syntax errors
    # now syntax errors in templates are still reported as templates, since is_template no longer compiles the template
    '{{ foo',
    '{% foo',
    '{# foo',
    '{{ foo %}',
    '{{ foo #}',
    '{% foo }}',
    '{% foo #}',
    '{# foo %}',
    '{# foo }}',
    '{{ foo {{',
    '{% raw %}{% foo %}',
))
def test_is_template_true(value: str) -> None:
    """Each trusted value, including the syntactically broken ones, is reported as a template."""
    trusted_value = TRUST.tag(value)

    assert TemplateEngine().is_template(trusted_value)
@pytest.mark.parametrize("value", (
    'foo',
))
def test_is_template_false(value: str) -> None:
    """A trusted value without template markers is not reported as a template."""
    trusted_value = TRUST.tag(value)

    assert not TemplateEngine().is_template(trusted_value)
@pytest.mark.parametrize("value", (
    '{{ foo }}',
    '{% foo %}',
    '{# foo #}',
    '{# {{ foo }} #}',
    '{# {{ nothing }} {# #}',
    '{# {{ nothing }} {# #} #}',
    '{% raw %}{{ foo }}{% endraw %}',
    '{{',
    '{%',
    '{#',
    '{% raw',
))
def test_is_possibly_template_true(value: str) -> None:
    """Each value, down to a bare opening marker, is reported as possibly a template."""
    verdict = is_possibly_template(value)

    assert verdict
@pytest.mark.parametrize("value", (
    '{',
    '%',
    '#',
    'foo',
    '}}',
    '%}',
    'raw %}',
    '#}',
))
def test_is_possibly_template_false(value: str) -> None:
    """Values lacking an opening marker (lone characters, plain text, closing markers) are not possibly templates."""
    verdict = is_possibly_template(value)

    assert not verdict
def test_stop_on_container() -> None:
    """`resolve_to_container` on a trusted list-literal template returns the list."""
    # DTFIX5: add more test cases
    template = TRUST.tag('{{ [ 1 ] }}')
    resolved = TemplateEngine().resolve_to_container(template)

    assert resolved == [1]
@pytest.mark.parametrize("value", [True, False])
def test_stripped_conditionals(value: bool, mocker: pytest_mock.MockerFixture) -> None:
    """A templated boolean conditional surrounded by assorted whitespace evaluates to that boolean."""
    mocker.patch.object(_TemplateConfig, 'allow_embedded_templates', True)  # force this on since this case needs it

    padded_conditional = TRUST.tag(f"""\n \r\n \t{{{{ {value} }}}} \n\n \t \t\t """)

    assert TemplateEngine().evaluate_conditional(padded_conditional) == value
@pytest.mark.parametrize("template, variables, error", (
    ("{{ undefined_var.undefined_attribute }}", {}, "'undefined_var' is undefined >>"),
    ("{{ some_dict['undefined_key'] }}", dict(some_dict={}), "object of type 'dict' has no attribute 'undefined_key' >>"),
    ("{{ some_dict.undefined_key }}", dict(some_dict={}), "object of type 'dict' has no attribute 'undefined_key' >>"),
    ("{{ m1 }} {{ m2 }} here", {}, "<< error 1 - 'm1' is undefined >> << error 2 - 'm2' is undefined >> here"),
    ("before {{ m1 + m2 }} after", {}, "before << error 1 - 'm1' is undefined >><< error 2 - template potentially truncated >>"),
))
def test_jinja_sourced_undefined(template: str, variables: dict[str, t.Any], error: str) -> None:
    """
    Ensure when Jinja encounters a `Marker` and raises `MarkerError`,
    that we turn it back into the original `Marker` so marker_behavior can handle it during finalization.
    """
    engine = TemplateEngine(variables=variables, marker_behavior=ReplacingMarkerBehavior())
    rendered = engine.template(TRUST.tag(template))

    # the replacing marker behavior writes the error text into the rendered output
    assert error in rendered
def test_omit_concat() -> None:
    """`omit` placeholders concatenated with literal text contribute nothing to the rendered string."""
    template = TRUST.tag("{{ omit }}hi{{ omit }} mom")
    rendered = TemplateEngine().template(template)

    assert rendered == 'hi mom'
@pytest.mark.parametrize("conditional", (
    # Jinja plugins
    "'join' is filter",
    "'join' is not test",
    "'eq' is test",
    "'eq' is not filter",
    # Ansible plugins
    "'comment' is filter",
    "'comment' is not test",
    "'version' is test",
    "'version' is not filter",
    # plugin not found
    "'nope' is not filter",
    "'nope' is not test",
))
def test_plugin_found_not_found(conditional: str) -> None:
    """Each `is filter` / `is test` conditional about a plugin name evaluates truthy."""
    outcome = TemplateEngine().evaluate_conditional(TRUST.tag(conditional))

    assert outcome
@pytest.mark.parametrize("value, expected", (
    ("{{ {'a': 1}.items() }}", [['a', 1]]),
    ("{{ {'a': 1}.keys() }}", ['a']),
    ("{{ {'a': 1}.values() }}", [1]),
    ("{{ yielder(2) }}", [0, 1]),
    ("{% set y = yielder(2) %}{{ y | list }} | {{ y | list }}", "[0, 1] | [0, 1]"),
), ids=str)
def test_finalize_generator(value: t.Any, expected: t.Any) -> None:
    """Templates producing dict views or generators render to the expected list (or string) results."""
    def yielder(count: int) -> t.Generator[int, None, None]:
        # must remain a real generator function, since that is what the templates exercise
        for number in range(count):
            yield number

    templar = TemplateEngine(variables={"yielder": yielder})

    # DTFIX5: we still need to deal with the "Encountered unsupported" warnings these generate
    rendered = templar.template(TRUST.tag(value))

    assert rendered == expected
@pytest.mark.parametrize("template", (
    "{{ lookup('my_lookup', some_var) }}",
    "{{ some_var | my_filter }}",
    "{{ some_var is my_test }}",
))
def test_eager_trip_undefined(template: str, mocker: pytest_mock.MockerFixture) -> None:
    """Verify that eager tripping of Marker works for template plugins which only perform isinstance checks on undefined values."""
    from ansible.plugins.lookup import LookupBase

    def my_filter(values):
        return [isinstance(item, bool) for item in values]

    def my_test(values):
        return all(isinstance(item, bool) for item in values)

    class MyLookup(LookupBase):
        def run(self, terms, variables=None, **kwargs):
            return [isinstance(item, bool) for item in itertools.chain(*terms)]

    def mock_lookup_get(*_args, **_kwargs) -> t.Any:
        return MyLookup()

    # swap the module's lookup loader for a mock whose `get` always returns the test lookup
    fake_loader = mocker.MagicMock()
    fake_loader.get = mock_lookup_get
    mocker.patch.object(_jinja_plugins, 'lookup_loader', fake_loader)

    def get_templar(variables: dict[str, t.Any]) -> TemplateEngine:
        engine = TemplateEngine(variables=variables)
        engine.environment.filters['my_filter'] = my_filter
        engine.environment.tests['my_test'] = my_test
        return engine

    template = TRUST.tag(template)

    # verify the template works when some_var is defined
    defined_result = get_templar({"some_var": [True]}).template(template)

    assert defined_result is True or defined_result == [True]

    # verify the template raises AnsibleUndefinedVariable when some_var contains a template that references an undefined variable
    undefined_templar = get_templar({"some_var": [TRUST.tag("{{ nope }}")]})

    with pytest.raises(AnsibleUndefinedVariable) as ex:
        undefined_templar.template(template)

    assert ex.value.message == "'nope' is undefined"
def as_template(value: str) -> str:
    """Wrap `value` in Jinja expression delimiters, with one space of padding on each side."""
    return "{{{{ {} }}}}".format(value)
# (expression, expected result) pairs in which the lookup/query plugin name argument is itself an embedded template.
# Tests that only need the expression take element 0 of each pair.
TEMPLATED_LOOKUP_NAME_TEST_VALUES = [
    ("""lookup('{{ "pipe" }}', 'echo hi')""", "hi"),
    ("""query('{{ "pipe" }}', 'echo hi')""", ["hi"]),
]
@pytest.mark.parametrize("value", [expression for expression, _expected in TEMPLATED_LOOKUP_NAME_TEST_VALUES])
def test_lookup_query_name_is_not_templated_non_conditional(value: str) -> None:
    """Templating an expression whose plugin name is an embedded template raises plugin-not-found."""
    engine = TemplateEngine()

    with pytest.raises(AnsibleTemplatePluginNotFoundError):
        engine.template(TRUST.tag(as_template(value)))
@pytest.mark.parametrize("value", [expression for expression, _expected in TEMPLATED_LOOKUP_NAME_TEST_VALUES])
def test_lookup_query_name_is_not_templated_conditional_nested_template(value: str, mocker: pytest_mock.MockerFixture) -> None:
    """A conditional written as a full template with an embedded-template plugin name raises plugin-not-found."""
    mocker.patch.object(_TemplateConfig, 'allow_embedded_templates', True)  # force this on since a number of cases need it

    engine = TemplateEngine()

    with pytest.raises(AnsibleTemplatePluginNotFoundError):
        engine.evaluate_conditional(TRUST.tag(as_template(value)))
@pytest.mark.parametrize("value, expected_result", TEMPLATED_LOOKUP_NAME_TEST_VALUES)
def test_lookup_query_name_is_not_templated_conditional_expression(value: str, expected_result: t.Any, mocker: pytest_mock.MockerFixture) -> None:
    """As a bare conditional expression the comparison holds, and an embedded-templates warning is emitted."""
    mocker.patch.object(_TemplateConfig, 'allow_embedded_templates', True)  # force this on since a number of cases need it

    comparison = TRUST.tag(f'{value} == {expected_result!r}')

    with emits_warnings(warning_pattern="should not contain embedded templates"):
        assert TemplateEngine().evaluate_conditional(comparison)
@pytest.mark.parametrize("value", [
    "foo(",
    "'a' == {{ 'b' }}",
])
def test_conditional_syntax_error(value: str) -> None:
    """Malformed conditionals raise `AnsibleTemplateSyntaxError`."""
    engine = TemplateEngine()

    with pytest.raises(AnsibleTemplateSyntaxError):
        engine.evaluate_conditional(TRUST.tag(value))
# (conditional input, expected result) pairs for conditionals that are not boolean expressions.
# The expected result is what `test_broken_conditionals_enabled` asserts when broken conditionals are allowed;
# `test_broken_conditionals_disabled` uses only the inputs and expects each one to raise.
BROKEN_CONDITIONAL_VALUES = [
    (None, True),  # stupid backward-compat
    ("", True),  # stupid backward-compat
    ("''", False),
    ("0", False),
    ("0.0", False),
    ("1", True),
    ("1.1", True),
    ("'abc'", True),
    ("{{ '' }}", True),
    ("{{ None }}", True),
    ("{{ 0 }}", False),
    ("{{ 0.0 }}", False),
    ("{{ [] }}", False),
    ("{{ {} }}", False),
    ([], False),
    ([TRUST.tag("{{ omit }}")], False),
    ({}, False),
    (dict(a=TRUST.tag("{{ omit }}")), False),
    (["abc", TRUST.tag("{{ omit }}")], True),
    (dict(a="b", omitted=TRUST.tag("{{ omit }}")), True),
    (0, False),
    (0.0, False),
    (1, True),
    (1.1, True),
]
@pytest.mark.parametrize("value", [candidate for candidate, _expected in BROKEN_CONDITIONAL_VALUES], ids=repr)
def test_broken_conditionals_disabled(value: t.Any, mocker: pytest_mock.MockerFixture) -> None:
    """With broken conditionals disallowed, every non-boolean conditional raises `AnsibleBrokenConditionalError`."""
    mocker.patch.object(_TemplateConfig, 'allow_broken_conditionals', False)

    engine = TemplateEngine()

    with pytest.raises(AnsibleBrokenConditionalError):
        engine.evaluate_conditional(TRUST.tag(value))
@pytest.mark.parametrize("value, expected_result", BROKEN_CONDITIONAL_VALUES, ids=repr)
def test_broken_conditionals_enabled(value: t.Any, expected_result: bool, mocker: pytest_mock.MockerFixture) -> None:
    """With broken conditionals allowed, each input yields its expected result and the matching deprecation warnings."""
    mocker.patch.object(_TemplateConfig, 'allow_broken_conditionals', True)
    mocker.patch.object(_TemplateConfig, 'allow_embedded_templates', True)  # force this on since a number of cases need it

    expected_deprecations = []

    if isinstance(value, str) and is_possibly_all_template(value):
        expected_deprecations.append("should not be surrounded")

    # inputs treated as empty get their own deprecation; everything else is flagged as non-boolean
    empty_forms = (None, '', "{{ '' }}", "{{ None }}")
    expected_deprecations.append("Empty conditional" if value in empty_forms else "must have a boolean result")

    with emits_warnings(deprecation_pattern=expected_deprecations):
        assert TemplateEngine().evaluate_conditional(TRUST.tag(value)) == expected_result
@pytest.mark.parametrize("template, expected", (
    ("1 == '{{ 1 }}'", False),
    ("lookup('items', '{{ [1, 2, 3] }}') == [1, 2, 3]", False),
    ("query('items', '{{ [1, 2, 3] }}') == [1, 2, 3]", False),
))
def test_embedded_templates_disabled(template: str, expected: t.Any, mocker: pytest_mock.MockerFixture) -> None:
    """With embedded templates disallowed, each conditional yields the expected result under an empty warning pattern."""
    mocker.patch.object(_TemplateConfig, 'allow_embedded_templates', False)

    engine = TemplateEngine()

    with emits_warnings(warning_pattern=[]):
        assert engine.evaluate_conditional(TRUST.tag(template)) == expected
@pytest.mark.parametrize("template, expected", (
    ("1 == '{{ 1 }}'", True),
    ("lookup('items', '{{ [1, 2, 3] }}') == [1, 2, 3]", True),
    ("query('items', '{{ [1, 2, 3] }}') == [1, 2, 3]", True),
))
def test_embedded_templates_enabled(template: str, expected: t.Any, mocker: pytest_mock.MockerFixture) -> None:
    """With embedded templates allowed, each conditional yields the expected result and emits the embedded-templates warning."""
    mocker.patch.object(_TemplateConfig, 'allow_embedded_templates', True)

    templar = TemplateEngine()
    warning_text = "should not contain embedded templates"

    with emits_warnings(warning_pattern=warning_text):
        assert templar.evaluate_conditional(TRUST.tag(template)) == expected

    # only lookup/query args support embedded templates in an actual template (not a naked expression)
    if any(plugin_call in template for plugin_call in ('lookup', 'query')):
        wrapped = TRUST.tag(as_template(template))

        with emits_warnings(warning_pattern=warning_text):
            assert templar.evaluate_conditional(wrapped) == expected
def test_available_vars_smuggling():
    """
    Jinja Template.render() and TemplateExpression.__call__() flatten their args/kwargs via splatting to dict(), which is an unnecessary copy as well as
    causing top-level templated variables to be rendered prematurely. We have some arg smuggling code to prevent this, which this test validates.
    """
    class ExplodingDict(dict):
        """A dict subclass that explodes when copied or iterated via `dict()`."""

        def keys(self):
            raise NotImplementedError()

        def __iter__(self):
            raise NotImplementedError()

    template_vars = ExplodingDict()

    # ensure our tripwire dict subclass fails when used as the source for a dict copy
    with pytest.raises(NotImplementedError):
        dict(template_vars)

    # if Jinja copies our input dict, it should blow up; assert that it doesn't and that the template renders as expected
    engine = TemplateEngine(variables=template_vars)

    assert engine.template(TRUST.tag("{{ 1 }}")) == 1
def test_template_var_isolation():
    """
    Ensure that plugin mutations to lazy-wrapped container variables do not persist outside templating.
    Direct mutation via Templar.available_variables is not currently protected (and is generally a bad idea).
    """
    orig_dict_value = {"one": "one"}
    orig_list_value = [1, 2, 3]
    available_vars = {"dict_value": orig_dict_value, "list_value": orig_list_value}

    def mutate_my_vars(dict_value: dict, list_value: list) -> dict[str, t.Any]:
        # mutate both containers in place, then hand them back to the caller
        dict_value["added"] = "added"
        list_value.append("added")

        return {"dict_value": dict_value, "list_value": list_value}

    engine = TemplateEngine(variables=available_vars)
    res = engine.evaluate_expression(
        TRUST.tag("mutate_my_vars(dict_value, list_value)"),
        local_variables={"mutate_my_vars": mutate_my_vars},
    )

    # ensure the plugin returned the mutated copies as expected
    assert res == {"dict_value": {"one": "one", "added": "added"}, "list_value": [1, 2, 3, "added"]}

    # ensure the original input variables are the same unmodified instances
    assert available_vars == {"dict_value": {"one": "one"}, "list_value": [1, 2, 3]}
    assert available_vars['dict_value'] is orig_dict_value
    assert available_vars['list_value'] is orig_list_value
@pytest.mark.parametrize('fixture, plugin_type, plugin_name, expected', (
    ('no_collections', 'filter', 'invalid/name.does_not_matter.also_does_not_matter', AnsibleTemplateSyntaxError),  # plugin is None
    ('no_collections', 'lookup', 'invalid/name.does_not_matter.also_does_not_matter', AnsibleTemplatePluginNotFoundError),
    ('no_collections', 'filter', 'missing_namespace_name.does_not_matter.also_does_not_matter', AnsibleTemplateSyntaxError),  # KeyError
    ('no_collections', 'lookup', 'missing_namespace_name.does_not_matter.also_does_not_matter', AnsibleTemplatePluginNotFoundError),
    ('valid_collection', 'filter', 'valid.invalid/name.does_not_matter', AnsibleTemplateSyntaxError),  # KeyError
    ('valid_collection', 'lookup', 'valid.invalid/name.does_not_matter', AnsibleTemplatePluginNotFoundError),
    ('valid_collection', 'filter', 'valid.missing_collection.does_not_matter', AnsibleTemplateSyntaxError),  # KeyError
    ('valid_collection', 'lookup', 'valid.missing_collection.does_not_matter', AnsibleTemplatePluginNotFoundError),
    ('valid_collection', 'filter', 'valid.also_valid.invalid/name', AnsibleTemplateSyntaxError),  # plugin is None
    ('valid_collection', 'lookup', 'valid.also_valid.invalid/name', AnsibleTemplatePluginNotFoundError),
    ('valid_collection', 'filter', 'valid.also_valid.missing_plugin', AnsibleTemplateSyntaxError),  # plugin is None
    ('valid_collection', 'lookup', 'valid.also_valid.missing_plugin', AnsibleTemplatePluginNotFoundError),
    ('valid_collection', 'filter', 'valid.also_valid.also_also_valid', []),
    ('valid_collection', 'lookup', 'valid.also_valid.also_also_valid', []),
    ('valid_collection', 'filter', 'valid.also_valid.runtime_error', AnsibleTemplatePluginRuntimeError),
    ('valid_collection', 'lookup', 'valid.also_valid.runtime_error', AnsibleTemplatePluginRuntimeError),
    ('valid_collection', 'filter', 'valid.also_valid.load_error', AnsibleTemplatePluginLoadError),  # AnsibleError
    ('valid_collection', 'lookup', 'valid.also_valid.load_error', AnsibleTemplatePluginLoadError),
    ('valid_collection', 'template', '{% if false %}{{ 123 | valid.also_valid.load_error }}{% else %}Success{% endif %}', AnsibleTemplatePluginLoadError),
    ('no_collections', 'filter', 'ansible.invalid/name.does_not_matter', AnsibleTemplateSyntaxError),  # KeyError
    ('no_collections', 'lookup', 'ansible.invalid/name.does_not_matter', AnsibleTemplatePluginNotFoundError),
    ('no_collections', 'filter', 'ansible.missing_collection.does_not_matter', AnsibleTemplateSyntaxError),  # KeyError
    ('no_collections', 'lookup', 'ansible.missing_collection.does_not_matter', AnsibleTemplatePluginNotFoundError),
    ('no_collections', 'filter', 'ansible.builtin.invalid/name', AnsibleTemplateSyntaxError),  # plugin is None
    ('no_collections', 'lookup', 'ansible.builtin.invalid/name', AnsibleTemplatePluginNotFoundError),
    ('no_collections', 'filter', 'ansible.builtin.missing_plugin', AnsibleTemplateSyntaxError),  # plugin is None
    ('no_collections', 'lookup', 'ansible.builtin.missing_plugin', AnsibleTemplatePluginNotFoundError),
    ('no_collections', 'filter', 'ansible.builtin.quote', 'foo'),
    ('no_collections', 'lookup', 'ansible.builtin.env', []),
    ('no_collections', 'template', '{% if false %}{{ 123 | ansible.builtin.missing_plugin }}{% else %}Success{% endif %}', 'Success'),
    ('no_collections', 'filter', 'invalid/name', AnsibleTemplateSyntaxError),  # plugin is None
    ('no_collections', 'lookup', 'invalid/name', AnsibleTemplatePluginNotFoundError),
    ('no_collections', 'filter', 'missing_plugin', AnsibleTemplateSyntaxError),  # plugin is None
    ('no_collections', 'lookup', 'missing_plugin', AnsibleTemplatePluginNotFoundError),
    ('no_collections', 'filter', 'quote', 'foo'),
    ('no_collections', 'lookup', 'env', []),
), ids=str)
def test_jinja2_loader_plugin(fixture: str, plugin_type: str, plugin_name: str, expected: t.Any) -> None:
    """
    Template a filter, lookup, or raw template expression with a collection finder rooted at the named fixture directory.
    When `expected` is an exception type, templating must raise it; otherwise the templated result must equal `expected`.
    """
    if plugin_type == 'template':
        # abusing plugin_type and plugin_name to allow for testing arbitrary expressions
        expression = plugin_name
    elif plugin_type == 'filter':
        expression = f'{{{{ "foo" | {plugin_name} }}}}'
    else:
        expression = f'{{{{ lookup("{plugin_name}") }}}}'

    expects_error = isinstance(expected, type) and issubclass(expected, Exception)
    fixture_root = pathlib.Path(__file__).parent / 'fixtures' / fixture

    # HACK: this test should really be using a shared collection loader fixture, but this fixes an "inherited" dummy collection loader
    from ansible.utils.collection_loader._collection_finder import _AnsibleCollectionFinder

    # best-effort removal of any previously installed finder; failures here are deliberately ignored
    try:
        _AnsibleCollectionFinder._remove()
        nuke_module_prefix('ansible_collections')
    except Exception:
        pass

    _AnsibleCollectionFinder(paths=[str(fixture_root)])._install()

    try:
        if expects_error:
            with pytest.raises(expected):
                TemplateEngine().template(TRUST.tag(expression))
        else:
            result = TemplateEngine().template(TRUST.tag(expression))

            assert result == expected
    finally:
        # always uninstall the finder and drop the collection modules it loaded
        _AnsibleCollectionFinder._remove()
        nuke_module_prefix('ansible_collections')
def test_variable_name_as_template_success() -> None:
    """Verify that a tagged variable name becomes a `{{ name }}` template carrying the origin and trust tags."""
    tagged_name = origin.tag("blar")
    templated_name = TemplateEngine().variable_name_as_template(tagged_name)

    # whitespace inside the template braces is not significant for this check
    assert templated_name.replace(' ', '') == "{{blar}}"

    required_tags: frozenset[AnsibleDatatagBase] = frozenset({origin, TrustedAsTemplate()})
    missing_tags = required_tags - AnsibleTagHelper.tags(templated_name)

    assert missing_tags == set()  # there might be others, that's fine
def test_variable_name_as_template_invalid() -> None:
    """Verify that an invalid variable name raises AnsibleError referencing the original name instance."""
    bad_name = origin.tag(" invalid[var*name")
    engine = TemplateEngine()

    with pytest.raises(AnsibleError) as err:
        engine.variable_name_as_template(bad_name)

    raised = err.value

    assert raised.obj is bad_name
@pytest.mark.parametrize("expression, expected", (
    ("dictthing.subdict1.subdict2", "hi mom"),
    ("dictthing.sublist1[0]", 1),
    ("dictthing[keys.sublist1][0]", 1),
    ("123", 123),
))
def test_resolve_variable_expression_success(expression: str, expected: t.Any) -> None:
    """Verify that each supported variable expression resolves to the expected value from the local variables."""
    local_variables = {
        "dictthing": {"sublist1": [1, 2, 3], "subdict1": {"subdict2": "hi mom"}},
        "keys": {"sublist1": "sublist1"},
    }

    resolved = TemplateEngine().resolve_variable_expression(expression, local_variables=local_variables)

    assert resolved == expected
@pytest.mark.parametrize("expression", (
    "'text'",
    "dictthing['subdict1']",
    "q('env')",
))
def test_resolve_variable_expression_invalid(expression: str) -> None:
    """Verify that each rejected expression raises AnsibleError referencing the original expression instance."""
    local_variables = {"dictthing": {"subdict1": 1}}
    engine = TemplateEngine()

    with pytest.raises(AnsibleError) as err:
        engine.resolve_variable_expression(expression, local_variables=local_variables)

    raised = err.value

    assert raised.obj is expression
def test_resolve_variable_expression_missing() -> None:
    """Verify that resolving an undefined variable raises AnsibleUndefinedVariable carrying the tagged expression."""
    engine = TemplateEngine()
    expr = origin.tag("missing_variable")

    with pytest.raises(AnsibleUndefinedVariable) as err:
        engine.resolve_variable_expression(expr)

    reported = err.value.obj

    assert reported == expr  # may not be the same instance, since it was tagged TrustedAsTemplate internally

    required_tags: frozenset[AnsibleDatatagBase] = frozenset({origin, TrustedAsTemplate()})
    missing_tags = required_tags - AnsibleTagHelper.tags(err.value.obj)

    assert missing_tags == set()  # there might be others, that's fine
def test_error_invalid_non_string_template():
    """Verify that templating an unsupported non-string input reports the offending type name in the error message."""
    unsupported_value = ...

    with pytest.raises(AnsibleTemplateError) as err:
        TemplateEngine().template(unsupported_value)

    expected_fragment = f"Type {type(...).__name__!r} is unsupported for variable storage."

    assert expected_fragment in err.value.message
def nuke_module_prefix(prefix):
    """Remove every entry from `sys.modules` whose name starts with `prefix`."""
    # snapshot the matching names first, since sys.modules cannot be resized while it is being iterated
    doomed_names = tuple(name for name in sys.modules if name.startswith(prefix))

    for name in doomed_names:
        del sys.modules[name]
def test_template_transform_limit_exceeded(mocker: pytest_mock.MockerFixture) -> None:
    """
    Verify that a cycle of type transforms encountered while templating is stopped by the transform limit.
    Bogus transforms are injected to create the cycle; the guard exists to catch future coding errors.
    """
    class One:
        def __init__(self, *args, **kwargs):
            """Accept and discard any arguments."""

    class Two:
        def __init__(self, *args, **kwargs):
            """Accept and discard any arguments."""

    # each type maps to the other, so applying the transforms never settles
    transform_cycle = {One: Two, Two: One}
    mocker.patch.dict(_transform._type_transform_mapping, transform_cycle)

    template = TRUST.tag("{{ limit }}")

    with pytest.raises(AnsibleTemplateTransformLimitError):
        TemplateEngine(variables={"limit": One()}).template(template)
def test_transform_transform_limit_exceeded(mocker: pytest_mock.MockerFixture) -> None:
    """
    Verify that a cycle of type transforms triggered through a direct `transform` call is stopped by the transform limit.
    Bogus transforms are injected to create the cycle; the guard exists to catch future coding errors.
    """
    class One:
        def __init__(self, *args, **kwargs):
            """Accept and discard any arguments."""

    class Two:
        def __init__(self, *args, **kwargs):
            """Accept and discard any arguments."""

    # each type maps to the other, so applying the transforms never settles
    transform_cycle = {One: Two, Two: One}
    mocker.patch.dict(_transform._type_transform_mapping, transform_cycle)

    engine = TemplateEngine()

    with pytest.raises(AnsibleTemplateTransformLimitError):
        engine.transform(One())
def test_deprecated_dedupe_and_source():
    """Check which deprecation warnings are captured, and in what order, when deprecated values are accessed directly and indirectly."""
    # unique tag instances that share the same contents (can be tracked independently by the audit context)
    deprecated_string = Deprecated(msg="deprecated").tag("deprecated string")
    deprecated_list = Deprecated(msg="deprecated").tag([42])
    deprecated_dict = Deprecated(msg="deprecated").tag(dict(key="value"))

    # a shared tag instance (cannot be tracked independently by the audit context)
    shared_tag_instance = Deprecated(msg="shared tag")
    d1 = shared_tag_instance.tag("d1")
    d2 = shared_tag_instance.tag("d2")

    variables = {
        "indirect1": TRUST.tag('{{ indirect2 }}'),
        "indirect2": TRUST.tag('{{ deprecated_string }}'),
        "deprecated_string": deprecated_string,
        "deprecated_list": deprecated_list,
        "deprecated_dict": deprecated_dict,
        "d1": d1,
        "d2": d2,
        "ansible_deprecation_warnings": True,
    }

    templar = TemplateEngine(variables=variables)

    with _DeferredWarningContext(variables=variables) as dwc:
        # The indirect access summary occurs first.
        # The two following direct access summaries get deduped to a single one by the warning context (but unique template value keeps distinct from indirect).
        # The accesses with the shared tag instance values are internally deduped by the audit context.
        templar.evaluate_expression(TRUST.tag("indirect1 and deprecated_list and deprecated_dict and d1 and d2"))

    dep_warnings = dwc.get_deprecation_warnings()

    # one message fragment per expected warning, in the order the warnings should have been captured
    expected_fragments = (
        'deprecated_string',
        'indirect1 and deprecated_list and deprecated_dict',
        'd1 and d2',
    )

    assert len(dep_warnings) == 3

    for warning, fragment in zip(dep_warnings, expected_fragments):
        assert fragment in _event_utils.format_event_brief_message(warning.event)
def test_jinja_const_template_leak(template_context: TemplateContext) -> None:
    """Verify that the _JinjaConstTemplate tag remains on the result when templating happens within a template context."""
    embedded_template = TRUST.tag("{{ '{{ 1 }}' }}")

    # the deferred warning context suppresses the warning from usage of an embedded template
    with _DeferredWarningContext(variables={}), unittest.mock.patch.object(_TemplateConfig, 'allow_embedded_templates', True):
        result = TemplateEngine().template(embedded_template)

        assert _JinjaConstTemplate.is_tagged_on(result)
def test_jinja_const_template_finalized() -> None:
    """Verify that the _JinjaConstTemplate tag has been removed from a finalized template result."""
    embedded_template = TRUST.tag("{{ '{{ 1 }}' }}")

    # the deferred warning context suppresses the warning from usage of an embedded template
    with _DeferredWarningContext(variables={}), unittest.mock.patch.object(_TemplateConfig, 'allow_embedded_templates', True):
        result = TemplateEngine().template(embedded_template)

        assert not _JinjaConstTemplate.is_tagged_on(result)
@pytest.mark.parametrize("template,expected", (
    ("{% set x=[] %}{% set _=x.append(42) %}{{ x }}", [42]),
    ("{{ (32).__or__(64) }}", 96),
    ("{% set x={'foo': 42} %}{% set _=x.clear() %}{{ x }}", {}),
))
def test_unsafe_attr_access(template: str, expected: object) -> None:
    """Verify that unsafe attribute access is rejected in the default sandbox mode and permitted once explicitly allowed."""
    # precondition: the test starts from the default sandbox mode
    assert _TemplateConfig.sandbox_mode == _SandboxMode.DEFAULT

    with pytest.raises(AnsibleUndefinedVariable):
        TemplateEngine().template(TRUST.tag(template))

    allow_unsafe = unittest.mock.patch.object(_TemplateConfig, 'sandbox_mode', _SandboxMode.ALLOW_UNSAFE_ATTRIBUTES)

    with allow_unsafe:
        result = TemplateEngine().template(TRUST.tag(template))

        assert result == expected
def test_marker_from_test_plugin() -> None:
    """Verify that a test plugin can raise MarkerError to yield a Marker, without emitting any warnings or deprecations."""
    # empty pattern lists: no warnings and no deprecations are expected while templating
    with emits_warnings(deprecation_pattern=[], warning_pattern=[]):
        engine = TemplateEngine(variables={"something": TRUST.tag("{{ nope }}")})
        result = engine.template(TRUST.tag("{{ (something is eq {}) is undefined }}"))

        assert result