diff --git a/changelogs/fragments/truthy_tests.yml b/changelogs/fragments/truthy_tests.yml new file mode 100644 index 00000000000..bd613e32bca --- /dev/null +++ b/changelogs/fragments/truthy_tests.yml @@ -0,0 +1,3 @@ +bugfixes: + - Core Jinja test plugins - Builtin test plugins now always return ``bool`` to avoid spurious deprecation warnings for + some malformed inputs. diff --git a/lib/ansible/plugins/test/core.py b/lib/ansible/plugins/test/core.py index 2819cd14367..e29e8c24bcb 100644 --- a/lib/ansible/plugins/test/core.py +++ b/lib/ansible/plugins/test/core.py @@ -49,37 +49,41 @@ def timedout(result): """ Test if task result yields a time out""" if not isinstance(result, MutableMapping): raise errors.AnsibleFilterError("The 'timedout' test expects a dictionary") - return result.get('timedout', False) and bool(result['timedout'].get('period', False)) + + return bool(result.get('timedout') and bool(result['timedout'].get('period'))) def failed(result): """ Test if task result yields failed """ if not isinstance(result, MutableMapping): raise errors.AnsibleFilterError("The 'failed' test expects a dictionary") - return result.get('failed', False) + + return bool(result.get('failed')) def success(result): """ Test if task result yields success """ - return not failed(result) + return not bool(failed(result)) def unreachable(result): """ Test if task result yields unreachable """ if not isinstance(result, MutableMapping): raise errors.AnsibleFilterError("The 'unreachable' test expects a dictionary") - return result.get('unreachable', False) + + return bool(result.get('unreachable')) def reachable(result): """ Test if task result yields reachable """ - return not unreachable(result) + return bool(not unreachable(result)) def changed(result): """ Test if task result yields changed """ if not isinstance(result, MutableMapping): raise errors.AnsibleFilterError("The 'changed' test expects a dictionary") + if 'changed' not in result: changed = False if ( @@ -88,29 +92,32 @@ def changed(result): isinstance(result['results'][0], MutableMapping) ): for res in result['results']: - if res.get('changed', False): + if res.get('changed'): changed = True break else: - changed = result.get('changed', False) - return changed + changed = result.get('changed') + + return bool(changed) def skipped(result): """ Test if task result yields skipped """ if not isinstance(result, MutableMapping): raise errors.AnsibleFilterError("The 'skipped' test expects a dictionary") - return result.get('skipped', False) + + return bool(result.get('skipped')) def started(result): """ Test if async task has started """ if not isinstance(result, MutableMapping): raise errors.AnsibleFilterError("The 'started' test expects a dictionary") + if 'started' in result: # For async tasks, return status # NOTE: The value of started is 0 or 1, not False or True :-/ - return result.get('started', 0) == 1 + return bool(result.get('started')) else: # For non-async tasks, warn user, but return as if started display.warning("The 'started' test expects an async task, but a non-async task was tested") @@ -121,10 +128,11 @@ def finished(result): """ Test if async task has finished """ if not isinstance(result, MutableMapping): raise errors.AnsibleFilterError("The 'finished' test expects a dictionary") + if 'finished' in result: # For async tasks, return status # NOTE: The value of finished is 0 or 1, not False or True :-/ - return result.get('finished', 0) == 1 + return bool(result.get('finished')) else: # For non-async tasks, warn user, but return as if finished display.warning("The 'finished' test expects an async task, but a non-async task was tested") diff --git a/lib/ansible/plugins/test/uri.py b/lib/ansible/plugins/test/uri.py index 1eaaf80ac90..8273801c03f 100644 --- a/lib/ansible/plugins/test/uri.py +++ b/lib/ansible/plugins/test/uri.py @@ -20,11 +20,8 @@ def is_url(value, schemes=None): isit = is_uri(value, schemes) if isit: - try: - x = urlparse(value) - isit = bool(x.netloc or x.scheme == 'file') - except Exception as e: - isit = False + x = urlparse(value) + isit = bool(x.netloc or x.scheme == 'file') return isit diff --git a/test/units/plugins/test/dummy_vault.txt b/test/units/plugins/test/dummy_vault.txt new file mode 100644 index 00000000000..73263f727c0 --- /dev/null +++ b/test/units/plugins/test/dummy_vault.txt @@ -0,0 +1 @@ +$ANSIBLE_VAULT;1.1;BLA diff --git a/test/units/plugins/test/test_all.py b/test/units/plugins/test/test_all.py new file mode 100644 index 00000000000..750a4eb08ac --- /dev/null +++ b/test/units/plugins/test/test_all.py @@ -0,0 +1,175 @@ +from __future__ import annotations + +import collections +import dataclasses +import math +import pathlib +import tempfile +import typing as t + +import pytest + +from ansible.parsing.vault import EncryptedString +from ansible.plugins.loader import test_loader +from ansible.plugins.test import AnsibleJinja2Test +from ansible.template import Templar, trust_as_template +from units.test_utils.controller.display import emits_warnings + + +@dataclasses.dataclass +class Extra: + variables: dict[str, t.Any] | None = None + args: list[t.Any] | None = None + kwargs: dict[str, t.Any] | None = None + func: t.Callable[[Extra], None] | None = None + + +class MakeLink: + _tempdir: tempfile.TemporaryDirectory[str] | None = None + + def __call__(self, *args, **kwargs) -> str: + self._tempdir = tempfile.TemporaryDirectory() + + symlink = pathlib.Path(self._tempdir.name) / 'a_symlink' + symlink.symlink_to('something') + + return str(symlink) + + def __del__(self) -> None: + if self._tempdir: + self._tempdir.cleanup() + + def __repr__(self) -> str: + return 'MakeLink' + + +TEST_DATA_SET: tuple[tuple[t.Any, str, bool, Extra | None], ...] = ( + # core + (dict(failed=1), 'failed', True, None), + (dict(failed=0), 'failed', False, None), + (dict(), 'failed', False, None), + (dict(failed=1), 'success', False, None), + (dict(failed=0), 'success', True, None), + (dict(), 'success', True, None), + (dict(unreachable=1), 'reachable', False, None), + (dict(unreachable=0), 'reachable', True, None), + (dict(), 'reachable', True, None), + (dict(unreachable=0), 'unreachable', False, None), + (dict(unreachable=1), 'unreachable', True, None), + (dict(), 'unreachable', False, None), + (dict(timedout=dict(period=99)), 'timedout', True, None), + # (dict(timedout=1), 'timedout', False, None), # oops, bug + (dict(timedout=0), 'timedout', False, None), + (dict(), 'timedout', False, None), + (dict(changed=1), 'changed', True, None), + (dict(changed=0), 'changed', False, None), + (dict(), 'changed', False, None), + # (dict(results=[]), 'changed', True, None), # oops, bug + (dict(results=[dict(changed=1)]), 'changed', True, None), + (dict(results=[dict(changed=0)]), 'changed', False, None), + (dict(), 'changed', False, None), + (dict(skipped=1), 'skipped', True, None), + (dict(skipped=0), 'skipped', False, None), + (dict(), 'skipped', False, None), + (dict(finished=1), 'finished', True, None), + (dict(finished=0), 'finished', False, None), + (dict(), 'finished', True, None), + (dict(started=1), 'started', True, None), + (dict(started=0), 'started', False, None), + (dict(), 'started', True, None), + ('"foo"', 'match', True, Extra(args=['"foo"'])), + ('"foo"', 'match', False, Extra(args=['"bar"'])), + ('"xxfooxx"', 'search', True, Extra(args=['"foo"'])), + ('"xxfooxx"', 'search', False, Extra(args=['"bar"'])), + ('"fooxx"', 'regex', True, Extra(args=['"FOO"'], kwargs=dict(ignorecase=True, multiline=True, match_type='"match"'))), + ('"fooxx"', 'regex', False, Extra(args=['"BAR"'], kwargs=dict(ignorecase=True, multiline=True, match_type='"match"'))), + ('1.1', 'version_compare', True, Extra(args=['1.1', '"eq"'])), + ('1.1', 'version_compare', False, Extra(args=['1.0', '"eq"'])), + ([0], 'any', False, None), + ([1], 'any', True, None), + ([0], 'all', False, None), + ([1], 'all', True, None), + (1, 'truthy', True, None), + (0, 'truthy', False, None), + (1, 'falsy', False, None), + (0, 'falsy', True, None), + ('foo', 'vault_encrypted', True, Extra(variables=dict(foo=EncryptedString(ciphertext='$ANSIBLE_VAULT;1.1;BLAH')))), + ('foo', 'vault_encrypted', False, Extra(variables=dict(foo='not_encrypted'))), + (repr(str(pathlib.Path(__file__).parent / "dummy_vault.txt")), 'vaulted_file', True, None), + (repr(__file__), 'vaulted_file', False, None), + ('q', 'defined', True, None), + ('not_defined', 'defined', False, None), + ('q', 'undefined', False, None), + ('not_defined', 'undefined', True, None), + # files + ('"/"', 'directory', True, None), + (repr(__file__), 'directory', False, None), + (repr(__file__), 'file', True, None), + ('"/"', 'file', False, None), + ('make_link()', 'link', True, Extra(variables=dict(make_link=MakeLink()))), + ('"/"', 'link', False, None), + ('"/"', 'exists', True, None), + ('"/does_not_exist"', 'exists', False, None), + ('"/"', 'link_exists', True, None), + ('"/does_not_exist"', 'link_exists', False, None), + ('"/absolute"', 'abs', True, None), + ('"relative"', 'abs', False, None), + ('"/"', 'same_file', True, Extra(args=['"/"'])), + (repr(__file__), 'same_file', False, Extra(args=['"/"'])), + ('"/"', 'mount', True, None), + ('"/not_a_mount_point"', 'mount', False, None), + # mathstuff + ([1], 'subset', True, Extra(args=[[1]])), + ([0], 'subset', False, Extra(args=[[1]])), + ([1], 'superset', True, Extra(args=[[1]])), + ([0], 'superset', False, Extra(args=[[1]])), + ([0], 'contains', True, Extra(args=[0])), + ([1], 'contains', False, Extra(args=[0])), + ('nan', 'nan', True, Extra(variables=dict(nan=math.nan))), + ('"a string"', 'nan', False, None), + # uri + ('"https://ansible.com/"', 'uri', True, None), + (1, 'uri', False, None), + ('"https://ansible.com/"', 'url', True, None), + (1, 'url', False, None), + ('"urn:https://ansible.com/"', 'urn', True, None), + (1, 'urn', False, None), +) + + +@pytest.mark.parametrize("value,test,expected,extra", TEST_DATA_SET, ids=str) +def test_truthy_inputs(value: object, test: str, expected: bool, extra: Extra | None) -> None: + """Ensure test plugins return the expected bool result, not just a truthy/falsey value.""" + test_invocation = test + + if extra: + test_args = extra.args or [] + test_args.extend(f'{k}={v}' for k, v in (extra.kwargs or {}).items()) + test_invocation += '(' + ', '.join(str(arg) for arg in test_args) + ')' + + expression = f'{value} is {test_invocation}' + + with emits_warnings(deprecation_pattern=[]): + result = Templar(variables=extra.variables if extra else None).evaluate_expression(trust_as_template(expression)) + + assert result is expected + + +def test_ensure_all_plugins_tested() -> None: + """Ensure all plugins have at least one entry in the test data set, accounting for functions which have multiple names.""" + test_plugins: list[AnsibleJinja2Test] = [plugin for plugin in test_loader.all() if plugin.ansible_name.startswith('ansible.builtin.')] + plugin_aliases: dict[t.Any, set[str]] = collections.defaultdict(set) + + for test_plugin in test_plugins: + plugin_aliases[test_plugin.j2_function].add(test_plugin.ansible_name) + + missing_entries: list[str] = [] + + for plugin_names in plugin_aliases.values(): + matching_tests = {_expected for _value, test, _expected, _extra in TEST_DATA_SET if f'ansible.builtin.{test}' in plugin_names} + missing = {True, False} - matching_tests + + if missing: # pragma: nocover + missing_entries.append(f'{plugin_names}: {missing}') + + assert not missing_entries