Update DataLoader to deal almost exclusively in str (#85941)

* Update DataLoader to deal almost exclusively in str

* source can be None as well

Co-authored-by: David Shrewsbury <Shrews@users.noreply.github.com>
pull/85939/head
sivel / Matt Martz 1 month ago committed by GitHub
parent 719681bbe2
commit f1f5b934c2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,2 @@
minor_changes:
- DataLoader - Update ``DataLoader`` to deal exclusively in str

@ -18,7 +18,7 @@ from ansible._internal._errors import _error_utils
from ansible.module_utils.basic import is_executable
from ansible._internal._datatag._tags import Origin, TrustedAsTemplate, SourceWasEncrypted
from ansible.module_utils._internal._datatag import AnsibleTagHelper
from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text
from ansible.module_utils.common.text.converters import to_bytes, to_text
from ansible.parsing.quoting import unquote
from ansible.parsing.utils.yaml import from_yaml
from ansible.parsing.vault import VaultLib, is_encrypted, is_encrypted_file, PromptVaultSecret
@ -133,15 +133,15 @@ class DataLoader:
def path_exists(self, path: str) -> bool:
path = self.path_dwim(path)
return os.path.exists(to_bytes(path, errors='surrogate_or_strict'))
return os.path.exists(path)
def is_file(self, path: str) -> bool:
path = self.path_dwim(path)
return os.path.isfile(to_bytes(path, errors='surrogate_or_strict')) or path == os.devnull
return os.path.isfile(path) or path == os.devnull
def is_directory(self, path: str) -> bool:
path = self.path_dwim(path)
return os.path.isdir(to_bytes(path, errors='surrogate_or_strict'))
return os.path.isdir(path)
def list_directory(self, path: str) -> list[str]:
path = self.path_dwim(path)
@ -246,28 +246,27 @@ class DataLoader:
def _is_role(self, path: str) -> bool:
""" imperfect role detection, roles are still valid w/o tasks|meta/main.yml|yaml|etc """
b_path = to_bytes(path, errors='surrogate_or_strict')
b_path_dirname = os.path.dirname(b_path)
b_upath = to_bytes(unfrackpath(path, follow=False), errors='surrogate_or_strict')
path_dirname = os.path.dirname(path)
upath = unfrackpath(path, follow=False)
untasked_paths = (
os.path.join(b_path, b'main.yml'),
os.path.join(b_path, b'main.yaml'),
os.path.join(b_path, b'main'),
os.path.join(path, 'main.yml'),
os.path.join(path, 'main.yaml'),
os.path.join(path, 'main'),
)
tasked_paths = (
os.path.join(b_upath, b'tasks/main.yml'),
os.path.join(b_upath, b'tasks/main.yaml'),
os.path.join(b_upath, b'tasks/main'),
os.path.join(b_upath, b'meta/main.yml'),
os.path.join(b_upath, b'meta/main.yaml'),
os.path.join(b_upath, b'meta/main'),
os.path.join(b_path_dirname, b'tasks/main.yml'),
os.path.join(b_path_dirname, b'tasks/main.yaml'),
os.path.join(b_path_dirname, b'tasks/main'),
os.path.join(b_path_dirname, b'meta/main.yml'),
os.path.join(b_path_dirname, b'meta/main.yaml'),
os.path.join(b_path_dirname, b'meta/main'),
os.path.join(upath, 'tasks/main.yml'),
os.path.join(upath, 'tasks/main.yaml'),
os.path.join(upath, 'tasks/main'),
os.path.join(upath, 'meta/main.yml'),
os.path.join(upath, 'meta/main.yaml'),
os.path.join(upath, 'meta/main'),
os.path.join(path_dirname, 'tasks/main.yml'),
os.path.join(path_dirname, 'tasks/main.yaml'),
os.path.join(path_dirname, 'tasks/main'),
os.path.join(path_dirname, 'meta/main.yml'),
os.path.join(path_dirname, 'meta/main.yaml'),
os.path.join(path_dirname, 'meta/main'),
)
exists_untasked = map(os.path.exists, untasked_paths)
@ -326,12 +325,12 @@ class DataLoader:
search.append(self.path_dwim(source))
for candidate in search:
if os.path.exists(to_bytes(candidate, errors='surrogate_or_strict')):
if os.path.exists(candidate):
break
return candidate
def path_dwim_relative_stack(self, paths: list[str], dirname: str, source: str, is_role: bool = False) -> str:
def path_dwim_relative_stack(self, paths: list[str], dirname: str | None, source: str | None, is_role: bool = False) -> str:
"""
find one file in first path in stack taking roles into account and adding play basedir as fallback
@ -343,64 +342,62 @@ class DataLoader:
:returns: An absolute path to the filename ``source`` if found
:raises: An AnsibleFileNotFound Exception if the file is found to exist in the search paths
"""
b_dirname = to_bytes(dirname, errors='surrogate_or_strict')
b_source = to_bytes(source, errors='surrogate_or_strict')
source_root = None
if source:
source_root = source.split('/')[0]
basedir = self.get_basedir()
result = None
search = []
if source is None:
display.warning('Invalid request to find a file that matches a "null" value')
elif source and (source.startswith('~') or source.startswith(os.path.sep)):
# path is absolute, no relative needed, check existence and return source
test_path = unfrackpath(b_source, follow=False)
if os.path.exists(to_bytes(test_path, errors='surrogate_or_strict')):
test_path = unfrackpath(source, follow=False)
if os.path.exists(test_path):
result = test_path
else:
display.debug('evaluation_path:\n\t%s' % '\n\t'.join(paths))
for path in paths:
upath = unfrackpath(path, follow=False)
b_upath = to_bytes(upath, errors='surrogate_or_strict')
b_pb_base_dir = os.path.dirname(b_upath)
pb_base_dir = os.path.dirname(upath)
# if path is in role and 'tasks' not there already, add it into the search
if (is_role or self._is_role(path)) and b_pb_base_dir.endswith(b'/tasks'):
search.append(os.path.join(os.path.dirname(b_pb_base_dir), b_dirname, b_source))
search.append(os.path.join(b_pb_base_dir, b_source))
if (is_role or self._is_role(path)) and pb_base_dir.endswith('/tasks'):
if dirname:
search.append(os.path.join(os.path.dirname(pb_base_dir), dirname, source))
search.append(os.path.join(pb_base_dir, source))
# don't add dirname if user already is using it in source
if b_source.split(b'/')[0] != dirname:
search.append(os.path.join(b_upath, b_dirname, b_source))
search.append(os.path.join(b_upath, b_source))
if dirname and source_root != dirname:
search.append(os.path.join(upath, dirname, source))
search.append(os.path.join(upath, source))
# always append basedir as last resort
# don't add dirname if user already is using it in source
if b_source.split(b'/')[0] != dirname:
search.append(os.path.join(to_bytes(self.get_basedir(), errors='surrogate_or_strict'), b_dirname, b_source))
search.append(os.path.join(to_bytes(self.get_basedir(), errors='surrogate_or_strict'), b_source))
display.debug('search_path:\n\t%s' % to_text(b'\n\t'.join(search)))
for b_candidate in search:
display.vvvvv('looking for "%s" at "%s"' % (source, to_text(b_candidate)))
if os.path.exists(b_candidate):
result = to_text(b_candidate)
if dirname and source_root != dirname:
search.append(os.path.join(basedir, dirname, source))
search.append(os.path.join(basedir, source))
display.debug('search_path:\n\t%s' % '\n\t'.join(search))
for candidate in search:
display.vvvvv('looking for "%s" at "%s"' % (source, candidate))
if os.path.exists(candidate):
result = candidate
break
if result is None:
raise AnsibleFileNotFound(file_name=source, paths=[to_native(p) for p in search])
raise AnsibleFileNotFound(file_name=source, paths=search)
return result
def _create_content_tempfile(self, content: str | bytes) -> str:
""" Create a tempfile containing defined content """
fd, content_tempfile = tempfile.mkstemp(dir=C.DEFAULT_LOCAL_TMP)
f = os.fdopen(fd, 'wb')
content = to_bytes(content)
try:
f.write(content)
except Exception as err:
os.remove(content_tempfile)
raise Exception(err)
finally:
f.close()
with os.fdopen(fd, 'wb') as f:
try:
f.write(to_bytes(content))
except Exception as err:
os.remove(content_tempfile)
raise Exception(err)
return content_tempfile
def get_real_file(self, file_path: str, decrypt: bool = True) -> str:
@ -410,9 +407,6 @@ class DataLoader:
Temporary files are cleanup in the destructor
"""
if not file_path or not isinstance(file_path, (bytes, str)):
raise AnsibleParserError("Invalid filename: '%s'" % to_native(file_path))
if not self.path_exists(file_path) or not self.is_file(file_path):
raise AnsibleFileNotFound(file_name=file_path)
@ -420,7 +414,7 @@ class DataLoader:
try:
if decrypt:
with open(to_bytes(real_path), 'rb') as f:
with open(real_path, 'rb') as f:
# Limit how much of the file is read since we do not know
# whether this is a vault file and therefore it could be very
# large.
@ -430,7 +424,7 @@ class DataLoader:
# since the decrypt function doesn't know the file name
data = Origin(path=real_path).tag(f.read())
if not self._vault.secrets:
raise AnsibleParserError("A vault password or secret must be specified to decrypt %s" % to_native(file_path))
raise AnsibleParserError("A vault password or secret must be specified to decrypt %s" % file_path)
data = self._vault.decrypt(data)
# Make a temp file
@ -440,7 +434,7 @@ class DataLoader:
return real_path
except OSError as ex:
raise AnsibleParserError(f"an error occurred while trying to read the file {to_text(real_path)!r}.") from ex
raise AnsibleParserError(f"an error occurred while trying to read the file {real_path!r}.") from ex
def cleanup_tmp_file(self, file_path: str) -> None:
"""
@ -470,7 +464,7 @@ class DataLoader:
extensions.
"""
b_path = to_bytes(os.path.join(path, name))
jpath = os.path.join(path, name)
found: list[str] = []
if extensions is None:
@ -480,13 +474,11 @@ class DataLoader:
for ext in extensions:
if '.' in ext:
b_full_path = b_path + to_bytes(ext)
full_path = jpath + ext
elif ext:
b_full_path = b'.'.join([b_path, to_bytes(ext)])
full_path = '.'.join([jpath, ext])
else:
b_full_path = b_path
full_path = to_text(b_full_path)
full_path = jpath
if self.path_exists(full_path):
if self.is_directory(full_path):
@ -509,7 +501,7 @@ class DataLoader:
if self.is_directory(full_spath) and not ext: # recursive search if dir
found.extend(self._get_dir_vars_files(full_spath, extensions))
elif self.is_file(full_spath) and (not ext or to_text(ext) in extensions):
elif self.is_file(full_spath) and (not ext or ext in extensions):
# only consider files with valid extensions or no extension
found.append(full_spath)

@ -532,7 +532,7 @@ class ActionBase(ABC, _AnsiblePluginInfoMixin):
else:
self._connection._shell.tmpdir = None
def _transfer_file(self, local_path, remote_path):
def _transfer_file(self, local_path: str, remote_path: str) -> str:
"""
Copy a file from the controller to a remote path
@ -550,7 +550,7 @@ class ActionBase(ABC, _AnsiblePluginInfoMixin):
self._connection.put_file(local_path, remote_path)
return remote_path
def _transfer_data(self, remote_path: str | bytes, data: str | bytes) -> str | bytes:
def _transfer_data(self, remote_path: str, data: str | bytes) -> str:
"""
Copies the module data out to the temporary module path.
"""
@ -1317,7 +1317,7 @@ class ActionBase(ABC, _AnsiblePluginInfoMixin):
# Change directory to basedir of task for command execution when connection is local
if self._connection.transport == 'local':
self._connection.cwd = to_bytes(self._loader.get_basedir(), errors='surrogate_or_strict')
self._connection.cwd = self._loader.get_basedir()
rc, stdout, stderr = self._connection.exec_command(cmd, in_data=in_data, sudoable=sudoable)

@ -26,7 +26,7 @@ from ansible.module_utils.common.text.converters import to_bytes, to_text
__all__ = ['unfrackpath', 'makedirs_safe']
def unfrackpath(path, follow=True, basedir=None):
def unfrackpath(path: str, follow: bool = True, basedir: str | None = None) -> str:
"""
Returns a path that is free of symlinks (if follow=True), environment variables, relative path traversals and symbols (~)
@ -45,22 +45,20 @@ def unfrackpath(path, follow=True, basedir=None):
'$HOME/../../var/mail' becomes '/var/spool/mail'
"""
b_basedir = to_bytes(basedir, errors='surrogate_or_strict', nonstring='passthru')
if basedir is None:
basedir = os.getcwd()
elif os.path.isfile(basedir):
basedir = os.path.dirname(basedir)
if b_basedir is None:
b_basedir = to_bytes(os.getcwd(), errors='surrogate_or_strict')
elif os.path.isfile(b_basedir):
b_basedir = os.path.dirname(b_basedir)
final_path = os.path.expanduser(os.path.expandvars(path))
b_final_path = os.path.expanduser(os.path.expandvars(to_bytes(path, errors='surrogate_or_strict')))
if not os.path.isabs(b_final_path):
b_final_path = os.path.join(b_basedir, b_final_path)
if not os.path.isabs(final_path):
final_path = os.path.join(basedir, final_path)
if follow:
b_final_path = os.path.realpath(b_final_path)
final_path = os.path.realpath(final_path)
return to_text(os.path.normpath(b_final_path), errors='surrogate_or_strict')
return os.path.normpath(final_path)
def makedirs_safe(path, mode=None):

@ -46,7 +46,7 @@ class TestDataLoader(unittest.TestCase):
@patch('os.path.exists')
def test__is_role(self, p_exists):
p_exists.side_effect = lambda p: p == b'test_path/tasks/main.yml'
p_exists.side_effect = lambda p: p == 'test_path/tasks/main.yml'
self.assertTrue(self._loader._is_role('test_path/tasks'))
self.assertTrue(self._loader._is_role('test_path/'))
@ -223,9 +223,6 @@ class TestDataLoaderWithVault(unittest.TestCase):
self._loader.set_vault_secrets(wrong_vault)
self.assertRaises(AnsibleVaultError, self._loader.get_real_file, self.test_vault_data_path)
def test_get_real_file_not_a_path(self):
self.assertRaisesRegex(AnsibleParserError, 'Invalid filename', self._loader.get_real_file, None)
def test_parse_from_vault_1_1_file(self):
vaulted_data = """$ANSIBLE_VAULT;1.1;AES256
33343734386261666161626433386662623039356366656637303939306563376130623138626165

Loading…
Cancel
Save