add type hints for DataLoader methods (#81359)

* add type hints for DataLoader methods

* fix list of tuples

* remove mention of pre-2.4 vault method

* Add None return type

Use | instead of typing.Optional or typing.Union
pull/81395/head
Sloane Hertel 1 year ago committed by GitHub
parent 081c60b9d3
commit 2e4a501549
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -11,6 +11,7 @@ import os
import os.path
import re
import tempfile
import typing as t
from ansible import constants as C
from ansible.errors import AnsibleFileNotFound, AnsibleParserError
@ -19,7 +20,7 @@ from ansible.module_utils.six import binary_type, text_type
from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text
from ansible.parsing.quoting import unquote
from ansible.parsing.utils.yaml import from_yaml
from ansible.parsing.vault import VaultLib, b_HEADER, is_encrypted, is_encrypted_file, parse_vaulttext_envelope
from ansible.parsing.vault import VaultLib, b_HEADER, is_encrypted, is_encrypted_file, parse_vaulttext_envelope, PromptVaultSecret
from ansible.utils.path import unfrackpath
from ansible.utils.display import Display
@ -45,7 +46,7 @@ class DataLoader:
Usage:
dl = DataLoader()
# optionally: dl.set_vault_password('foo')
# optionally: dl.set_vault_secrets([('default', ansible.parsing.vault.PrompVaultSecret(...),)])
ds = dl.load('...')
ds = dl.load_from_file('/path/to/file')
'''
@ -66,20 +67,19 @@ class DataLoader:
# initialize the vault stuff with an empty password
# TODO: replace with a ref to something that can get the password
# a creds/auth provider
# self.set_vault_password(None)
self._vaults = {}
self._vault = VaultLib()
self.set_vault_secrets(None)
# TODO: since we can query vault_secrets late, we could provide this to DataLoader init
def set_vault_secrets(self, vault_secrets):
def set_vault_secrets(self, vault_secrets: list[tuple[str, PromptVaultSecret]] | None) -> None:
self._vault.secrets = vault_secrets
def load(self, data, file_name='<string>', show_content=True, json_only=False):
def load(self, data: str, file_name: str = '<string>', show_content: bool = True, json_only: bool = False) -> t.Any:
'''Backwards compat for now'''
return from_yaml(data, file_name, show_content, self._vault.secrets, json_only=json_only)
def load_from_file(self, file_name, cache=True, unsafe=False, json_only=False):
def load_from_file(self, file_name: str, cache: bool = True, unsafe: bool = False, json_only: bool = False) -> t.Any:
''' Loads data from a file, which can contain either JSON or YAML. '''
file_name = self.path_dwim(file_name)
@ -105,28 +105,28 @@ class DataLoader:
# return a deep copy here, so the cache is not affected
return copy.deepcopy(parsed_data)
def path_exists(self, path):
def path_exists(self, path: str) -> bool:
path = self.path_dwim(path)
return os.path.exists(to_bytes(path, errors='surrogate_or_strict'))
def is_file(self, path):
def is_file(self, path: str) -> bool:
path = self.path_dwim(path)
return os.path.isfile(to_bytes(path, errors='surrogate_or_strict')) or path == os.devnull
def is_directory(self, path):
def is_directory(self, path: str) -> bool:
path = self.path_dwim(path)
return os.path.isdir(to_bytes(path, errors='surrogate_or_strict'))
def list_directory(self, path):
def list_directory(self, path: str) -> list[str]:
path = self.path_dwim(path)
return os.listdir(path)
def is_executable(self, path):
def is_executable(self, path: str) -> bool:
'''is the given path executable?'''
path = self.path_dwim(path)
return is_executable(path)
def _decrypt_if_vault_data(self, b_vault_data, b_file_name=None):
def _decrypt_if_vault_data(self, b_vault_data: bytes, b_file_name: bytes | None = None) -> tuple[bytes, bool]:
'''Decrypt b_vault_data if encrypted and return b_data and the show_content flag'''
if not is_encrypted(b_vault_data):
@ -139,7 +139,7 @@ class DataLoader:
show_content = False
return b_data, show_content
def _get_file_contents(self, file_name):
def _get_file_contents(self, file_name: str) -> tuple[bytes, bool]:
'''
Reads the file contents from the given file name
@ -168,17 +168,17 @@ class DataLoader:
except (IOError, OSError) as e:
raise AnsibleParserError("an error occurred while trying to read the file '%s': %s" % (file_name, to_native(e)), orig_exc=e)
def get_basedir(self):
def get_basedir(self) -> str:
''' returns the current basedir '''
return self._basedir
def set_basedir(self, basedir):
def set_basedir(self, basedir: str) -> None:
''' sets the base directory, used to find files when a relative path is given '''
if basedir is not None:
self._basedir = to_text(basedir)
def path_dwim(self, given):
def path_dwim(self, given: str) -> str:
'''
make relative paths work like folks expect.
'''
@ -194,7 +194,7 @@ class DataLoader:
return unfrackpath(path, follow=False)
def _is_role(self, path):
def _is_role(self, path: str) -> bool:
''' imperfect role detection, roles are still valid w/o tasks|meta/main.yml|yaml|etc '''
b_path = to_bytes(path, errors='surrogate_or_strict')
@ -228,7 +228,7 @@ class DataLoader:
return False
def path_dwim_relative(self, path, dirname, source, is_role=False):
def path_dwim_relative(self, path: str, dirname: str, source: str, is_role: bool = False) -> str:
'''
find one file in either a role or playbook dir with or without
explicitly named dirname subdirs
@ -283,7 +283,7 @@ class DataLoader:
return candidate
def path_dwim_relative_stack(self, paths, dirname, source, is_role=False):
def path_dwim_relative_stack(self, paths: list[str], dirname: str, source: str, is_role: bool = False) -> str:
'''
find one file in first path in stack taking roles into account and adding play basedir as fallback
@ -342,7 +342,7 @@ class DataLoader:
return result
def _create_content_tempfile(self, content):
def _create_content_tempfile(self, content: str | bytes) -> str:
''' Create a tempfile containing defined content '''
fd, content_tempfile = tempfile.mkstemp(dir=C.DEFAULT_LOCAL_TMP)
f = os.fdopen(fd, 'wb')
@ -356,7 +356,7 @@ class DataLoader:
f.close()
return content_tempfile
def get_real_file(self, file_path, decrypt=True):
def get_real_file(self, file_path: str, decrypt: bool = True) -> str:
"""
If the file is vault encrypted return a path to a temporary decrypted file
If the file is not encrypted then the path is returned
@ -396,7 +396,7 @@ class DataLoader:
except (IOError, OSError) as e:
raise AnsibleParserError("an error occurred while trying to read the file '%s': %s" % (to_native(real_path), to_native(e)), orig_exc=e)
def cleanup_tmp_file(self, file_path):
def cleanup_tmp_file(self, file_path: str) -> None:
"""
Removes any temporary files created from a previous call to
get_real_file. file_path must be the path returned from a
@ -406,7 +406,7 @@ class DataLoader:
os.unlink(file_path)
self._tempfiles.remove(file_path)
def cleanup_all_tmp_files(self):
def cleanup_all_tmp_files(self) -> None:
"""
Removes all temporary files that DataLoader has created
NOTE: not thread safe, forks also need special handling see __init__ for details.
@ -417,7 +417,7 @@ class DataLoader:
except Exception as e:
display.warning("Unable to cleanup temp files: %s" % to_text(e))
def find_vars_files(self, path, name, extensions=None, allow_dir=True):
def find_vars_files(self, path: str, name: str, extensions: list[str] | None = None, allow_dir: bool = True) -> list[str]:
"""
Find vars files in a given path with specified name. This will find
files in a dir named <name>/ or a file called <name> ending in known
@ -447,11 +447,11 @@ class DataLoader:
else:
continue
else:
found.append(full_path)
found.append(to_text(full_path))
break
return found
def _get_dir_vars_files(self, path, extensions):
def _get_dir_vars_files(self, path: str, extensions: list[str]) -> list[str]:
found = []
for spath in sorted(self.list_directory(path)):
if not spath.startswith(u'.') and not spath.endswith(u'~'): # skip hidden and backups

Loading…
Cancel
Save