|
|
|
|
@ -36,7 +36,7 @@ from ansible import errors
|
|
|
|
|
from ansible.module_utils.common.text.converters import to_bytes, to_text
|
|
|
|
|
from ansible.module_utils._internal._datatag import AnsibleTagHelper
|
|
|
|
|
from ansible.parsing import vault
|
|
|
|
|
from ansible.parsing.vault import EncryptedString, VaultSecretsContext, VaultLib, AnsibleVaultError, VaultHelper
|
|
|
|
|
from ansible.parsing.vault import EncryptedString, VaultSecretsContext, AnsibleVaultError, VaultHelper
|
|
|
|
|
from ansible._internal._templating._jinja_common import VaultExceptionMarker, TruncationMarker, Marker
|
|
|
|
|
from ansible._internal._templating._engine import TemplateEngine, TemplateOptions
|
|
|
|
|
from ansible._internal._templating._utils import TemplateContext
|
|
|
|
|
@ -44,7 +44,7 @@ from ansible._internal._datatag._tags import VaultedValue, Origin, TrustedAsTemp
|
|
|
|
|
from ansible.utils.collection_loader import _EncryptedStringProtocol
|
|
|
|
|
|
|
|
|
|
from units.mock.loader import DictDataLoader
|
|
|
|
|
from units.mock.vault_helper import TextVaultSecret
|
|
|
|
|
from units.mock.vault_helper import TextVaultSecret, VaultTestHelper
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestUnhexlify(unittest.TestCase):
|
|
|
|
|
@ -876,28 +876,14 @@ def test_vault_secrets_context_already_initialized(_zap_vault_secrets_context) -
|
|
|
|
|
VaultSecretsContext.initialize(VaultSecretsContext([]))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def make_vault_ciphertext(plaintext: str) -> str:
|
|
|
|
|
"""Creates an `EncryptedString` from the first secret in the active VaultSecretsContext."""
|
|
|
|
|
secrets = VaultSecretsContext.current().secrets
|
|
|
|
|
vl = VaultLib(secrets)
|
|
|
|
|
|
|
|
|
|
return vl.encrypt(plaintext, secrets[0][1]).decode()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def make_encrypted_string(plaintext: str) -> EncryptedString:
|
|
|
|
|
"""Creates an `EncryptedString` from the first secret in the active VaultSecretsContext."""
|
|
|
|
|
|
|
|
|
|
return Origin(path="/tmp/sometest", line_num=42, col_num=42).tag(EncryptedString(ciphertext=make_vault_ciphertext(plaintext)))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_encrypted_string_unmanaged_access(_vault_secrets_context) -> None:
|
|
|
|
|
def test_encrypted_string_unmanaged_access(_vault_secrets_context: VaultTestHelper) -> None:
|
|
|
|
|
"""
|
|
|
|
|
Validates that unmanaged access to an `EncryptedString`:
|
|
|
|
|
* properly decrypts and caches the value when secrets are available
|
|
|
|
|
* propagates Origin and VaultedValue tags
|
|
|
|
|
"""
|
|
|
|
|
plaintext = 'i am plaintext'
|
|
|
|
|
encrypted_string = make_encrypted_string(plaintext)
|
|
|
|
|
encrypted_string = _vault_secrets_context.make_encrypted_string(plaintext)
|
|
|
|
|
|
|
|
|
|
origin = Origin.get_required_tag(encrypted_string)
|
|
|
|
|
vaulted_value_tag = VaultedValue(ciphertext=VaultHelper.get_ciphertext(encrypted_string, with_tags=False))
|
|
|
|
|
@ -912,9 +898,9 @@ def test_encrypted_string_unmanaged_access(_vault_secrets_context) -> None:
|
|
|
|
|
assert VaultedValue.get_required_tag(res1).ciphertext == vaulted_value_tag.ciphertext
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_encrypted_string_unmanaged_access_fail(_vault_secrets_context) -> None:
|
|
|
|
|
def test_encrypted_string_unmanaged_access_fail(_vault_secrets_context: VaultTestHelper) -> None:
|
|
|
|
|
"""Validates that unmanaged access to an `EncryptedString` fails with AnsibleVaultError when secrets are not available."""
|
|
|
|
|
encrypted_string = make_encrypted_string("i am plaintext")
|
|
|
|
|
encrypted_string = _vault_secrets_context.make_encrypted_string("i am plaintext")
|
|
|
|
|
VaultSecretsContext.current().secrets = []
|
|
|
|
|
|
|
|
|
|
with pytest.raises(AnsibleVaultError):
|
|
|
|
|
@ -927,18 +913,18 @@ def test_encrypted_string_unmanaged_access_fail(_vault_secrets_context) -> None:
|
|
|
|
|
(42, int),
|
|
|
|
|
(42.42, float),
|
|
|
|
|
))
|
|
|
|
|
def test_encrypted_string_conversion_methods(value: t.Any, conversion_func: t.Callable, _vault_secrets_context):
|
|
|
|
|
def test_encrypted_string_conversion_methods(value: t.Any, conversion_func: t.Callable, _vault_secrets_context: VaultTestHelper):
|
|
|
|
|
"""Ensure that `EncryptedString` dunder conversion methods decrypt and pass through correctly."""
|
|
|
|
|
encrypted_string = make_encrypted_string(str(value))
|
|
|
|
|
encrypted_string = _vault_secrets_context.make_encrypted_string(str(value))
|
|
|
|
|
|
|
|
|
|
converted = conversion_func(encrypted_string)
|
|
|
|
|
|
|
|
|
|
assert converted == value
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_radd(_vault_secrets_context) -> None:
|
|
|
|
|
def test_radd(_vault_secrets_context: VaultTestHelper) -> None:
|
|
|
|
|
"""Ensure that the __radd__ dunder method decrypts and passes through."""
|
|
|
|
|
assert "plain string " + make_encrypted_string("secret string") == "plain string secret string"
|
|
|
|
|
assert "plain string " + _vault_secrets_context.make_encrypted_string("secret string") == "plain string secret string"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def make_marker(marker_type: type[Marker], *args, **kwargs):
|
|
|
|
|
@ -979,7 +965,7 @@ def test_vaulthelper_get_ciphertext(value: t.Any, expected_ciphertext: str | Non
|
|
|
|
|
("os.listdir(ed)", "[temp_file.name]"),
|
|
|
|
|
("open(ef).read()", "'Ansible'"),
|
|
|
|
|
))
|
|
|
|
|
def test_encrypted_string_path_fspath(_vault_secrets_context, expression: str, expected_expression: str) -> None:
|
|
|
|
|
def test_encrypted_string_path_fspath(_vault_secrets_context: VaultTestHelper, expression: str, expected_expression: str) -> None:
|
|
|
|
|
"""Ensure that `EncryptedString` works with `PathLike` duck-typing consumers."""
|
|
|
|
|
with tempfile.TemporaryDirectory() as temp_dir_path:
|
|
|
|
|
temp_dir = pathlib.Path(temp_dir_path)
|
|
|
|
|
@ -989,10 +975,10 @@ def test_encrypted_string_path_fspath(_vault_secrets_context, expression: str, e
|
|
|
|
|
expression_locals = dict(
|
|
|
|
|
temp_file=temp_file,
|
|
|
|
|
temp_dir=temp_dir,
|
|
|
|
|
ed=make_encrypted_string(str(temp_dir)),
|
|
|
|
|
edn=make_encrypted_string(temp_dir.name),
|
|
|
|
|
ef=make_encrypted_string(str(temp_file)),
|
|
|
|
|
efn=make_encrypted_string(temp_file.name),
|
|
|
|
|
ed=_vault_secrets_context.make_encrypted_string(str(temp_dir)),
|
|
|
|
|
edn=_vault_secrets_context.make_encrypted_string(temp_dir.name),
|
|
|
|
|
ef=_vault_secrets_context.make_encrypted_string(str(temp_file)),
|
|
|
|
|
efn=_vault_secrets_context.make_encrypted_string(temp_file.name),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
expected = eval(expected_expression, globals(), expression_locals)
|
|
|
|
|
@ -1001,9 +987,9 @@ def test_encrypted_string_path_fspath(_vault_secrets_context, expression: str, e
|
|
|
|
|
assert result == expected
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_protocol_conformance(_vault_secrets_context) -> None:
|
|
|
|
|
def test_protocol_conformance(_vault_secrets_context: VaultTestHelper) -> None:
|
|
|
|
|
"""Verify that the `_EncryptedStringProtocol` defined by the collection loader is implemented."""
|
|
|
|
|
assert isinstance(make_encrypted_string("hey"), _EncryptedStringProtocol)
|
|
|
|
|
assert isinstance(_vault_secrets_context.make_encrypted_string("hey"), _EncryptedStringProtocol)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_encrypted_string_cannot_be_trusted() -> None:
|
|
|
|
|
|