Encryptedstring redact fixes (#85390)

* misc DTFIX/docstring cleanup

* fix EncryptedString redaction, add tests

Co-authored-by: Matt Clay <matt@mystile.com>

* Fix test failures

---------

Co-authored-by: Matt Clay <matt@mystile.com>
(cherry picked from commit 649c9ec443)
pull/85403/head
Matt Davis 6 months ago committed by Matt Clay
parent 8c083e4d1d
commit ebae950db2

@ -147,9 +147,20 @@ class AnsibleVariableVisitor:
"""Internal implementation to recursively visit a data structure's contents."""
self._current = key # supports StateTrackingMixIn
value_type = type(value)
value_type: type = type(value)
if self.apply_transforms and value_type in _transform._type_transform_mapping:
# handle EncryptedString conversion before more generic transformation and native conversions
if value_type is EncryptedString: # pylint: disable=unidiomatic-typecheck
match self.encrypted_string_behavior:
case EncryptedStringBehavior.DECRYPT:
value = str(value) # type: ignore[assignment]
value_type = str
case EncryptedStringBehavior.REDACT:
value = "<redacted>" # type: ignore[assignment]
value_type = str
case EncryptedStringBehavior.FAIL:
raise AnsibleVariableTypeError.from_value(obj=value)
elif self.apply_transforms and value_type in _transform._type_transform_mapping:
value = self._template_engine.transform(value)
value_type = type(value)
@ -162,8 +173,6 @@ class AnsibleVariableVisitor:
# DTFIX-FUTURE: Visitor generally ignores dict/mapping keys by default except for debugging and schema-aware checking.
# It could be checking keys destined for variable storage to apply more strict rules about key shape and type.
# DTFIX0: some type lists being consulted (the ones from datatag) are probably too permissive, and perhaps should not be dynamic
if (result := self._early_visit(value, value_type)) is not _sentinel:
pass
# DTFIX7: de-duplicate and optimize; extract inline generator expressions and fallback function or mapping for native type calculation?
@ -173,14 +182,6 @@ class AnsibleVariableVisitor:
elif value_type in _ANSIBLE_ALLOWED_NON_SCALAR_COLLECTION_VAR_TYPES:
with self: # supports StateTrackingMixIn
result = AnsibleTagHelper.tag_copy(value, (self._visit(k, v) for k, v in enumerate(t.cast(t.Iterable, value))), value_type=value_type)
elif self.encrypted_string_behavior != EncryptedStringBehavior.FAIL and isinstance(value, EncryptedString):
match self.encrypted_string_behavior:
case EncryptedStringBehavior.REDACT:
result = "<redacted>" # type: ignore[assignment]
case EncryptedStringBehavior.PRESERVE:
result = value # type: ignore[assignment]
case EncryptedStringBehavior.DECRYPT:
result = str(value) # type: ignore[assignment]
elif self.convert_mapping_to_dict and _internal.is_intermediate_mapping(value):
with self: # supports StateTrackingMixIn
result = {self._visit_key(k): self._visit(k, v) for k, v in value.items()} # type: ignore[assignment]
@ -193,12 +194,13 @@ class AnsibleVariableVisitor:
result = float(value) # type: ignore[assignment]
elif self.convert_custom_scalars and isinstance(value, int) and not isinstance(value, bool):
result = int(value) # type: ignore[assignment]
else:
if value_type not in _ANSIBLE_ALLOWED_VAR_TYPES:
raise AnsibleVariableTypeError.from_value(obj=value)
elif value_type in _ANSIBLE_ALLOWED_VAR_TYPES:
# supported scalar type that requires no special handling, just return as-is
result = value
elif self.encrypted_string_behavior is EncryptedStringBehavior.PRESERVE and isinstance(value, EncryptedString):
result = value # type: ignore[assignment]
else:
raise AnsibleVariableTypeError.from_value(obj=value)
if self.origin and not Origin.is_tagged_on(result):
# apply shared instance default origin tag

@ -99,9 +99,10 @@ Omit = object.__new__(_OmitType)
_datatag._untaggable_types.add(_OmitType)
# DTFIX0: review these type sets to ensure they're not overly permissive/dynamic
IGNORE_SCALAR_VAR_TYPES = {value for value in _datatag._ANSIBLE_ALLOWED_SCALAR_VAR_TYPES if not issubclass(value, str)}
"""Scalar variable types that short-circuit bypass templating."""
PASS_THROUGH_SCALAR_VAR_TYPES = _datatag._ANSIBLE_ALLOWED_SCALAR_VAR_TYPES | {
_OmitType, # allow pass through of omit for later handling after top-level finalize completes
}
"""Scalar variable types which are allowed to appear in finalized template results."""

@ -949,8 +949,13 @@ This set gets augmented with additional types when some controller-only types ar
# noinspection PyProtectedMember
_ANSIBLE_ALLOWED_VAR_TYPES = frozenset({type(None), bool}) | set(AnsibleTaggedObject._tagged_type_map) | set(AnsibleTaggedObject._tagged_type_map.values())
"""These are the only types supported by Ansible's variable storage. Subclasses are not permitted."""
"""These are the exact types supported by Ansible's variable storage."""
_ANSIBLE_ALLOWED_NON_SCALAR_COLLECTION_VAR_TYPES = frozenset(item for item in _ANSIBLE_ALLOWED_VAR_TYPES if is_non_scalar_collection_type(item))
"""These are the exact non-scalar collection types supported by Ansible's variable storage."""
_ANSIBLE_ALLOWED_MAPPING_VAR_TYPES = frozenset(item for item in _ANSIBLE_ALLOWED_VAR_TYPES if issubclass(item, c.Mapping))
"""These are the exact mapping types supported by Ansible's variable storage."""
_ANSIBLE_ALLOWED_SCALAR_VAR_TYPES = _ANSIBLE_ALLOWED_VAR_TYPES - _ANSIBLE_ALLOWED_NON_SCALAR_COLLECTION_VAR_TYPES
"""These are the exact scalar types supported by Ansible's variable storage."""

@ -293,7 +293,7 @@ def validate_variable_name(name: object) -> None:
def transform_to_native_types(
value: object,
redact: bool = True,
) -> object:
) -> t.Any:
"""
Recursively transform the given value to Python native types.
Potentially sensitive values such as individually vaulted variables will be redacted unless ``redact=False`` is passed.

@ -10,8 +10,6 @@ from ansible.plugins.callback import CallbackBase
class CallbackModule(CallbackBase):
# DTFIX1: validate VaultedValue redaction behavior
CALLBACK_NEEDS_ENABLED = True
seen_tr = [] # track taskresult instances to ensure every call sees a unique instance

@ -0,0 +1,51 @@
from __future__ import annotations
import typing as t
import pytest
from ansible._internal._json import AnsibleVariableVisitor, EncryptedStringBehavior
from ansible.errors import AnsibleVariableTypeError
from ansible.parsing.vault import EncryptedString, AnsibleVaultError
from units.mock.vault_helper import VaultTestHelper
@pytest.mark.parametrize("behavior, decryptable, expected", (
(EncryptedStringBehavior.PRESERVE, True, None),
(EncryptedStringBehavior.PRESERVE, False, None),
(EncryptedStringBehavior.DECRYPT, True, "plaintext"),
(EncryptedStringBehavior.DECRYPT, False, AnsibleVaultError("no vault secrets")),
(EncryptedStringBehavior.REDACT, True, "<redacted>"),
(EncryptedStringBehavior.REDACT, False, "<redacted>"),
(EncryptedStringBehavior.FAIL, True, AnsibleVariableTypeError("unsupported for variable storage")),
(EncryptedStringBehavior.FAIL, False, AnsibleVariableTypeError("unsupported for variable storage")),
), ids=str)
def test_encrypted_string_behavior(
behavior: EncryptedStringBehavior,
decryptable: bool,
expected: t.Any,
_vault_secrets_context: None,
) -> None:
if decryptable:
value = VaultTestHelper.make_encrypted_string('plaintext')
else:
# valid ciphertext with intentionally unavailable secret
value = EncryptedString(ciphertext=(
'$ANSIBLE_VAULT;1.1;AES256\n'
'333665623864636331356364306535613231613833616662656130613665336561316435393736366636663864396636326330626530643238653462333562350a396162623230643'
'037396430383335386663363534353733386430643764303062633738613533336135653563313139373038333964316264633265376435370a326137363231646261303036356636'
'37346430303361316436306130663461393832656134346639326365633830373361376236343961386164323538353962'
))
avv = AnsibleVariableVisitor(encrypted_string_behavior=behavior)
if isinstance(expected, Exception):
with pytest.raises(type(expected), match=expected.args[0]):
avv.visit(value)
else:
result = avv.visit(value)
if expected is None:
assert result is value
else:
assert result == expected

@ -26,7 +26,9 @@ import unittest
from ansible.errors import AnsibleError
from ansible._internal._datatag._tags import Origin
from ansible.parsing.vault import EncryptedString
from ansible.utils.vars import combine_vars, merge_hash, transform_to_native_types
from units.mock.vault_helper import VaultTestHelper
class TestVariableUtils(unittest.TestCase):
@ -279,12 +281,27 @@ class TestVariableUtils(unittest.TestCase):
def test_transform_to_native_types() -> None:
"""Verify that transform_to_native_types results in native types for both keys and values."""
value = {Origin(description="blah").tag("tagged_key"): Origin(description="blah").tag("value with tagged key")}
"""Verify that transform_to_native_types results in native types for both keys and values, with default redaction."""
value = {
Origin(description="blah").tag("tagged_key"): Origin(description="blah").tag("value with tagged key"),
# use a bogus EncryptedString instance with no VaultSecretsContext active; ensures that transform with redaction does not attempt decryption
"redact_this": EncryptedString(ciphertext="bogus")
}
result = transform_to_native_types(value)
assert result == dict(tagged_key="value with tagged key")
assert result == dict(tagged_key="value with tagged key", redact_this='<redacted>')
assert all(type(key) is str for key in result.keys()) # pylint: disable=unidiomatic-typecheck
assert all(type(value) is str for value in result.values()) # pylint: disable=unidiomatic-typecheck
def test_transform_to_native_types_unredacted(_vault_secrets_context: None) -> None:
"""Verify that transform with redaction disabled returns a plain string decrypted value."""
plaintext = "hello"
value = dict(enc=VaultTestHelper.make_encrypted_string(plaintext))
result = transform_to_native_types(value, redact=False)
assert result == dict(enc=plaintext)
assert all(type(key) is str for key in result.keys()) # pylint: disable=unidiomatic-typecheck
assert all(type(value) is str for value in result.values()) # pylint: disable=unidiomatic-typecheck

Loading…
Cancel
Save