ansible-test - Fix various type hinting issues. (#79798)

* ansible-test - Add missing type hints.

* ansible-test - Remove redundant type hints.

* ansible-test - Fix return type annotations.

* ansible-test - Add assert, casts to assist mypy.

* ansible-test - Fix incorrect type hints.

* ansible-test - Remove no-op code.

* ansible-test - Fix incorrect types.

* ansible-test - Fix method signature mismatch.
Matt Clay committed 2 years ago (via GitHub)
parent 5f58015527
commit c9f20aedc0

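Note: the assert and cast changes in this diff follow a standard pattern for narrowing types so mypy can verify them, without changing runtime behavior. A minimal, self-contained sketch of that pattern (illustrative names only, not code from this commit):

import typing as t

def first_pair(raw: dict[str, t.Any]) -> tuple[int, int]:
    """Narrow loosely typed data for mypy using a runtime assert and an explicit cast."""
    value = raw['pair']
    assert isinstance(value, list)  # narrows Any to list for the type checker (and fails fast if the data is wrong)
    pair = t.cast(tuple[int, int], tuple(value))  # tuple(value) is tuple[Any, ...] to mypy; the cast records the expected shape
    return pair

print(first_pair({'pair': [3, 4]}))  # -> (3, 4)
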
@@ -12,7 +12,7 @@ from .util import (
 class Become(metaclass=abc.ABCMeta):
     """Base class for become implementations."""
     @classmethod
-    def name(cls):
+    def name(cls) -> str:
         """The name of this plugin."""
         return cls.__name__.lower()
@@ -48,7 +48,7 @@ class Doas(Become):
 class DoasSudo(Doas):
     """Become using 'doas' in ansible-test and then after bootstrapping use 'sudo' for other ansible commands."""
     @classmethod
-    def name(cls):
+    def name(cls) -> str:
         """The name of this plugin."""
         return 'doas_sudo'
@@ -78,7 +78,7 @@ class Su(Become):
 class SuSudo(Su):
     """Become using 'su' in ansible-test and then after bootstrapping use 'sudo' for other ansible commands."""
     @classmethod
-    def name(cls):
+    def name(cls) -> str:
         """The name of this plugin."""
         return 'su_sudo'

@@ -29,7 +29,7 @@ class CGroupEntry:
     path: pathlib.PurePosixPath
     @property
-    def root_path(self):
+    def root_path(self) -> pathlib.PurePosixPath:
         """The root path for this cgroup subsystem."""
         return pathlib.PurePosixPath(CGroupPath.ROOT, self.subsystem)

@@ -152,6 +152,8 @@ class CryptographyAuthHelper(AuthHelper, metaclass=abc.ABCMeta):
         private_key_pem = self.initialize_private_key()
         private_key = load_pem_private_key(to_bytes(private_key_pem), None, default_backend())
+        assert isinstance(private_key, ec.EllipticCurvePrivateKey)
         signature_raw_bytes = private_key.sign(payload_bytes, ec.ECDSA(hashes.SHA256()))
         return signature_raw_bytes

@@ -40,7 +40,7 @@ CODE = 'azp'
 class AzurePipelines(CIProvider):
     """CI provider implementation for Azure Pipelines."""
-    def __init__(self):
+    def __init__(self) -> None:
         self.auth = AzurePipelinesAuthHelper()
     @staticmethod

@@ -146,10 +146,8 @@ def get_python_module_utils_name(path: str) -> str:
     return name
-def enumerate_module_utils():
-    """Return a list of available module_utils imports.
-    :rtype: set[str]
-    """
+def enumerate_module_utils() -> set[str]:
+    """Return a list of available module_utils imports."""
     module_utils = []
     for path in data_context().content.walk_files(data_context().content.module_utils_path):

@@ -34,7 +34,7 @@ class RegisteredCompletionFinder(OptionCompletionFinder):
     These registered completions, if provided, are used to filter the final completion results.
     This works around a known bug: https://github.com/kislyuk/argcomplete/issues/221
     """
-    def __init__(self, *args, **kwargs):
+    def __init__(self, *args, **kwargs) -> None:
         super().__init__(*args, **kwargs)
         self.registered_completions: t.Optional[list[str]] = None

@@ -9,7 +9,7 @@ import typing as t
 class Substitute:
     """Substitute for missing class which accepts all arguments."""
-    def __init__(self, *args, **kwargs):
+    def __init__(self, *args, **kwargs) -> None:
         pass
@@ -87,7 +87,7 @@ class OptionCompletionFinder(CompletionFinder):
     """
     enabled = bool(argcomplete)
-    def __init__(self, *args, validator=None, **kwargs):
+    def __init__(self, *args, validator=None, **kwargs) -> None:
         if validator:
             raise ValueError()

@@ -341,7 +341,7 @@ class IntegerParser(DynamicChoicesParser):
 class BooleanParser(ChoicesParser):
     """Composite argument parser for boolean (yes/no) values."""
-    def __init__(self):
+    def __init__(self) -> None:
         super().__init__(['yes', 'no'])
     def parse(self, state: ParserState) -> bool:

@@ -84,25 +84,25 @@ def get_option_name(name: str) -> str:
 class PythonVersionUnsupportedError(ApplicationError):
     """A Python version was requested for a context which does not support that version."""
-    def __init__(self, context, version, versions):
+    def __init__(self, context: str, version: str, versions: c.Iterable[str]) -> None:
         super().__init__(f'Python {version} is not supported by environment `{context}`. Supported Python version(s) are: {", ".join(versions)}')
 class PythonVersionUnspecifiedError(ApplicationError):
     """A Python version was not specified for a context which is unknown, thus the Python version is unknown."""
-    def __init__(self, context):
+    def __init__(self, context: str) -> None:
         super().__init__(f'A Python version was not specified for environment `{context}`. Use the `--python` option to specify a Python version.')
 class ControllerNotSupportedError(ApplicationError):
     """Option(s) were specified which do not provide support for the controller and would be ignored because they are irrelevant for the target."""
-    def __init__(self, context):
+    def __init__(self, context: str) -> None:
         super().__init__(f'Environment `{context}` does not provide a Python version supported by the controller.')
 class OptionsConflictError(ApplicationError):
     """Option(s) were specified which conflict with other options."""
-    def __init__(self, first, second):
+    def __init__(self, first: c.Iterable[str], second: c.Iterable[str]) -> None:
         super().__init__(f'Options `{" ".join(first)}` cannot be combined with options `{" ".join(second)}`.')
@@ -170,22 +170,22 @@ class TargetMode(enum.Enum):
     NO_TARGETS = enum.auto()  # coverage
     @property
-    def one_host(self):
+    def one_host(self) -> bool:
         """Return True if only one host (the controller) should be used, otherwise return False."""
         return self in (TargetMode.SANITY, TargetMode.UNITS, TargetMode.NO_TARGETS)
     @property
-    def no_fallback(self):
+    def no_fallback(self) -> bool:
         """Return True if no fallback is acceptable for the controller (due to options not applying to the target), otherwise return False."""
         return self in (TargetMode.WINDOWS_INTEGRATION, TargetMode.NETWORK_INTEGRATION, TargetMode.NO_TARGETS)
     @property
-    def multiple_pythons(self):
+    def multiple_pythons(self) -> bool:
         """Return True if multiple Python versions are allowed, otherwise False."""
         return self in (TargetMode.SANITY, TargetMode.UNITS)
     @property
-    def has_python(self):
+    def has_python(self) -> bool:
         """Return True if this mode uses Python, otherwise False."""
         return self in (TargetMode.POSIX_INTEGRATION, TargetMode.SANITY, TargetMode.UNITS, TargetMode.SHELL)

@@ -69,5 +69,5 @@ class TargetsNamespaceParser(NamespaceParser, metaclass=abc.ABCMeta):
 class ControllerRequiredFirstError(CompletionError):
     """Exception raised when controller and target options are specified out-of-order."""
-    def __init__(self):
+    def __init__(self) -> None:
         super().__init__('The `--controller` option must be specified before `--target` option(s).')

@@ -99,7 +99,7 @@ class ControllerKeyValueParser(KeyValueParser):
 class DockerKeyValueParser(KeyValueParser):
     """Composite argument parser for docker key/value pairs."""
-    def __init__(self, image, controller):
+    def __init__(self, image: str, controller: bool) -> None:
         self.controller = controller
         self.versions = get_docker_pythons(image, controller, False)
         self.allow_default = bool(get_docker_pythons(image, controller, True))
@@ -135,7 +135,7 @@ class DockerKeyValueParser(KeyValueParser):
 class PosixRemoteKeyValueParser(KeyValueParser):
     """Composite argument parser for POSIX remote key/value pairs."""
-    def __init__(self, name, controller):
+    def __init__(self, name: str, controller: bool) -> None:
         self.controller = controller
         self.versions = get_remote_pythons(name, controller, False)
         self.allow_default = bool(get_remote_pythons(name, controller, True))

@@ -227,7 +227,7 @@ def read_python_coverage_legacy(path: str) -> PythonArcs:
         contents = read_text_file(path)
         contents = re.sub(r'''^!coverage.py: This is a private format, don't read it directly!''', '', contents)
         data = json.loads(contents)
-        arcs: PythonArcs = {filename: [tuple(arc) for arc in arcs] for filename, arcs in data['arcs'].items()}
+        arcs: PythonArcs = {filename: [t.cast(tuple[int, int], tuple(arc)) for arc in arc_list] for filename, arc_list in data['arcs'].items()}
     except Exception as ex:
         raise CoverageError(path, f'Error reading JSON coverage file: {ex}') from ex

@@ -20,6 +20,7 @@ from .. import (
 )
 TargetKey = t.TypeVar('TargetKey', int, tuple[int, int])
+TFlexKey = t.TypeVar('TFlexKey', int, tuple[int, int], str)
 NamedPoints = dict[str, dict[TargetKey, set[str]]]
 IndexedPoints = dict[str, dict[TargetKey, set[int]]]
 Arcs = dict[str, dict[tuple[int, int], set[int]]]
@@ -120,10 +121,10 @@ def get_target_index(name: str, target_indexes: TargetIndexes) -> int:
 def expand_indexes(
     source_data: IndexedPoints,
     source_index: list[str],
-    format_func: c.Callable[[TargetKey], str],
-) -> NamedPoints:
+    format_func: c.Callable[[TargetKey], TFlexKey],
+) -> dict[str, dict[TFlexKey, set[str]]]:
     """Expand indexes from the source into target names for easier processing of the data (arcs or lines)."""
-    combined_data: dict[str, dict[t.Any, set[str]]] = {}
+    combined_data: dict[str, dict[TFlexKey, set[str]]] = {}
     for covered_path, covered_points in source_data.items():
         combined_points = combined_data.setdefault(covered_path, {})

@@ -24,6 +24,7 @@ from . import (
 from . import (
     NamedPoints,
+    TargetKey,
     TargetIndexes,
 )
@@ -50,8 +51,12 @@ def command_coverage_analyze_targets_filter(args: CoverageAnalyzeTargetsFilterCo
     covered_targets, covered_path_arcs, covered_path_lines = read_report(args.input_file)
-    filtered_path_arcs = expand_indexes(covered_path_arcs, covered_targets, lambda v: v)
-    filtered_path_lines = expand_indexes(covered_path_lines, covered_targets, lambda v: v)
+    def pass_target_key(value: TargetKey) -> TargetKey:
+        """Return the given target key unmodified."""
+        return value
+    filtered_path_arcs = expand_indexes(covered_path_arcs, covered_targets, pass_target_key)
+    filtered_path_lines = expand_indexes(covered_path_lines, covered_targets, pass_target_key)
     include_targets = set(args.include_targets) if args.include_targets else None
     exclude_targets = set(args.exclude_targets) if args.exclude_targets else None
@@ -59,7 +64,7 @@ def command_coverage_analyze_targets_filter(args: CoverageAnalyzeTargetsFilterCo
     include_path = re.compile(args.include_path) if args.include_path else None
     exclude_path = re.compile(args.exclude_path) if args.exclude_path else None
-    def path_filter_func(path):
+    def path_filter_func(path: str) -> bool:
         """Return True if the given path should be included, otherwise return False."""
         if include_path and not re.search(include_path, path):
             return False
@@ -69,7 +74,7 @@ def command_coverage_analyze_targets_filter(args: CoverageAnalyzeTargetsFilterCo
         return True
-    def target_filter_func(targets):
+    def target_filter_func(targets: set[str]) -> set[str]:
         """Filter the given targets and return the result based on the defined includes and excludes."""
         if include_targets:
             targets &= include_targets

@@ -101,7 +101,7 @@ def combine_coverage_files(args: CoverageCombineConfig, host_state: HostState) -
 class ExportedCoverageDataNotFound(ApplicationError):
     """Exception when no combined coverage data is present yet is required."""
-    def __init__(self):
+    def __init__(self) -> None:
         super().__init__(
             'Coverage data must be exported before processing with the `--docker` or `--remote` option.\n'
             'Export coverage with `ansible-test coverage combine` using the `--export` option.\n'

@@ -819,7 +819,7 @@ def integration_environment(
 class IntegrationEnvironment:
     """Details about the integration environment."""
-    def __init__(self, test_dir, integration_dir, targets_dir, inventory_path, ansible_config, vars_file):
+    def __init__(self, test_dir: str, integration_dir: str, targets_dir: str, inventory_path: str, ansible_config: str, vars_file: str) -> None:
         self.test_dir = test_dir
         self.integration_dir = integration_dir
         self.targets_dir = targets_dir
@@ -831,17 +831,13 @@ class IntegrationEnvironment:
 class IntegrationCache(CommonCache):
     """Integration cache."""
     @property
-    def integration_targets(self):
-        """
-        :rtype: list[IntegrationTarget]
-        """
+    def integration_targets(self) -> list[IntegrationTarget]:
+        """The list of integration test targets."""
         return self.get('integration_targets', lambda: list(walk_integration_targets()))
     @property
-    def dependency_map(self):
-        """
-        :rtype: dict[str, set[IntegrationTarget]]
-        """
+    def dependency_map(self) -> dict[str, set[IntegrationTarget]]:
+        """The dependency map of integration test targets."""
         return self.get('dependency_map', lambda: generate_dependency_map(self.integration_targets))

@@ -131,7 +131,7 @@ class CsCloudProvider(CloudProvider):
     def _get_credentials(self, container_name: str) -> dict[str, t.Any]:
         """Wait for the CloudStack simulator to return credentials."""
-        def check(value):
+        def check(value) -> bool:
            """Return True if the given configuration is valid JSON, otherwise return False."""
            # noinspection PyBroadException
            try:

@@ -158,7 +158,7 @@ class PosixCoverageHandler(CoverageHandler[PosixConfig]):
         self.teardown_controller()
         self.teardown_target()
-    def setup_controller(self):
+    def setup_controller(self) -> None:
         """Perform setup for code coverage on the controller."""
         coverage_config_path = os.path.join(self.common_temp_path, COVERAGE_CONFIG_NAME)
         coverage_output_path = os.path.join(self.common_temp_path, ResultType.COVERAGE.name)
@@ -171,7 +171,7 @@ class PosixCoverageHandler(CoverageHandler[PosixConfig]):
         os.mkdir(coverage_output_path)
         verified_chmod(coverage_output_path, MODE_DIRECTORY_WRITE)
-    def setup_target(self):
+    def setup_target(self) -> None:
         """Perform setup for code coverage on the target."""
         if not self.target_profile:
             return

@@ -831,7 +831,7 @@ class SanitySingleVersion(SanityTest, metaclass=abc.ABCMeta):
 class SanityCodeSmellTest(SanitySingleVersion):
     """Sanity test script."""
-    def __init__(self, path):
+    def __init__(self, path) -> None:
         name = os.path.splitext(os.path.basename(path))[0]
         config_path = os.path.splitext(path)[0] + '.json'
@@ -866,10 +866,10 @@ class SanityCodeSmellTest(SanitySingleVersion):
             self.extensions = []
             self.prefixes = []
             self.files = []
-            self.text: t.Optional[bool] = None
+            self.text = None
             self.ignore_self = False
-            self.minimum_python_version: t.Optional[str] = None
-            self.maximum_python_version: t.Optional[str] = None
+            self.minimum_python_version = None
+            self.maximum_python_version = None
             self.__all_targets = False
             self.__no_targets = True

@@ -104,7 +104,7 @@ class IntegrationAliasesTest(SanitySingleVersion):
     ansible_only = True
-    def __init__(self):
+    def __init__(self) -> None:
         super().__init__()
         self._ci_config: dict[str, t.Any] = {}
@@ -307,10 +307,8 @@ class IntegrationAliasesTest(SanitySingleVersion):
         return messages
-    def check_windows_targets(self):
-        """
-        :rtype: list[SanityMessage]
-        """
+    def check_windows_targets(self) -> list[SanityMessage]:
+        """Check Windows integration test targets and return messages with any issues found."""
         windows_targets = tuple(walk_windows_integration_targets())
         messages = []

@@ -58,7 +58,7 @@ from ...host_configs import (
 class PylintTest(SanitySingleVersion):
     """Sanity test using pylint."""
-    def __init__(self):
+    def __init__(self) -> None:
         super().__init__()
         self.optional_error_codes.update([
             'ansible-deprecated-date',

@@ -60,7 +60,7 @@ from ...host_configs import (
 class ValidateModulesTest(SanitySingleVersion):
     """Sanity test using validate-modules."""
-    def __init__(self):
+    def __init__(self) -> None:
         super().__init__()
         self.optional_error_codes.update([

@@ -54,7 +54,7 @@ class CompletionConfig(metaclass=abc.ABCMeta):
     @property
     @abc.abstractmethod
-    def is_default(self):
+    def is_default(self) -> bool:
         """True if the completion entry is only used for defaults, otherwise False."""
@@ -107,17 +107,17 @@ class RemoteCompletionConfig(CompletionConfig):
     arch: t.Optional[str] = None
     @property
-    def platform(self):
+    def platform(self) -> str:
         """The name of the platform."""
         return self.name.partition('/')[0]
     @property
-    def version(self):
+    def version(self) -> str:
         """The version of the platform."""
         return self.name.partition('/')[2]
     @property
-    def is_default(self):
+    def is_default(self) -> bool:
         """True if the completion entry is only used for defaults, otherwise False."""
         return not self.version
@@ -166,7 +166,7 @@ class DockerCompletionConfig(PythonCompletionConfig):
     placeholder: bool = False
     @property
-    def is_default(self):
+    def is_default(self) -> bool:
         """True if the completion entry is only used for defaults, otherwise False."""
         return False
@@ -276,7 +276,9 @@ def filter_completion(
 ) -> dict[str, TCompletionConfig]:
     """Return the given completion dictionary, filtering out configs which do not support the controller if controller_only is specified."""
     if controller_only:
-        completion = {name: config for name, config in completion.items() if isinstance(config, PosixCompletionConfig) and config.controller_supported}
+        # The cast is needed because mypy gets confused here and forgets that completion values are TCompletionConfig.
+        completion = {name: t.cast(TCompletionConfig, config) for name, config in completion.items() if
+                      isinstance(config, PosixCompletionConfig) and config.controller_supported}
     if not include_defaults:
         completion = {name: config for name, config in completion.items() if not config.is_default}

@@ -844,12 +844,12 @@ def create_container_hooks(
     control_state: dict[str, tuple[list[str], list[SshProcess]]] = {}
     managed_state: dict[str, tuple[list[str], list[SshProcess]]] = {}
-    def pre_target(target):
+    def pre_target(target: IntegrationTarget) -> None:
         """Configure hosts for SSH port forwarding required by the specified target."""
         forward_ssh_ports(args, control_connections, '%s_hosts_prepare.yml' % control_type, control_state, target, HostType.control, control_contexts)
         forward_ssh_ports(args, managed_connections, '%s_hosts_prepare.yml' % managed_type, managed_state, target, HostType.managed, managed_contexts)
-    def post_target(target):
+    def post_target(target: IntegrationTarget) -> None:
         """Clean up previously configured SSH port forwarding which was required by the specified target."""
         cleanup_ssh_ports(args, control_connections, '%s_hosts_restore.yml' % control_type, control_state, target, HostType.control)
         cleanup_ssh_ports(args, managed_connections, '%s_hosts_restore.yml' % managed_type, managed_state, target, HostType.managed)

@@ -173,11 +173,11 @@ class AnsibleCoreCI:
         self.endpoint = self.default_endpoint
     @property
-    def available(self):
+    def available(self) -> bool:
         """Return True if Ansible Core CI is supported."""
         return self.ci_provider.supports_core_ci_auth()
-    def start(self):
+    def start(self) -> t.Optional[dict[str, t.Any]]:
         """Start instance."""
         if self.started:
             display.info(f'Skipping started {self.label} instance.', verbosity=1)
@@ -185,7 +185,7 @@ class AnsibleCoreCI:
         return self._start(self.ci_provider.prepare_core_ci_auth())
-    def stop(self):
+    def stop(self) -> None:
         """Stop instance."""
         if not self.started:
             display.info(f'Skipping invalid {self.label} instance.', verbosity=1)
@@ -279,10 +279,10 @@ class AnsibleCoreCI:
         raise ApplicationError(f'Timeout waiting for {self.label} instance.')
     @property
-    def _uri(self):
+    def _uri(self) -> str:
         return f'{self.endpoint}/{self.stage}/{self.provider}/{self.instance_id}'
-    def _start(self, auth):
+    def _start(self, auth) -> dict[str, t.Any]:
         """Start instance."""
         display.info(f'Initializing new {self.label} instance using: {self._uri}', verbosity=1)
@@ -341,7 +341,7 @@ class AnsibleCoreCI:
             display.warning(f'{error}. Trying again after {sleep} seconds.')
             time.sleep(sleep)
-    def _clear(self):
+    def _clear(self) -> None:
         """Clear instance information."""
         try:
             self.connection = None
@@ -349,7 +349,7 @@ class AnsibleCoreCI:
         except FileNotFoundError:
             pass
-    def _load(self):
+    def _load(self) -> bool:
         """Load instance information."""
         try:
             data = read_text_file(self.path)

@@ -52,7 +52,7 @@ from .provider.layout.unsupported import (
 class DataContext:
     """Data context providing details about the current execution environment for ansible-test."""
-    def __init__(self):
+    def __init__(self) -> None:
         content_path = os.environ.get('ANSIBLE_TEST_CONTENT_ROOT')
         current_path = os.getcwd()
@@ -245,7 +245,7 @@ class PluginInfo:
 @cache
-def content_plugins():
+def content_plugins() -> dict[str, dict[str, PluginInfo]]:
     """
     Analyze content.
     The primary purpose of this analysis is to facilitate mapping of integration tests to the plugin(s) they are intended to test.
@@ -256,7 +256,7 @@ def content_plugins():
         plugin_paths = sorted(data_context().content.walk_files(plugin_directory))
         plugin_directory_offset = len(plugin_directory.split(os.path.sep))
-        plugin_files = {}
+        plugin_files: dict[str, list[str]] = {}
         for plugin_path in plugin_paths:
             plugin_filename = os.path.basename(plugin_path)

@@ -226,7 +226,7 @@ def delegate_command(args: EnvironmentConfig, host_state: HostState, exclude: li
             target.on_target_failure()  # when the controller is delegated, report failures after delegation fails
-def insert_options(command, options):
+def insert_options(command: list[str], options: list[str]) -> list[str]:
     """Insert addition command line options into the given command and return the result."""
     result = []

@@ -184,7 +184,7 @@ def check_container_cgroup_status(args: EnvironmentConfig, config: DockerConfig,
         write_text_file(os.path.join(args.dev_probe_cgroups, f'{identity}.log'), message)
-def get_identity(args: EnvironmentConfig, config: DockerConfig, container_name: str):
+def get_identity(args: EnvironmentConfig, config: DockerConfig, container_name: str) -> str:
     """Generate and return an identity string to use when logging test results."""
     engine = require_docker().command

@@ -720,7 +720,7 @@ class DockerError(Exception):
 class ContainerNotFoundError(DockerError):
     """The container identified by `identifier` was not found."""
-    def __init__(self, identifier):
+    def __init__(self, identifier: str) -> None:
         super().__init__('The container "%s" was not found.' % identifier)
         self.identifier = identifier

@@ -81,13 +81,13 @@ def detect_changes(args: TestConfig) -> t.Optional[list[str]]:
 class NoChangesDetected(ApplicationWarning):
     """Exception when change detection was performed, but no changes were found."""
-    def __init__(self):
+    def __init__(self) -> None:
         super().__init__('No changes detected.')
 class NoTestsForChanges(ApplicationWarning):
     """Exception when changes detected, but no tests trigger as a result."""
-    def __init__(self):
+    def __init__(self) -> None:
         super().__init__('No tests found for detected changes.')
@@ -111,5 +111,5 @@ class ListTargets(Exception):
 class AllTargetsSkipped(ApplicationWarning):
     """All targets skipped."""
-    def __init__(self):
+    def __init__(self) -> None:
         super().__init__('All targets skipped.')

@@ -48,7 +48,7 @@ from .util import (
 @dataclasses.dataclass(frozen=True)
 class OriginCompletionConfig(PosixCompletionConfig):
     """Pseudo completion config for the origin."""
-    def __init__(self):
+    def __init__(self) -> None:
         super().__init__(name='origin')
     @property
@@ -65,7 +65,7 @@ class OriginCompletionConfig(PosixCompletionConfig):
         return version
     @property
-    def is_default(self):
+    def is_default(self) -> bool:
         """True if the completion entry is only used for defaults, otherwise False."""
         return False
@@ -513,7 +513,7 @@ class HostSettings:
         with open_binary_file(path) as settings_file:
             return pickle.load(settings_file)
-    def apply_defaults(self):
+    def apply_defaults(self) -> None:
         """Apply defaults to the host settings."""
         context = HostContext(controller_config=None)
         self.controller.apply_defaults(context, self.controller.get_defaults(context))

@@ -351,7 +351,7 @@ class RemoteProfile(SshTargetHostProfile[TRemoteConfig], metaclass=abc.ABCMeta):
         return self.core_ci
-    def delete_instance(self):
+    def delete_instance(self) -> None:
         """Delete the AnsibleCoreCI VM instance."""
         core_ci = self.get_instance()
@@ -892,7 +892,7 @@ class DockerProfile(ControllerHostProfile[DockerConfig], SshTargetHostProfile[Do
         return message
-    def check_cgroup_requirements(self):
+    def check_cgroup_requirements(self) -> None:
         """Check cgroup requirements for the container."""
         cgroup_version = get_docker_info(self.args).cgroup_version

@@ -80,7 +80,7 @@ def open_binary_file(path: str, mode: str = 'rb') -> t.IO[bytes]:
 class SortedSetEncoder(json.JSONEncoder):
     """Encode sets as sorted lists."""
-    def default(self, o):
+    def default(self, o: t.Any) -> t.Any:
         """Return a serialized version of the `o` object."""
         if isinstance(o, set):
             return sorted(o)

@@ -19,7 +19,7 @@ from .diff import (
 class Metadata:
     """Metadata object for passing data to delegated tests."""
-    def __init__(self):
+    def __init__(self) -> None:
         """Initialize metadata."""
         self.changes: dict[str, tuple[tuple[int, int], ...]] = {}
         self.cloud_config: t.Optional[dict[str, dict[str, t.Union[int, str, bool]]]] = None
@@ -82,7 +82,7 @@ class Metadata:
 class ChangeDescription:
     """Description of changes."""
-    def __init__(self):
+    def __init__(self) -> None:
         self.command: str = ''
         self.changed_paths: list[str] = []
         self.deleted_paths: list[str] = []

@@ -150,7 +150,7 @@ class ContentLayout(Layout):
 class LayoutMessages:
     """Messages generated during layout creation that should be deferred for later display."""
-    def __init__(self):
+    def __init__(self) -> None:
         self.info: list[str] = []
         self.warning: list[str] = []
         self.error: list[str] = []

@@ -119,7 +119,7 @@ def configure_target_pypi_proxy(args: EnvironmentConfig, profile: HostProfile, p
     create_posix_inventory(args, inventory_path, [profile])
-    def cleanup_pypi_proxy():
+    def cleanup_pypi_proxy() -> None:
         """Undo changes made to configure the PyPI proxy."""
         run_playbook(args, inventory_path, 'pypi_proxy_restore.yml', capture=True)

@@ -136,10 +136,8 @@ def filter_targets(targets: c.Iterable[TCompletionTarget],
         raise TargetPatternsNotMatched(unmatched)
-def walk_module_targets():
-    """
-    :rtype: collections.Iterable[TestTarget]
-    """
+def walk_module_targets() -> c.Iterable[TestTarget]:
+    """Iterate through the module test targets."""
     for target in walk_test_targets(path=data_context().content.module_path, module_path=data_context().content.module_path, extensions=MODULE_EXTENSIONS):
         if not target.module:
             continue
@@ -244,10 +242,8 @@ def walk_integration_targets() -> c.Iterable[IntegrationTarget]:
         yield IntegrationTarget(to_text(path), modules, prefixes)
-def load_integration_prefixes():
-    """
-    :rtype: dict[str, str]
-    """
+def load_integration_prefixes() -> dict[str, str]:
+    """Load and return the integration test prefixes."""
     path = data_context().content.integration_path
     file_paths = sorted(f for f in data_context().content.get_files(path) if os.path.splitext(os.path.basename(f))[0] == 'target-prefixes')
     prefixes = {}
@@ -313,7 +309,7 @@ def analyze_integration_target_dependencies(integration_targets: list[Integratio
     role_targets = [target for target in integration_targets if target.type == 'role']
     hidden_role_target_names = set(target.name for target in role_targets if 'hidden/' in target.aliases)
-    dependencies = collections.defaultdict(set)
+    dependencies: collections.defaultdict[str, set[str]] = collections.defaultdict(set)
     # handle setup dependencies
     for target in integration_targets:
@@ -405,12 +401,12 @@ def analyze_integration_target_dependencies(integration_targets: list[Integratio
 class CompletionTarget(metaclass=abc.ABCMeta):
     """Command-line argument completion target base class."""
-    def __init__(self):
-        self.name = None
-        self.path = None
-        self.base_path = None
-        self.modules = tuple()
-        self.aliases = tuple()
+    def __init__(self) -> None:
+        self.name = ''
+        self.path = ''
+        self.base_path: t.Optional[str] = None
+        self.modules: tuple[str, ...] = tuple()
+        self.aliases: tuple[str, ...] = tuple()
     def __eq__(self, other):
         if isinstance(other, CompletionTarget):
@@ -446,7 +442,7 @@ class TestTarget(CompletionTarget):
         module_prefix: t.Optional[str],
         base_path: str,
         symlink: t.Optional[bool] = None,
-    ):
+    ) -> None:
         super().__init__()
         if symlink is None:
@@ -665,8 +661,6 @@ class IntegrationTarget(CompletionTarget):
         target_type, actual_type = categorize_integration_test(self.name, list(static_aliases), force_target)
-        self._remove_group(groups, 'context')
         groups.extend(['context/', f'context/{target_type.name.lower()}'])
         if target_type != actual_type:
@@ -695,10 +689,6 @@ class IntegrationTarget(CompletionTarget):
         self.setup_always = tuple(sorted(set(g.split('/')[2] for g in groups if g.startswith('setup/always/'))))
         self.needs_target = tuple(sorted(set(g.split('/')[2] for g in groups if g.startswith('needs/target/'))))
-    @staticmethod
-    def _remove_group(groups, group):
-        return [g for g in groups if g != group and not g.startswith('%s/' % group)]
 class TargetPatternsNotMatched(ApplicationError):
     """One or more targets were not matched when a match was required."""

@@ -333,10 +333,8 @@ class TestFailure(TestResult):
         return command
-    def find_docs(self):
-        """
-        :rtype: str
-        """
+    def find_docs(self) -> t.Optional[str]:
+        """Return the docs URL for this test or None if there is no docs URL."""
         if self.command != 'sanity':
             return None  # only sanity tests have docs links

@@ -21,7 +21,7 @@ class WrappedThread(threading.Thread):
         self.action = action
         self.result = None
-    def run(self):
+    def run(self) -> None:
         """
         Run action and capture results or exception.
         Do not override. Do not call directly. Executed by the start() method.
@@ -35,11 +35,8 @@ class WrappedThread(threading.Thread):
         except:  # noqa
             self._result.put((None, sys.exc_info()))
-    def wait_for_result(self):
-        """
-        Wait for thread to exit and return the result or raise an exception.
-        :rtype: any
-        """
+    def wait_for_result(self) -> t.Any:
+        """Wait for thread to exit and return the result or raise an exception."""
         result, exception = self._result.get()
         if exception:

@@ -75,7 +75,7 @@ def configure_test_timeout(args: TestConfig) -> None:
     display.info('The %d minute test timeout expires in %s at %s.' % (
         timeout_duration, timeout_remaining, timeout_deadline), verbosity=1)
-    def timeout_handler(_dummy1, _dummy2):
+    def timeout_handler(_dummy1: t.Any, _dummy2: t.Any) -> None:
         """Runs when SIGUSR1 is received."""
         test_timeout.write(args)

@@ -611,7 +611,7 @@ class OutputThread(ReaderThread):
             src.close()
-def common_environment():
+def common_environment() -> dict[str, str]:
     """Common environment used for executing all programs."""
     env = dict(
         LC_ALL=CONFIGURED_LOCALE,
@@ -793,17 +793,17 @@ class Display:
         3: cyan,
     }
-    def __init__(self):
+    def __init__(self) -> None:
         self.verbosity = 0
         self.color = sys.stdout.isatty()
-        self.warnings = []
-        self.warnings_unique = set()
+        self.warnings: list[str] = []
+        self.warnings_unique: set[str] = set()
         self.fd = sys.stderr  # default to stderr until config is initialized to avoid early messages going to stdout
         self.rows = 0
         self.columns = 0
         self.truncate = 0
         self.redact = True
-        self.sensitive = set()
+        self.sensitive: set[str] = set()
         if os.isatty(0):
             self.rows, self.columns = unpack('HHHH', fcntl.ioctl(0, TIOCGWINSZ, pack('HHHH', 0, 0, 0, 0)))[:2]
@@ -959,7 +959,7 @@ class HostConnectionError(ApplicationError):
         self._callback()
-def retry(func, ex_type=SubprocessError, sleep=10, attempts=10, warn=True):
+def retry(func: t.Callable[..., TValue], ex_type: t.Type[BaseException] = SubprocessError, sleep: int = 10, attempts: int = 10, warn: bool = True) -> TValue:
     """Retry the specified function on failure."""
     for dummy in range(1, attempts):
         try:
@@ -1090,7 +1090,7 @@ def load_module(path: str, name: str) -> None:
     spec.loader.exec_module(module)
-def sanitize_host_name(name):
+def sanitize_host_name(name: str) -> str:
     """Return a sanitized version of the given name, suitable for use as a hostname."""
     return re.sub('[^A-Za-z0-9]+', '-', name)[:63].strip('-')

@@ -96,7 +96,7 @@ class ResultType:
     TMP: ResultType = None
     @staticmethod
-    def _populate():
+    def _populate() -> None:
         ResultType.BOT = ResultType('bot')
         ResultType.COVERAGE = ResultType('coverage')
         ResultType.DATA = ResultType('data')
@@ -288,7 +288,7 @@ def get_injector_path() -> str:
     verified_chmod(injector_path, MODE_DIRECTORY)
-    def cleanup_injector():
+    def cleanup_injector() -> None:
         """Remove the temporary injector directory."""
         remove_tree(injector_path)
@@ -388,7 +388,7 @@ def create_interpreter_wrapper(interpreter: str, injected_interpreter: str) -> N
     verified_chmod(injected_interpreter, MODE_FILE_EXECUTE)
-def cleanup_python_paths():
+def cleanup_python_paths() -> None:
     """Clean up all temporary python directories."""
     for path in sorted(PYTHON_PATHS.values()):
         display.info('Cleaning up temporary python directory: %s' % path, verbosity=2)
@@ -449,7 +449,7 @@ def run_command(
         output_stream=output_stream, cmd_verbosity=cmd_verbosity, str_errors=str_errors, error_callback=error_callback)
-def yamlcheck(python):
+def yamlcheck(python: PythonConfig) -> t.Optional[bool]:
     """Return True if PyYAML has libyaml support, False if it does not and None if it was not found."""
     result = json.loads(raw_command([python.path, os.path.join(ANSIBLE_TEST_TARGET_TOOLS_ROOT, 'yamlcheck.py')], capture=True)[0])

@@ -44,7 +44,8 @@ def main():
         # noinspection PyCompatibility
         from importlib import import_module
     except ImportError:
-        def import_module(name):
+        def import_module(name, package=None):  # type: (str, str | None) -> types.ModuleType
+            assert package is None
             __import__(name)
             return sys.modules[name]
