Ensure config env/ini values are tagged (#85404)

* Ensure config env/ini values are tagged

Config env and ini values now have origin and trust tags applied.

* Remove unused import
pull/85398/head
Matt Clay 5 months ago committed by GitHub
parent d6efb7db8a
commit 6ff6339191
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -6,6 +6,7 @@ from __future__ import annotations
import atexit import atexit
import decimal import decimal
import configparser import configparser
import functools
import os import os
import os.path import os.path
import sys import sys
@ -248,18 +249,6 @@ def get_config_type(cfile):
return ftype return ftype
# FIXME: can move to module_utils for use for ini plugins also?
def get_ini_config_value(p, entry):
""" returns the value of last ini entry found """
value = None
if p is not None:
try:
value = p.get(entry.get('section', 'defaults'), entry.get('key', ''), raw=True)
except Exception: # FIXME: actually report issues here
pass
return value
def find_ini_config_file(warnings=None): def find_ini_config_file(warnings=None):
""" Load INI Config File order(first found is used): ENV, CWD, HOME, /etc/ansible """ """ Load INI Config File order(first found is used): ENV, CWD, HOME, /etc/ansible """
# FIXME: eventually deprecate ini configs # FIXME: eventually deprecate ini configs
@ -345,6 +334,7 @@ class ConfigManager:
_errors: list[tuple[str, Exception]] _errors: list[tuple[str, Exception]]
def __init__(self, conf_file=None, defs_file=None): def __init__(self, conf_file=None, defs_file=None):
self._get_ini_config_value = functools.cache(self._get_ini_config_value)
self._base_defs = {} self._base_defs = {}
self._plugins = {} self._plugins = {}
@ -628,6 +618,7 @@ class ConfigManager:
# env vars are next precedence # env vars are next precedence
if value is None and defs[config].get('env'): if value is None and defs[config].get('env'):
value, origin = self._loop_entries(os.environ, defs[config]['env']) value, origin = self._loop_entries(os.environ, defs[config]['env'])
value = _tags.TrustedAsTemplate().tag(value)
origin = 'env: %s' % origin origin = 'env: %s' % origin
# try config file entries next, if we have one # try config file entries next, if we have one
@ -642,7 +633,7 @@ class ConfigManager:
for entry in defs[config][ftype]: for entry in defs[config][ftype]:
# load from config # load from config
if ftype == 'ini': if ftype == 'ini':
temp_value = get_ini_config_value(self._parsers[cfile], entry) temp_value = self._get_ini_config_value(cfile, entry.get('section', 'defaults'), entry['key'])
elif ftype == 'yaml': elif ftype == 'yaml':
raise AnsibleError('YAML configuration type has not been implemented yet') raise AnsibleError('YAML configuration type has not been implemented yet')
else: else:
@ -724,6 +715,32 @@ class ConfigManager:
self._plugins[plugin_type][name] = defs self._plugins[plugin_type][name] = defs
def _get_ini_config_value(self, config_file: str, section: str, option: str) -> t.Any:
"""
Fetch `option` from the specified `section`.
Returns `None` if the specified `section` or `option` are not present.
Origin and TrustedAsTemplate tags are applied to returned values.
CAUTION: Although INI sourced configuration values are trusted for templating, that does not automatically mean they will be templated.
It is up to the code consuming configuration values to apply templating if required.
"""
parser = self._parsers[config_file]
value = parser.get(section, option, raw=True, fallback=None)
if value is not None:
value = self._apply_tags(value, section, option)
return value
def _apply_tags(self, value: str, section: str, option: str) -> t.Any:
"""Apply origin and trust to the given `value` sourced from the stated `section` and `option`."""
description = f'section {section!r} option {option!r}'
origin = _tags.Origin(path=self._config_file, description=description)
tags = [origin, _tags.TrustedAsTemplate()]
value = AnsibleTagHelper.tag(value, tags)
return value
@staticmethod @staticmethod
def get_deprecated_msg_from_config(dep_docs, include_removal=False, collection_name=None): def get_deprecated_msg_from_config(dep_docs, include_removal=False, collection_name=None):

@ -1,4 +1,3 @@
# deprecated: description='ansible_managed has been removed' core_version='2.23'
- name: invoke template lookup with content using default injected `ansible_managed` - name: invoke template lookup with content using default injected `ansible_managed`
debug: debug:
msg: "{{ lookup('template', 'uses_ansible_managed.j2') }}" msg: "{{ lookup('template', 'uses_ansible_managed.j2') }}"

@ -16,6 +16,7 @@ from ansible.config.manager import ConfigManager, ensure_type, resolve_path, get
from ansible.errors import AnsibleOptionsError, AnsibleError from ansible.errors import AnsibleOptionsError, AnsibleError
from ansible._internal._datatag._tags import Origin, VaultedValue from ansible._internal._datatag._tags import Origin, VaultedValue
from ansible.module_utils._internal._datatag import AnsibleTagHelper from ansible.module_utils._internal._datatag import AnsibleTagHelper
from ansible.template import is_trusted_as_template
from units.mock.vault_helper import VaultTestHelper from units.mock.vault_helper import VaultTestHelper
curdir = os.path.dirname(__file__) curdir = os.path.dirname(__file__)
@ -272,3 +273,30 @@ def test_256color_support(key, expected_value):
actual_value = manager.get_config_value(key) actual_value = manager.get_config_value(key)
# THEN: no error # THEN: no error
assert actual_value == expected_value assert actual_value == expected_value
def test_config_trust_from_env(monkeypatch: pytest.MonkeyPatch) -> None:
expected = "from test"
monkeypatch.setenv("ANSIBLE_TEST_ENTRY", expected)
result = ConfigManager().get_config_value("_Z_TEST_ENTRY")
origin = Origin.get_tag(result)
assert result == expected
assert is_trusted_as_template(result)
assert origin and origin.description == '<Config env: ANSIBLE_TEST_ENTRY>'
def test_config_trust_from_file(tmp_path: pathlib.Path) -> None:
expected = "from test"
cfg_path = tmp_path / 'test.cfg'
cfg_path.write_text(f"[testing]\nvalid={expected}")
result = ConfigManager(str(cfg_path)).get_config_value("_Z_TEST_ENTRY")
origin = Origin.get_tag(result)
assert result == expected
assert is_trusted_as_template(result)
assert origin
assert origin.path == str(cfg_path)
assert origin.description == "section 'testing' option 'valid'"

Loading…
Cancel
Save