From 2e4a501549f9c0813c258b39dafa8bda0102910a Mon Sep 17 00:00:00 2001 From: Sloane Hertel <19572925+s-hertel@users.noreply.github.com> Date: Tue, 1 Aug 2023 11:20:20 -0400 Subject: [PATCH] add type hints for DataLoader methods (#81359) * add type hints for DataLoader methods * fix list of tuples * remove mention of pre-2.4 vault method * Add None return type Use | instead of typing.Optional or typing.Union --- lib/ansible/parsing/dataloader.py | 52 +++++++++++++++---------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/lib/ansible/parsing/dataloader.py b/lib/ansible/parsing/dataloader.py index ad0b77316a4..13a57e41973 100644 --- a/lib/ansible/parsing/dataloader.py +++ b/lib/ansible/parsing/dataloader.py @@ -11,6 +11,7 @@ import os import os.path import re import tempfile +import typing as t from ansible import constants as C from ansible.errors import AnsibleFileNotFound, AnsibleParserError @@ -19,7 +20,7 @@ from ansible.module_utils.six import binary_type, text_type from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text from ansible.parsing.quoting import unquote from ansible.parsing.utils.yaml import from_yaml -from ansible.parsing.vault import VaultLib, b_HEADER, is_encrypted, is_encrypted_file, parse_vaulttext_envelope +from ansible.parsing.vault import VaultLib, b_HEADER, is_encrypted, is_encrypted_file, parse_vaulttext_envelope, PromptVaultSecret from ansible.utils.path import unfrackpath from ansible.utils.display import Display @@ -45,7 +46,7 @@ class DataLoader: Usage: dl = DataLoader() - # optionally: dl.set_vault_password('foo') + # optionally: dl.set_vault_secrets([('default', ansible.parsing.vault.PrompVaultSecret(...),)]) ds = dl.load('...') ds = dl.load_from_file('/path/to/file') ''' @@ -66,20 +67,19 @@ class DataLoader: # initialize the vault stuff with an empty password # TODO: replace with a ref to something that can get the password # a creds/auth provider - # self.set_vault_password(None) self._vaults = {} self._vault = VaultLib() self.set_vault_secrets(None) # TODO: since we can query vault_secrets late, we could provide this to DataLoader init - def set_vault_secrets(self, vault_secrets): + def set_vault_secrets(self, vault_secrets: list[tuple[str, PromptVaultSecret]] | None) -> None: self._vault.secrets = vault_secrets - def load(self, data, file_name='', show_content=True, json_only=False): + def load(self, data: str, file_name: str = '', show_content: bool = True, json_only: bool = False) -> t.Any: '''Backwards compat for now''' return from_yaml(data, file_name, show_content, self._vault.secrets, json_only=json_only) - def load_from_file(self, file_name, cache=True, unsafe=False, json_only=False): + def load_from_file(self, file_name: str, cache: bool = True, unsafe: bool = False, json_only: bool = False) -> t.Any: ''' Loads data from a file, which can contain either JSON or YAML. ''' file_name = self.path_dwim(file_name) @@ -105,28 +105,28 @@ class DataLoader: # return a deep copy here, so the cache is not affected return copy.deepcopy(parsed_data) - def path_exists(self, path): + def path_exists(self, path: str) -> bool: path = self.path_dwim(path) return os.path.exists(to_bytes(path, errors='surrogate_or_strict')) - def is_file(self, path): + def is_file(self, path: str) -> bool: path = self.path_dwim(path) return os.path.isfile(to_bytes(path, errors='surrogate_or_strict')) or path == os.devnull - def is_directory(self, path): + def is_directory(self, path: str) -> bool: path = self.path_dwim(path) return os.path.isdir(to_bytes(path, errors='surrogate_or_strict')) - def list_directory(self, path): + def list_directory(self, path: str) -> list[str]: path = self.path_dwim(path) return os.listdir(path) - def is_executable(self, path): + def is_executable(self, path: str) -> bool: '''is the given path executable?''' path = self.path_dwim(path) return is_executable(path) - def _decrypt_if_vault_data(self, b_vault_data, b_file_name=None): + def _decrypt_if_vault_data(self, b_vault_data: bytes, b_file_name: bytes | None = None) -> tuple[bytes, bool]: '''Decrypt b_vault_data if encrypted and return b_data and the show_content flag''' if not is_encrypted(b_vault_data): @@ -139,7 +139,7 @@ class DataLoader: show_content = False return b_data, show_content - def _get_file_contents(self, file_name): + def _get_file_contents(self, file_name: str) -> tuple[bytes, bool]: ''' Reads the file contents from the given file name @@ -168,17 +168,17 @@ class DataLoader: except (IOError, OSError) as e: raise AnsibleParserError("an error occurred while trying to read the file '%s': %s" % (file_name, to_native(e)), orig_exc=e) - def get_basedir(self): + def get_basedir(self) -> str: ''' returns the current basedir ''' return self._basedir - def set_basedir(self, basedir): + def set_basedir(self, basedir: str) -> None: ''' sets the base directory, used to find files when a relative path is given ''' if basedir is not None: self._basedir = to_text(basedir) - def path_dwim(self, given): + def path_dwim(self, given: str) -> str: ''' make relative paths work like folks expect. ''' @@ -194,7 +194,7 @@ class DataLoader: return unfrackpath(path, follow=False) - def _is_role(self, path): + def _is_role(self, path: str) -> bool: ''' imperfect role detection, roles are still valid w/o tasks|meta/main.yml|yaml|etc ''' b_path = to_bytes(path, errors='surrogate_or_strict') @@ -228,7 +228,7 @@ class DataLoader: return False - def path_dwim_relative(self, path, dirname, source, is_role=False): + def path_dwim_relative(self, path: str, dirname: str, source: str, is_role: bool = False) -> str: ''' find one file in either a role or playbook dir with or without explicitly named dirname subdirs @@ -283,7 +283,7 @@ class DataLoader: return candidate - def path_dwim_relative_stack(self, paths, dirname, source, is_role=False): + def path_dwim_relative_stack(self, paths: list[str], dirname: str, source: str, is_role: bool = False) -> str: ''' find one file in first path in stack taking roles into account and adding play basedir as fallback @@ -342,7 +342,7 @@ class DataLoader: return result - def _create_content_tempfile(self, content): + def _create_content_tempfile(self, content: str | bytes) -> str: ''' Create a tempfile containing defined content ''' fd, content_tempfile = tempfile.mkstemp(dir=C.DEFAULT_LOCAL_TMP) f = os.fdopen(fd, 'wb') @@ -356,7 +356,7 @@ class DataLoader: f.close() return content_tempfile - def get_real_file(self, file_path, decrypt=True): + def get_real_file(self, file_path: str, decrypt: bool = True) -> str: """ If the file is vault encrypted return a path to a temporary decrypted file If the file is not encrypted then the path is returned @@ -396,7 +396,7 @@ class DataLoader: except (IOError, OSError) as e: raise AnsibleParserError("an error occurred while trying to read the file '%s': %s" % (to_native(real_path), to_native(e)), orig_exc=e) - def cleanup_tmp_file(self, file_path): + def cleanup_tmp_file(self, file_path: str) -> None: """ Removes any temporary files created from a previous call to get_real_file. file_path must be the path returned from a @@ -406,7 +406,7 @@ class DataLoader: os.unlink(file_path) self._tempfiles.remove(file_path) - def cleanup_all_tmp_files(self): + def cleanup_all_tmp_files(self) -> None: """ Removes all temporary files that DataLoader has created NOTE: not thread safe, forks also need special handling see __init__ for details. @@ -417,7 +417,7 @@ class DataLoader: except Exception as e: display.warning("Unable to cleanup temp files: %s" % to_text(e)) - def find_vars_files(self, path, name, extensions=None, allow_dir=True): + def find_vars_files(self, path: str, name: str, extensions: list[str] | None = None, allow_dir: bool = True) -> list[str]: """ Find vars files in a given path with specified name. This will find files in a dir named / or a file called ending in known @@ -447,11 +447,11 @@ class DataLoader: else: continue else: - found.append(full_path) + found.append(to_text(full_path)) break return found - def _get_dir_vars_files(self, path, extensions): + def _get_dir_vars_files(self, path: str, extensions: list[str]) -> list[str]: found = [] for spath in sorted(self.list_directory(path)): if not spath.startswith(u'.') and not spath.endswith(u'~'): # skip hidden and backups