ansible-test - Code cleanup. (#75774)

* ansible-test - Remove unused code.
* ansible-test - More PEP 484 type hints.
pull/75780/head
Matt Clay 4 years ago committed by GitHub
parent 6c133da45e
commit 58b03be417
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -69,13 +69,8 @@ def get_hosts(inventory, group_name): # type: (t.Dict[str, t.Any], str) -> t.Di
return hosts return hosts
def ansible_environment(args, color=True, ansible_config=None): def ansible_environment(args, color=True, ansible_config=None): # type: (CommonConfig, bool, t.Optional[str]) -> t.Dict[str, str]
""" """Return a dictionary of environment variables to use when running Ansible commands."""
:type args: CommonConfig
:type color: bool
:type ansible_config: str | None
:rtype: dict[str, str]
"""
env = common_environment() env = common_environment()
path = env['PATH'] path = env['PATH']

@ -8,11 +8,8 @@ from ..data import (
) )
def resolve_csharp_ps_util(import_name, path): def resolve_csharp_ps_util(import_name, path): # type: (str, str) -> str
""" """Return the fully qualified name of the given import if possible, otherwise return the original import name."""
:type import_name: str
:type path: str
"""
if data_context().content.is_ansible or not import_name.startswith('.'): if data_context().content.is_ansible or not import_name.startswith('.'):
# We don't support relative paths for builtin utils, there's no point. # We don't support relative paths for builtin utils, there's no point.
return import_name return import_name

@ -21,14 +21,13 @@ from ..data import (
data_context, data_context,
) )
from ..target import (
TestTarget,
)
def get_csharp_module_utils_imports(powershell_targets, csharp_targets):
"""Return a dictionary of module_utils names mapped to sets of powershell file paths.
:type powershell_targets: list[TestTarget] - C# files
:type csharp_targets: list[TestTarget] - PS files
:rtype: dict[str, set[str]]
"""
def get_csharp_module_utils_imports(powershell_targets, csharp_targets): # type: (t.List[TestTarget], t.List[TestTarget]) -> t.Dict[str, t.Set[str]]
"""Return a dictionary of module_utils names mapped to sets of powershell file paths."""
module_utils = enumerate_module_utils() module_utils = enumerate_module_utils()
imports_by_target_path = {} imports_by_target_path = {}
@ -66,22 +65,15 @@ def get_csharp_module_utils_name(path): # type: (str) -> str
return name return name
def enumerate_module_utils(): def enumerate_module_utils(): # type: () -> t.Set[str]
"""Return a list of available module_utils imports. """Return a set of available module_utils imports."""
:rtype: set[str]
"""
return set(get_csharp_module_utils_name(p) return set(get_csharp_module_utils_name(p)
for p in data_context().content.walk_files(data_context().content.module_utils_csharp_path) for p in data_context().content.walk_files(data_context().content.module_utils_csharp_path)
if os.path.splitext(p)[1] == '.cs') if os.path.splitext(p)[1] == '.cs')
def extract_csharp_module_utils_imports(path, module_utils, is_pure_csharp): def extract_csharp_module_utils_imports(path, module_utils, is_pure_csharp): # type: (str, t.Set[str], bool) -> t.Set[str]
"""Return a list of module_utils imports found in the specified source file. """Return a set of module_utils imports found in the specified source file."""
:type path: str
:type module_utils: set[str]
:type is_pure_csharp: bool
:rtype: set[str]
"""
imports = set() imports = set()
if is_pure_csharp: if is_pure_csharp:
pattern = re.compile(r'(?i)^using\s((?:Ansible|AnsibleCollections)\..+);$') pattern = re.compile(r'(?i)^using\s((?:Ansible|AnsibleCollections)\..+);$')

@ -21,13 +21,13 @@ from ..data import (
data_context, data_context,
) )
from ..target import (
TestTarget,
)
def get_powershell_module_utils_imports(powershell_targets):
"""Return a dictionary of module_utils names mapped to sets of powershell file paths.
:type powershell_targets: list[TestTarget]
:rtype: dict[str, set[str]]
"""
def get_powershell_module_utils_imports(powershell_targets): # type: (t.List[TestTarget]) -> t.Dict[str, t.Set[str]]
"""Return a dictionary of module_utils names mapped to sets of powershell file paths."""
module_utils = enumerate_module_utils() module_utils = enumerate_module_utils()
imports_by_target_path = {} imports_by_target_path = {}
@ -62,21 +62,15 @@ def get_powershell_module_utils_name(path): # type: (str) -> str
return name return name
def enumerate_module_utils(): def enumerate_module_utils(): # type: () -> t.Set[str]
"""Return a list of available module_utils imports. """Return a set of available module_utils imports."""
:rtype: set[str]
"""
return set(get_powershell_module_utils_name(p) return set(get_powershell_module_utils_name(p)
for p in data_context().content.walk_files(data_context().content.module_utils_powershell_path) for p in data_context().content.walk_files(data_context().content.module_utils_powershell_path)
if os.path.splitext(p)[1] == '.psm1') if os.path.splitext(p)[1] == '.psm1')
def extract_powershell_module_utils_imports(path, module_utils): def extract_powershell_module_utils_imports(path, module_utils): # type: (str, t.Set[str]) -> t.Set[str]
"""Return a list of module_utils imports found in the specified source file. """Return a set of module_utils imports found in the specified source file."""
:type path: str
:type module_utils: set[str]
:rtype: set[str]
"""
imports = set() imports = set()
code = read_text_file(path) code = read_text_file(path)

@ -20,17 +20,17 @@ from ..data import (
data_context, data_context,
) )
from ..target import (
TestTarget,
)
VIRTUAL_PACKAGES = { VIRTUAL_PACKAGES = {
'ansible.module_utils.six', 'ansible.module_utils.six',
} }
def get_python_module_utils_imports(compile_targets): def get_python_module_utils_imports(compile_targets): # type: (t.List[TestTarget]) -> t.Dict[str, t.Set[str]]
"""Return a dictionary of module_utils names mapped to sets of python file paths. """Return a dictionary of module_utils names mapped to sets of python file paths."""
:type compile_targets: list[TestTarget]
:rtype: dict[str, set[str]]
"""
module_utils = enumerate_module_utils() module_utils = enumerate_module_utils()
virtual_utils = set(m for m in module_utils if any(m.startswith('%s.' % v) for v in VIRTUAL_PACKAGES)) virtual_utils = set(m for m in module_utils if any(m.startswith('%s.' % v) for v in VIRTUAL_PACKAGES))
@ -163,12 +163,8 @@ def enumerate_module_utils():
return set(module_utils) return set(module_utils)
def extract_python_module_utils_imports(path, module_utils): def extract_python_module_utils_imports(path, module_utils): # type: (str, t.Set[str]) -> t.Set[str]
"""Return a list of module_utils imports found in the specified source file. """Return a list of module_utils imports found in the specified source file."""
:type path: str
:type module_utils: set[str]
:rtype: set[str]
"""
# Python code must be read as bytes to avoid a SyntaxError when the source uses comments to declare the file encoding. # Python code must be read as bytes to avoid a SyntaxError when the source uses comments to declare the file encoding.
# See: https://www.python.org/dev/peps/pep-0263 # See: https://www.python.org/dev/peps/pep-0263
# Specifically: If a Unicode string with a coding declaration is passed to compile(), a SyntaxError will be raised. # Specifically: If a Unicode string with a coding declaration is passed to compile(), a SyntaxError will be raised.
@ -237,11 +233,7 @@ def relative_to_absolute(name, level, module, path, lineno): # type: (str, int,
class ModuleUtilFinder(ast.NodeVisitor): class ModuleUtilFinder(ast.NodeVisitor):
"""AST visitor to find valid module_utils imports.""" """AST visitor to find valid module_utils imports."""
def __init__(self, path, module_utils): def __init__(self, path, module_utils): # type: (str, t.Set[str]) -> None
"""Return a list of module_utils imports found in the specified source file.
:type path: str
:type module_utils: set[str]
"""
self.path = path self.path = path
self.module_utils = module_utils self.module_utils = module_utils
self.imports = set() self.imports = set()
@ -287,10 +279,8 @@ class ModuleUtilFinder(ast.NodeVisitor):
# noinspection PyPep8Naming # noinspection PyPep8Naming
# pylint: disable=locally-disabled, invalid-name # pylint: disable=locally-disabled, invalid-name
def visit_Import(self, node): def visit_Import(self, node): # type: (ast.Import) -> None
""" """Visit an import node."""
:type node: ast.Import
"""
self.generic_visit(node) self.generic_visit(node)
# import ansible.module_utils.MODULE[.MODULE] # import ansible.module_utils.MODULE[.MODULE]
@ -299,10 +289,8 @@ class ModuleUtilFinder(ast.NodeVisitor):
# noinspection PyPep8Naming # noinspection PyPep8Naming
# pylint: disable=locally-disabled, invalid-name # pylint: disable=locally-disabled, invalid-name
def visit_ImportFrom(self, node): def visit_ImportFrom(self, node): # type: (ast.ImportFrom) -> None
""" """Visit an import from node."""
:type node: ast.ImportFrom
"""
self.generic_visit(node) self.generic_visit(node)
if not node.module: if not node.module:
@ -319,11 +307,8 @@ class ModuleUtilFinder(ast.NodeVisitor):
# from ansible_collections.{ns}.{col}.plugins.module_utils.MODULE[.MODULE] import MODULE[, MODULE] # from ansible_collections.{ns}.{col}.plugins.module_utils.MODULE[.MODULE] import MODULE[, MODULE]
self.add_imports(['%s.%s' % (module, alias.name) for alias in node.names], node.lineno) self.add_imports(['%s.%s' % (module, alias.name) for alias in node.names], node.lineno)
def add_import(self, name, line_number): def add_import(self, name, line_number): # type: (str, int) -> None
""" """Record the specified import."""
:type name: str
:type line_number: int
"""
import_name = name import_name = name
while self.is_module_util_name(name): while self.is_module_util_name(name):

@ -61,11 +61,8 @@ from . import (
) )
def command_coverage_combine(args): def command_coverage_combine(args): # type: (CoverageCombineConfig) -> None
"""Patch paths in coverage files and merge into a single file. """Patch paths in coverage files and merge into a single file."""
:type args: CoverageCombineConfig
:rtype: list[str]
"""
host_state = prepare_profiles(args) # coverage combine host_state = prepare_profiles(args) # coverage combine
combine_coverage_files(args, host_state) combine_coverage_files(args, host_state)
@ -184,11 +181,8 @@ def _command_coverage_combine_python(args, host_state): # type: (CoverageCombin
return sorted(output_files) return sorted(output_files)
def _command_coverage_combine_powershell(args): def _command_coverage_combine_powershell(args): # type: (CoverageCombineConfig) -> t.List[str]
""" """Combine PowerShell coverage files and return a list of the output files."""
:type args: CoverageCombineConfig
:rtype: list[str]
"""
coverage_files = get_powershell_coverage_files() coverage_files = get_powershell_coverage_files()
def _default_stub_value(source_paths): def _default_stub_value(source_paths):
@ -265,12 +259,8 @@ def _command_coverage_combine_powershell(args):
return sorted(output_files) return sorted(output_files)
def _get_coverage_targets(args, walk_func): def _get_coverage_targets(args, walk_func): # type: (CoverageCombineConfig, t.Callable) -> t.List[t.Tuple[str, int]]
""" """Return a list of files to cover and the number of lines in each file, using the given function as the source of the files."""
:type args: CoverageCombineConfig
:type walk_func: Func
:rtype: list[tuple[str, int]]
"""
sources = [] sources = []
if args.all or args.stub: if args.all or args.stub:
@ -321,12 +311,8 @@ def _build_stub_groups(args, sources, default_stub_value):
return groups return groups
def get_coverage_group(args, coverage_file): def get_coverage_group(args, coverage_file): # type: (CoverageCombineConfig, str) -> t.Optional[str]
""" """Return the name of the coverage group for the specified coverage file, or None if no group was found."""
:type args: CoverageCombineConfig
:type coverage_file: str
:rtype: str
"""
parts = os.path.basename(coverage_file).split('=', 4) parts = os.path.basename(coverage_file).split('=', 4)
# noinspection PyTypeChecker # noinspection PyTypeChecker

@ -29,10 +29,8 @@ from . import (
) )
def command_coverage_html(args): def command_coverage_html(args): # type: (CoverageHtmlConfig) -> None
""" """Generate an HTML coverage report."""
:type args: CoverageHtmlConfig
"""
host_state = prepare_profiles(args) # coverage html host_state = prepare_profiles(args) # coverage html
output_files = combine_coverage_files(args, host_state) output_files = combine_coverage_files(args, host_state)

@ -2,6 +2,7 @@
from __future__ import annotations from __future__ import annotations
import os import os
import typing as t
from ...io import ( from ...io import (
read_json_file, read_json_file,
@ -29,10 +30,8 @@ from . import (
) )
def command_coverage_report(args): def command_coverage_report(args): # type: (CoverageReportConfig) -> None
""" """Generate a console coverage report."""
:type args: CoverageReportConfig
"""
host_state = prepare_profiles(args) # coverage report host_state = prepare_profiles(args) # coverage report
output_files = combine_coverage_files(args, host_state) output_files = combine_coverage_files(args, host_state)
@ -57,12 +56,8 @@ def command_coverage_report(args):
run_coverage(args, host_state, output_file, 'report', options) run_coverage(args, host_state, output_file, 'report', options)
def _generate_powershell_output_report(args, coverage_file): def _generate_powershell_output_report(args, coverage_file): # type: (CoverageReportConfig, str) -> str
""" """Generate and return a PowerShell coverage report for the given coverage file."""
:type args: CoverageReportConfig
:type coverage_file: str
:rtype: str
"""
coverage_info = read_json_file(coverage_file) coverage_info = read_json_file(coverage_file)
root_path = data_context().content.root + '/' root_path = data_context().content.root + '/'
@ -149,10 +144,7 @@ def _generate_powershell_output_report(args, coverage_file):
class CoverageReportConfig(CoverageCombineConfig): class CoverageReportConfig(CoverageCombineConfig):
"""Configuration for the coverage report command.""" """Configuration for the coverage report command."""
def __init__(self, args): def __init__(self, args): # type: (t.Any) -> None
"""
:type args: any
"""
super().__init__(args) super().__init__(args)
self.show_missing = args.show_missing # type: bool self.show_missing = args.show_missing # type: bool

@ -3,6 +3,7 @@ from __future__ import annotations
import os import os
import time import time
import typing as t
from xml.etree.ElementTree import ( from xml.etree.ElementTree import (
Comment, Comment,
@ -47,10 +48,8 @@ from . import (
) )
def command_coverage_xml(args): def command_coverage_xml(args): # type: (CoverageXmlConfig) -> None
""" """Generate an XML coverage report."""
:type args: CoverageXmlConfig
"""
host_state = prepare_profiles(args) # coverage xml host_state = prepare_profiles(args) # coverage xml
output_files = combine_coverage_files(args, host_state) output_files = combine_coverage_files(args, host_state)
@ -70,11 +69,8 @@ def command_coverage_xml(args):
run_coverage(args, host_state, output_file, 'xml', ['-i', '-o', xml_path]) run_coverage(args, host_state, output_file, 'xml', ['-i', '-o', xml_path])
def _generate_powershell_xml(coverage_file): def _generate_powershell_xml(coverage_file): # type: (str) -> Element
""" """Generate a PowerShell coverage report XML element from the specified coverage file and return it."""
:type coverage_file: str
:rtype: Element
"""
coverage_info = read_json_file(coverage_file) coverage_info = read_json_file(coverage_file)
content_root = data_context().content.root content_root = data_context().content.root
@ -135,13 +131,8 @@ def _generate_powershell_xml(coverage_file):
return elem_coverage return elem_coverage
def _add_cobertura_package(packages, package_name, package_data): def _add_cobertura_package(packages, package_name, package_data): # type: (SubElement, str, t.Dict[str, t.Dict[str, int]]) -> t.Tuple[int, int]
""" """Add a package element to the given packages element."""
:type packages: SubElement
:type package_name: str
:type package_data: Dict[str, Dict[str, int]]
:rtype: Tuple[int, int]
"""
elem_package = SubElement(packages, 'package') elem_package = SubElement(packages, 'package')
elem_classes = SubElement(elem_package, 'classes') elem_classes = SubElement(elem_package, 'classes')

@ -57,7 +57,6 @@ from ...host_configs import (
class PylintTest(SanitySingleVersion): class PylintTest(SanitySingleVersion):
"""Sanity test using pylint.""" """Sanity test using pylint."""
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.optional_error_codes.update([ self.optional_error_codes.update([
@ -98,26 +97,16 @@ class PylintTest(SanitySingleVersion):
contexts = [] contexts = []
remaining_paths = set(paths) remaining_paths = set(paths)
def add_context(available_paths, context_name, context_filter): def add_context(available_paths, context_name, context_filter): # type: (t.Set[str], str, t.Callable[[str], bool]) -> None
""" """Add the specified context to the context list, consuming available paths that match the given context filter."""
:type available_paths: set[str]
:type context_name: str
:type context_filter: (str) -> bool
"""
filtered_paths = set(p for p in available_paths if context_filter(p)) filtered_paths = set(p for p in available_paths if context_filter(p))
contexts.append((context_name, sorted(filtered_paths))) contexts.append((context_name, sorted(filtered_paths)))
available_paths -= filtered_paths available_paths -= filtered_paths
def filter_path(path_filter=None): def filter_path(path_filter=None): # type: (str) -> t.Callable[[str], bool]
""" """Return a function that filters out paths which are not a subdirectory of the given path."""
:type path_filter: str def context_filter(path_to_filter): # type: (str) -> bool
:rtype: (str) -> bool """Return true if the given path matches, otherwise return False."""
"""
def context_filter(path_to_filter):
"""
:type path_to_filter: str
:rtype: bool
"""
return is_subdir(path_to_filter, path_filter) return is_subdir(path_to_filter, path_filter)
return context_filter return context_filter

@ -92,10 +92,8 @@ class TestContext:
module_utils = 'module_utils' module_utils = 'module_utils'
def command_units(args): def command_units(args): # type: (UnitsConfig) -> None
""" """Run unit tests."""
:type args: UnitsConfig
"""
handle_layout_messages(data_context().content.unit_messages) handle_layout_messages(data_context().content.unit_messages)
changes = get_changes_filter(args) changes = get_changes_filter(args)

@ -194,11 +194,7 @@ class EnvironmentConfig(CommonConfig):
class TestConfig(EnvironmentConfig): class TestConfig(EnvironmentConfig):
"""Configuration common to all test commands.""" """Configuration common to all test commands."""
def __init__(self, args, command): def __init__(self, args, command): # type: (t.Any, str) -> None
"""
:type args: any
:type command: str
"""
super().__init__(args, command) super().__init__(args, command)
self.coverage = args.coverage # type: bool self.coverage = args.coverage # type: bool
@ -239,10 +235,7 @@ class TestConfig(EnvironmentConfig):
class ShellConfig(EnvironmentConfig): class ShellConfig(EnvironmentConfig):
"""Configuration for the shell command.""" """Configuration for the shell command."""
def __init__(self, args): def __init__(self, args): # type: (t.Any) -> None
"""
:type args: any
"""
super().__init__(args, 'shell') super().__init__(args, 'shell')
self.raw = args.raw # type: bool self.raw = args.raw # type: bool
@ -250,10 +243,7 @@ class ShellConfig(EnvironmentConfig):
class SanityConfig(TestConfig): class SanityConfig(TestConfig):
"""Configuration for the sanity command.""" """Configuration for the sanity command."""
def __init__(self, args): def __init__(self, args): # type: (t.Any) -> None
"""
:type args: any
"""
super().__init__(args, 'sanity') super().__init__(args, 'sanity')
self.test = args.test # type: t.List[str] self.test = args.test # type: t.List[str]
@ -278,11 +268,7 @@ class SanityConfig(TestConfig):
class IntegrationConfig(TestConfig): class IntegrationConfig(TestConfig):
"""Configuration for the integration command.""" """Configuration for the integration command."""
def __init__(self, args, command): def __init__(self, args, command): # type: (t.Any, str) -> None
"""
:type args: any
:type command: str
"""
super().__init__(args, command) super().__init__(args, command)
self.start_at = args.start_at # type: str self.start_at = args.start_at # type: str
@ -326,28 +312,19 @@ TIntegrationConfig = t.TypeVar('TIntegrationConfig', bound=IntegrationConfig)
class PosixIntegrationConfig(IntegrationConfig): class PosixIntegrationConfig(IntegrationConfig):
"""Configuration for the posix integration command.""" """Configuration for the posix integration command."""
def __init__(self, args): def __init__(self, args): # type: (t.Any) -> None
"""
:type args: any
"""
super().__init__(args, 'integration') super().__init__(args, 'integration')
class WindowsIntegrationConfig(IntegrationConfig): class WindowsIntegrationConfig(IntegrationConfig):
"""Configuration for the windows integration command.""" """Configuration for the windows integration command."""
def __init__(self, args): def __init__(self, args): # type: (t.Any) -> None
"""
:type args: any
"""
super().__init__(args, 'windows-integration') super().__init__(args, 'windows-integration')
class NetworkIntegrationConfig(IntegrationConfig): class NetworkIntegrationConfig(IntegrationConfig):
"""Configuration for the network integration command.""" """Configuration for the network integration command."""
def __init__(self, args): def __init__(self, args): # type: (t.Any) -> None
"""
:type args: any
"""
super().__init__(args, 'network-integration') super().__init__(args, 'network-integration')
self.testcase = args.testcase # type: str self.testcase = args.testcase # type: str
@ -355,10 +332,7 @@ class NetworkIntegrationConfig(IntegrationConfig):
class UnitsConfig(TestConfig): class UnitsConfig(TestConfig):
"""Configuration for the units command.""" """Configuration for the units command."""
def __init__(self, args): def __init__(self, args): # type: (t.Any) -> None
"""
:type args: any
"""
super().__init__(args, 'units') super().__init__(args, 'units')
self.collect_only = args.collect_only # type: bool self.collect_only = args.collect_only # type: bool

@ -176,10 +176,11 @@ def is_docker_user_defined_network(network): # type: (str) -> bool
return network and network != 'bridge' return network and network != 'bridge'
def docker_pull(args, image): def docker_pull(args, image): # type: (EnvironmentConfig, str) -> None
""" """
:type args: EnvironmentConfig Pull the specified image if it is not available.
:type image: str Images without a tag or digest will not be pulled.
Retries up to 10 times if the pull fails.
""" """
if '@' not in image and ':' not in image: if '@' not in image and ':' not in image:
display.info('Skipping pull of image without tag or digest: %s' % image, verbosity=2) display.info('Skipping pull of image without tag or digest: %s' % image, verbosity=2)
@ -265,11 +266,8 @@ def docker_start(args, container_id, options=None): # type: (EnvironmentConfig,
raise ApplicationError('Failed to run docker container "%s".' % container_id) raise ApplicationError('Failed to run docker container "%s".' % container_id)
def docker_rm(args, container_id): def docker_rm(args, container_id): # type: (EnvironmentConfig, str) -> None
""" """Remove the specified container."""
:type args: EnvironmentConfig
:type container_id: str
"""
try: try:
docker_command(args, ['rm', '-f', container_id], capture=True) docker_command(args, ['rm', '-f', container_id], capture=True)
except SubprocessError as ex: except SubprocessError as ex:

@ -26,10 +26,8 @@ class Metadata:
self.change_description = None # type: t.Optional[ChangeDescription] self.change_description = None # type: t.Optional[ChangeDescription]
self.ci_provider = None # type: t.Optional[str] self.ci_provider = None # type: t.Optional[str]
def populate_changes(self, diff): def populate_changes(self, diff): # type: (t.Optional[t.List[str]]) -> None
""" """Populate the changeset using the given diff."""
:type diff: list[str] | None
"""
patches = parse_diff(diff) patches = parse_diff(diff)
patches = sorted(patches, key=lambda k: k.new.path) # type: t.List[FileDiff] patches = sorted(patches, key=lambda k: k.new.path) # type: t.List[FileDiff]
@ -47,10 +45,8 @@ class Metadata:
# failed tests involving deleted files should be using line 0 since there is no content remaining # failed tests involving deleted files should be using line 0 since there is no content remaining
self.changes[path] = ((0, 0),) self.changes[path] = ((0, 0),)
def to_dict(self): def to_dict(self): # type: () -> t.Dict[str, t.Any]
""" """Return a dictionary representation of the metadata."""
:rtype: dict[str, any]
"""
return dict( return dict(
changes=self.changes, changes=self.changes,
cloud_config=self.cloud_config, cloud_config=self.cloud_config,
@ -58,10 +54,8 @@ class Metadata:
change_description=self.change_description.to_dict(), change_description=self.change_description.to_dict(),
) )
def to_file(self, path): def to_file(self, path): # type: (str) -> None
""" """Write the metadata to the specified file."""
:type path: path
"""
data = self.to_dict() data = self.to_dict()
display.info('>>> Metadata: %s\n%s' % (path, data), verbosity=3) display.info('>>> Metadata: %s\n%s' % (path, data), verbosity=3)
@ -69,20 +63,14 @@ class Metadata:
write_json_file(path, data) write_json_file(path, data)
@staticmethod @staticmethod
def from_file(path): def from_file(path): # type: (str) -> Metadata
""" """Return metadata loaded from the specified file."""
:type path: str
:rtype: Metadata
"""
data = read_json_file(path) data = read_json_file(path)
return Metadata.from_dict(data) return Metadata.from_dict(data)
@staticmethod @staticmethod
def from_dict(data): def from_dict(data): # type: (t.Dict[str, t.Any]) -> Metadata
""" """Return metadata loaded from the specified dictionary."""
:type data: dict[str, any]
:rtype: Metadata
"""
metadata = Metadata() metadata = Metadata()
metadata.changes = data['changes'] metadata.changes = data['changes']
metadata.cloud_config = data['cloud_config'] metadata.cloud_config = data['cloud_config']
@ -103,23 +91,17 @@ class ChangeDescription:
self.no_integration_paths = [] # type: t.List[str] self.no_integration_paths = [] # type: t.List[str]
@property @property
def targets(self): def targets(self): # type: () -> t.Optional[t.List[str]]
""" """Optional list of target names."""
:rtype: list[str] | None
"""
return self.regular_command_targets.get(self.command) return self.regular_command_targets.get(self.command)
@property @property
def focused_targets(self): def focused_targets(self): # type: () -> t.Optional[t.List[str]]
""" """Optional list of focused target names."""
:rtype: list[str] | None
"""
return self.focused_command_targets.get(self.command) return self.focused_command_targets.get(self.command)
def to_dict(self): def to_dict(self): # type: () -> t.Dict[str, t.Any]
""" """Return a dictionary representation of the change description."""
:rtype: dict[str, any]
"""
return dict( return dict(
command=self.command, command=self.command,
changed_paths=self.changed_paths, changed_paths=self.changed_paths,
@ -130,11 +112,8 @@ class ChangeDescription:
) )
@staticmethod @staticmethod
def from_dict(data): def from_dict(data): # type: (t.Dict[str, t.Any]) -> ChangeDescription
""" """Return a change description loaded from the given dictionary."""
:param data: dict[str, any]
:rtype: ChangeDescription
"""
changes = ChangeDescription() changes = ChangeDescription()
changes.command = data['command'] changes.command = data['command']
changes.changed_paths = data['changed_paths'] changes.changed_paths = data['changed_paths']

@ -13,10 +13,7 @@ TCallable = t.TypeVar('TCallable', bound=t.Callable)
class WrappedThread(threading.Thread): class WrappedThread(threading.Thread):
"""Wrapper around Thread which captures results and exceptions.""" """Wrapper around Thread which captures results and exceptions."""
def __init__(self, action): def __init__(self, action): # type: (t.Callable[[], t.Any]) -> None
"""
:type action: () -> any
"""
super().__init__() super().__init__()
self._result = queue.Queue() self._result = queue.Queue()
self.action = action self.action = action

@ -3,7 +3,6 @@ from __future__ import annotations
import errno import errno
import fcntl import fcntl
import hashlib
import inspect import inspect
import os import os
import pkgutil import pkgutil
@ -31,7 +30,6 @@ from .encoding import (
from .io import ( from .io import (
open_binary_file, open_binary_file,
read_binary_file,
read_text_file, read_text_file,
) )
@ -163,13 +161,11 @@ def exclude_none_values(data): # type: (t.Dict[TKey, t.Optional[TValue]]) -> t.
return dict((key, value) for key, value in data.items() if value is not None) return dict((key, value) for key, value in data.items() if value is not None)
def find_executable(executable, cwd=None, path=None, required=True): def find_executable(executable, cwd=None, path=None, required=True): # type: (str, t.Optional[str], t.Optional[str], t.Union[bool, str]) -> t.Optional[str]
""" """
:type executable: str Find the specified executable and return the full path, or None if it could not be found.
:type cwd: str If required is True an exception will be raised if the executable is not found.
:type path: str If required is set to 'warning' then a warning will be shown if the executable is not found.
:type required: bool | str
:rtype: str | None
""" """
match = None match = None
real_cwd = os.getcwd() real_cwd = os.getcwd()
@ -215,12 +211,11 @@ def find_executable(executable, cwd=None, path=None, required=True):
return match return match
def find_python(version, path=None, required=True): def find_python(version, path=None, required=True): # type: (str, t.Optional[str], bool) -> t.Optional[str]
""" """
:type version: str Find and return the full path to the specified Python version.
:type path: str | None If required, an exception will be raised not found.
:type required: bool If not required, None will be returned if not found.
:rtype: str
""" """
version_info = str_to_version(version) version_info = str_to_version(version)
@ -415,10 +410,8 @@ def pass_vars(required, optional):
return env return env
def remove_tree(path): def remove_tree(path): # type: (str) -> None
""" """Remove the specified directory, siliently continuing if the directory does not exist."""
:type path: str
"""
try: try:
shutil.rmtree(to_bytes(path)) shutil.rmtree(to_bytes(path))
except OSError as ex: except OSError as ex:
@ -426,11 +419,8 @@ def remove_tree(path):
raise raise
def is_binary_file(path): def is_binary_file(path): # type: (str) -> bool
""" """Return True if the specified file is a binary file, otherwise return False."""
:type path: str
:rtype: bool
"""
assume_text = { assume_text = {
'.cfg', '.cfg',
'.conf', '.conf',
@ -491,10 +481,8 @@ def generate_name(length=8): # type: (int) -> str
return ''.join(random.choice(string.ascii_letters + string.digits) for _idx in range(length)) return ''.join(random.choice(string.ascii_letters + string.digits) for _idx in range(length))
def generate_password(): def generate_password(): # type: () -> str
"""Generate a random password. """Generate and return random password."""
:rtype: str
"""
chars = [ chars = [
string.ascii_letters, string.ascii_letters,
string.digits, string.digits,
@ -542,13 +530,11 @@ class Display:
if os.isatty(0): if os.isatty(0):
self.rows, self.columns = unpack('HHHH', fcntl.ioctl(0, TIOCGWINSZ, pack('HHHH', 0, 0, 0, 0)))[:2] self.rows, self.columns = unpack('HHHH', fcntl.ioctl(0, TIOCGWINSZ, pack('HHHH', 0, 0, 0, 0)))[:2]
def __warning(self, message): def __warning(self, message): # type: (str) -> None
""" """Internal implementation for displaying a warning message."""
:type message: str
"""
self.print_message('WARNING: %s' % message, color=self.purple, fd=sys.stderr) self.print_message('WARNING: %s' % message, color=self.purple, fd=sys.stderr)
def review_warnings(self): def review_warnings(self): # type: () -> None
"""Review all warnings which previously occurred.""" """Review all warnings which previously occurred."""
if not self.warnings: if not self.warnings:
return return
@ -558,12 +544,8 @@ class Display:
for warning in self.warnings: for warning in self.warnings:
self.__warning(warning) self.__warning(warning)
def warning(self, message, unique=False, verbosity=0): def warning(self, message, unique=False, verbosity=0): # type: (str, bool, int) -> None
""" """Display a warning level message."""
:type message: str
:type unique: bool
:type verbosity: int
"""
if verbosity > self.verbosity: if verbosity > self.verbosity:
return return
@ -576,24 +558,16 @@ class Display:
self.__warning(message) self.__warning(message)
self.warnings.append(message) self.warnings.append(message)
def notice(self, message): def notice(self, message): # type: (str) -> None
""" """Display a notice level message."""
:type message: str
"""
self.print_message('NOTICE: %s' % message, color=self.purple, fd=sys.stderr) self.print_message('NOTICE: %s' % message, color=self.purple, fd=sys.stderr)
def error(self, message): def error(self, message): # type: (str) -> None
""" """Display an error level message."""
:type message: str
"""
self.print_message('ERROR: %s' % message, color=self.red, fd=sys.stderr) self.print_message('ERROR: %s' % message, color=self.red, fd=sys.stderr)
def info(self, message, verbosity=0, truncate=False): def info(self, message, verbosity=0, truncate=False): # type: (str, int, bool) -> None
""" """Display an info level message."""
:type message: str
:type verbosity: int
:type truncate: bool
"""
if self.verbosity >= verbosity: if self.verbosity >= verbosity:
color = self.verbosity_colors.get(verbosity, self.yellow) color = self.verbosity_colors.get(verbosity, self.yellow)
self.print_message(message, color=color, fd=sys.stderr if self.info_stderr else sys.stdout, truncate=truncate) self.print_message(message, color=color, fd=sys.stderr if self.info_stderr else sys.stdout, truncate=truncate)
@ -674,10 +648,7 @@ class SubprocessError(ApplicationError):
class MissingEnvironmentVariable(ApplicationError): class MissingEnvironmentVariable(ApplicationError):
"""Error caused by missing environment variable.""" """Error caused by missing environment variable."""
def __init__(self, name): def __init__(self, name): # type: (str) -> None
"""
:type name: str
"""
super().__init__('Missing environment variable: %s' % name) super().__init__('Missing environment variable: %s' % name)
self.name = name self.name = name
@ -694,12 +665,8 @@ def retry(func, ex_type=SubprocessError, sleep=10, attempts=10):
return func() return func()
def parse_to_list_of_dict(pattern, value): def parse_to_list_of_dict(pattern, value): # type: (str, str) -> t.List[t.Dict[str, str]]
""" """Parse lines from the given value using the specified pattern and return the extracted list of key/value pair dictionaries."""
:type pattern: str
:type value: str
:return: list[dict[str, str]]
"""
matched = [] matched = []
unmatched = [] unmatched = []
@ -833,21 +800,6 @@ def sanitize_host_name(name):
return re.sub('[^A-Za-z0-9]+', '-', name)[:63].strip('-') return re.sub('[^A-Za-z0-9]+', '-', name)[:63].strip('-')
def get_hash(path):
"""
:type path: str
:rtype: str | None
"""
if not os.path.exists(path):
return None
file_hash = hashlib.sha256()
file_hash.update(read_binary_file(path))
return file_hash.hexdigest()
@cache @cache
def get_host_ip(): def get_host_ip():
"""Return the host's IP address.""" """Return the host's IP address."""

@ -123,11 +123,7 @@ ResultType._populate() # pylint: disable=protected-access
class CommonConfig: class CommonConfig:
"""Configuration common to all commands.""" """Configuration common to all commands."""
def __init__(self, args, command): def __init__(self, args, command): # type: (t.Any, str) -> None
"""
:type args: any
:type command: str
"""
self.command = command self.command = command
self.success = None # type: t.Optional[bool] self.success = None # type: t.Optional[bool]

Loading…
Cancel
Save