From 9a426fe3030d7777c3291f2853dcd2d1e51cae75 Mon Sep 17 00:00:00 2001 From: Matt Davis <6775756+nitzmahone@users.noreply.github.com> Date: Fri, 9 May 2025 22:05:56 -0700 Subject: [PATCH] fix ensure_type to support vaulted values (#85129) * restored parity with 2.18 Co-authored-by: Matt Clay --- lib/ansible/config/manager.py | 11 +++++ .../utils/collection_loader/__init__.py | 2 + test/units/config/test_manager.py | 16 ++++++- test/units/controller_only_conftest.py | 9 ++-- test/units/mock/vault_helper.py | 32 +++++++------ test/units/parsing/vault/test_vault.py | 48 +++++++------------ test/units/parsing/yaml/test_vault.py | 10 ++-- 7 files changed, 73 insertions(+), 55 deletions(-) diff --git a/lib/ansible/config/manager.py b/lib/ansible/config/manager.py index e255f2dc9f0..f050ced4d75 100644 --- a/lib/ansible/config/manager.py +++ b/lib/ansible/config/manager.py @@ -51,6 +51,14 @@ GALAXY_SERVER_ADDITIONAL = { } +@t.runtime_checkable +class _EncryptedStringProtocol(t.Protocol): + """Protocol representing an `EncryptedString`, since it cannot be imported here.""" + # DTFIX-FUTURE: collapse this with the one in collection loader, once we can + + def _decrypt(self) -> str: ... + + def _get_config_label(plugin_type: str, plugin_name: str, config: str) -> str: """Return a label for the given config.""" entry = f'{config!r}' @@ -205,6 +213,9 @@ def _ensure_type(value: object, value_type: str | None, origin: str | None = Non if isinstance(value, (bool, int, float, complex)): return str(value) + if isinstance(value, _EncryptedStringProtocol): + return value._decrypt() + case _: # FIXME: define and document a pass-through value_type (None, 'raw', 'object', '', ...) and then deprecate acceptance of unknown types return value # return non-str values of unknown value_type as-is diff --git a/lib/ansible/utils/collection_loader/__init__.py b/lib/ansible/utils/collection_loader/__init__.py index 2009946e229..89d75a1dcd9 100644 --- a/lib/ansible/utils/collection_loader/__init__.py +++ b/lib/ansible/utils/collection_loader/__init__.py @@ -13,6 +13,8 @@ import typing as t class _EncryptedStringProtocol(t.Protocol): """Protocol representing an `EncryptedString`, since it cannot be imported here.""" + # DTFIX-FUTURE: collapse this with the one in config, once we can + def _decrypt(self) -> str: ... diff --git a/test/units/config/test_manager.py b/test/units/config/test_manager.py index fb2d238ad81..65ec9c0c9a1 100644 --- a/test/units/config/test_manager.py +++ b/test/units/config/test_manager.py @@ -14,8 +14,9 @@ import pytest from ansible.config.manager import ConfigManager, ensure_type, resolve_path, get_config_type from ansible.errors import AnsibleOptionsError, AnsibleError -from ansible._internal._datatag._tags import Origin +from ansible._internal._datatag._tags import Origin, VaultedValue from ansible.module_utils._internal._datatag import AnsibleTagHelper +from units.mock.vault_helper import VaultTestHelper curdir = os.path.dirname(__file__) cfg_file = os.path.join(curdir, 'test.cfg') @@ -198,6 +199,19 @@ def test_ensure_type_temppath(value: object, type: str, tmp_path: pathlib.Path) assert os.listdir(path) == [] +def test_ensure_type_vaulted(_vault_secrets_context: VaultTestHelper) -> None: + raw = "secretvalue" + origin = Origin(description='test') + es = _vault_secrets_context.make_encrypted_string(raw) + es = origin.tag(es) + result = ensure_type(es, 'str') + + assert isinstance(result, str) + assert result == raw + assert VaultedValue.is_tagged_on(result) + assert Origin.get_tag(result) is origin + + class TestConfigManager: @classmethod def setup_class(cls): diff --git a/test/units/controller_only_conftest.py b/test/units/controller_only_conftest.py index d32780125b9..f7204892e37 100644 --- a/test/units/controller_only_conftest.py +++ b/test/units/controller_only_conftest.py @@ -3,11 +3,14 @@ from __future__ import annotations import typing as t import pytest + from pytest_mock import MockerFixture from ansible._internal._errors._handler import ErrorHandler, ErrorAction from ansible.parsing.vault import VaultSecretsContext, VaultSecret from ansible._internal._templating._engine import _TemplateConfig +from units.mock.vault_helper import VaultTestHelper + # DTFIX-RELEASE: it'd be nice not to need to sync all controller-only fixtures here, but sunder values are excluded from `import *`, # and if we don't sunder the fixtures, they'll be unused args. Could also be handled with a managed import, module getattr, or others. @@ -36,13 +39,13 @@ def _empty_vault_secrets_context(_zap_vault_secrets_context) -> t.Generator[None @pytest.fixture -def _vault_secrets_context(_zap_vault_secrets_context) -> t.Generator[None]: - """A fixture that provides a `VaultSecretsContext` populated with a single default secret under the default id.""" +def _vault_secrets_context(_zap_vault_secrets_context) -> t.Generator[VaultTestHelper]: + """A fixture that initializes `VaultSecretsContext` with a single secret under the default ID and returns a `VaultTestHelper`.""" secret = VaultSecret(b'secretbytesblah') VaultSecretsContext.initialize(VaultSecretsContext(secrets=[('default', secret)])) - yield + yield VaultTestHelper() @pytest.fixture diff --git a/test/units/mock/vault_helper.py b/test/units/mock/vault_helper.py index c145413727f..dc2205a03a9 100644 --- a/test/units/mock/vault_helper.py +++ b/test/units/mock/vault_helper.py @@ -1,21 +1,8 @@ -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see . - from __future__ import annotations +from ansible._internal._datatag import _tags from ansible.module_utils.common.text.converters import to_bytes - -from ansible.parsing.vault import VaultSecret +from ansible.parsing.vault import VaultSecret, VaultSecretsContext, VaultLib, EncryptedString class TextVaultSecret(VaultSecret): @@ -35,3 +22,18 @@ class TextVaultSecret(VaultSecret): def bytes(self): """The text encoded with encoding, unless we specifically set _bytes.""" return self._bytes or to_bytes(self.text, encoding=self.encoding, errors=self.errors) + + +class VaultTestHelper: + @classmethod + def make_vault_ciphertext(cls, plaintext: str) -> str: + """Creates an `EncryptedString` from the first secret in the active VaultSecretsContext.""" + secrets = VaultSecretsContext.current().secrets + vl = VaultLib(secrets) + + return vl.encrypt(plaintext, secrets[0][1]).decode() + + @classmethod + def make_encrypted_string(cls, plaintext: str) -> EncryptedString: + """Creates an `EncryptedString` from the first secret in the active VaultSecretsContext.""" + return _tags.Origin(path="/tmp/sometest", line_num=42, col_num=42).tag(EncryptedString(ciphertext=cls.make_vault_ciphertext(plaintext))) diff --git a/test/units/parsing/vault/test_vault.py b/test/units/parsing/vault/test_vault.py index bddf08dfbe9..3f612ba912e 100644 --- a/test/units/parsing/vault/test_vault.py +++ b/test/units/parsing/vault/test_vault.py @@ -36,7 +36,7 @@ from ansible import errors from ansible.module_utils.common.text.converters import to_bytes, to_text from ansible.module_utils._internal._datatag import AnsibleTagHelper from ansible.parsing import vault -from ansible.parsing.vault import EncryptedString, VaultSecretsContext, VaultLib, AnsibleVaultError, VaultHelper +from ansible.parsing.vault import EncryptedString, VaultSecretsContext, AnsibleVaultError, VaultHelper from ansible._internal._templating._jinja_common import VaultExceptionMarker, TruncationMarker, Marker from ansible._internal._templating._engine import TemplateEngine, TemplateOptions from ansible._internal._templating._utils import TemplateContext @@ -44,7 +44,7 @@ from ansible._internal._datatag._tags import VaultedValue, Origin, TrustedAsTemp from ansible.utils.collection_loader import _EncryptedStringProtocol from units.mock.loader import DictDataLoader -from units.mock.vault_helper import TextVaultSecret +from units.mock.vault_helper import TextVaultSecret, VaultTestHelper class TestUnhexlify(unittest.TestCase): @@ -876,28 +876,14 @@ def test_vault_secrets_context_already_initialized(_zap_vault_secrets_context) - VaultSecretsContext.initialize(VaultSecretsContext([])) -def make_vault_ciphertext(plaintext: str) -> str: - """Creates an `EncryptedString` from the first secret in the active VaultSecretsContext.""" - secrets = VaultSecretsContext.current().secrets - vl = VaultLib(secrets) - - return vl.encrypt(plaintext, secrets[0][1]).decode() - - -def make_encrypted_string(plaintext: str) -> EncryptedString: - """Creates an `EncryptedString` from the first secret in the active VaultSecretsContext.""" - - return Origin(path="/tmp/sometest", line_num=42, col_num=42).tag(EncryptedString(ciphertext=make_vault_ciphertext(plaintext))) - - -def test_encrypted_string_unmanaged_access(_vault_secrets_context) -> None: +def test_encrypted_string_unmanaged_access(_vault_secrets_context: VaultTestHelper) -> None: """ Validates that unmanaged access to an `EncryptedString`: * properly decrypts and caches the value when secrets are available * propagates Origin and VaultedValue tags """ plaintext = 'i am plaintext' - encrypted_string = make_encrypted_string(plaintext) + encrypted_string = _vault_secrets_context.make_encrypted_string(plaintext) origin = Origin.get_required_tag(encrypted_string) vaulted_value_tag = VaultedValue(ciphertext=VaultHelper.get_ciphertext(encrypted_string, with_tags=False)) @@ -912,9 +898,9 @@ def test_encrypted_string_unmanaged_access(_vault_secrets_context) -> None: assert VaultedValue.get_required_tag(res1).ciphertext == vaulted_value_tag.ciphertext -def test_encrypted_string_unmanaged_access_fail(_vault_secrets_context) -> None: +def test_encrypted_string_unmanaged_access_fail(_vault_secrets_context: VaultTestHelper) -> None: """Validates that unmanaged access to an `EncryptedString` fails with AnsibleVaultError when secrets are not available.""" - encrypted_string = make_encrypted_string("i am plaintext") + encrypted_string = _vault_secrets_context.make_encrypted_string("i am plaintext") VaultSecretsContext.current().secrets = [] with pytest.raises(AnsibleVaultError): @@ -927,18 +913,18 @@ def test_encrypted_string_unmanaged_access_fail(_vault_secrets_context) -> None: (42, int), (42.42, float), )) -def test_encrypted_string_conversion_methods(value: t.Any, conversion_func: t.Callable, _vault_secrets_context): +def test_encrypted_string_conversion_methods(value: t.Any, conversion_func: t.Callable, _vault_secrets_context: VaultTestHelper): """Ensure that `EncryptedString` dunder conversion methods decrypt and pass through correctly.""" - encrypted_string = make_encrypted_string(str(value)) + encrypted_string = _vault_secrets_context.make_encrypted_string(str(value)) converted = conversion_func(encrypted_string) assert converted == value -def test_radd(_vault_secrets_context) -> None: +def test_radd(_vault_secrets_context: VaultTestHelper) -> None: """Ensure that the __radd__ dunder method decrypts and passes through.""" - assert "plain string " + make_encrypted_string("secret string") == "plain string secret string" + assert "plain string " + _vault_secrets_context.make_encrypted_string("secret string") == "plain string secret string" def make_marker(marker_type: type[Marker], *args, **kwargs): @@ -979,7 +965,7 @@ def test_vaulthelper_get_ciphertext(value: t.Any, expected_ciphertext: str | Non ("os.listdir(ed)", "[temp_file.name]"), ("open(ef).read()", "'Ansible'"), )) -def test_encrypted_string_path_fspath(_vault_secrets_context, expression: str, expected_expression: str) -> None: +def test_encrypted_string_path_fspath(_vault_secrets_context: VaultTestHelper, expression: str, expected_expression: str) -> None: """Ensure that `EncryptedString` works with `PathLike` duck-typing consumers.""" with tempfile.TemporaryDirectory() as temp_dir_path: temp_dir = pathlib.Path(temp_dir_path) @@ -989,10 +975,10 @@ def test_encrypted_string_path_fspath(_vault_secrets_context, expression: str, e expression_locals = dict( temp_file=temp_file, temp_dir=temp_dir, - ed=make_encrypted_string(str(temp_dir)), - edn=make_encrypted_string(temp_dir.name), - ef=make_encrypted_string(str(temp_file)), - efn=make_encrypted_string(temp_file.name), + ed=_vault_secrets_context.make_encrypted_string(str(temp_dir)), + edn=_vault_secrets_context.make_encrypted_string(temp_dir.name), + ef=_vault_secrets_context.make_encrypted_string(str(temp_file)), + efn=_vault_secrets_context.make_encrypted_string(temp_file.name), ) expected = eval(expected_expression, globals(), expression_locals) @@ -1001,9 +987,9 @@ def test_encrypted_string_path_fspath(_vault_secrets_context, expression: str, e assert result == expected -def test_protocol_conformance(_vault_secrets_context) -> None: +def test_protocol_conformance(_vault_secrets_context: VaultTestHelper) -> None: """Verify that the `_EncryptedStringProtocol` defined by the collection loader is implemented.""" - assert isinstance(make_encrypted_string("hey"), _EncryptedStringProtocol) + assert isinstance(_vault_secrets_context.make_encrypted_string("hey"), _EncryptedStringProtocol) def test_encrypted_string_cannot_be_trusted() -> None: diff --git a/test/units/parsing/yaml/test_vault.py b/test/units/parsing/yaml/test_vault.py index e8d826ca80b..03df4b7734e 100644 --- a/test/units/parsing/yaml/test_vault.py +++ b/test/units/parsing/yaml/test_vault.py @@ -10,12 +10,12 @@ from ansible.parsing.vault import VaultHelper, EncryptedString from ansible._internal._yaml._errors import AnsibleYAMLParserError from ansible.parsing.utils.yaml import from_yaml -from ..vault.test_vault import make_vault_ciphertext +from ...mock.vault_helper import VaultTestHelper -def test_from_yaml_json_only(_vault_secrets_context) -> None: +def test_from_yaml_json_only(_vault_secrets_context: VaultTestHelper) -> None: """Ensure that from_yaml properly yields an `EncryptedString` instance for legacy-profile JSON with encoded vaulted values.""" - ciphertext = make_vault_ciphertext('mom') + ciphertext = _vault_secrets_context.make_vault_ciphertext('mom') data = json.dumps(dict(hi=dict( __ansible_vault=ciphertext, @@ -28,8 +28,8 @@ def test_from_yaml_json_only(_vault_secrets_context) -> None: assert VaultHelper.get_ciphertext(result['hi'], with_tags=False) == ciphertext -def test_from_yaml(_vault_secrets_context) -> None: - ciphertext = make_vault_ciphertext('mom') +def test_from_yaml(_vault_secrets_context: VaultTestHelper) -> None: + ciphertext = _vault_secrets_context.make_vault_ciphertext('mom') data = f'hi: !vault |\n{textwrap.indent(ciphertext, " ")}'