ansible-test - Convert more type hints. (#78449)

* Simple regex replace of multi-line function arg annotations.

* Simple regex replace of multi-line function arg annotations with default values.

* Simple regex replace of multi-line function arg return annotations.

* Simple regex replace of assignment annotations.
pull/78456/head
Matt Clay 2 years ago committed by GitHub
parent 2218b63aef
commit b993b5cd49
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -53,7 +53,7 @@ def main(cli_args: t.Optional[t.List[str]] = None) -> None:
try:
os.chdir(data_context().content.root)
args = parse_args(cli_args)
config = args.config(args) # type: CommonConfig
config: CommonConfig = args.config(args)
display.verbosity = config.verbosity
display.truncate = config.truncate
display.redact = config.redact

@ -248,7 +248,7 @@ License: GPLv3+
class CollectionDetail:
"""Collection detail."""
def __init__(self) -> None:
self.version = None # type: t.Optional[str]
self.version: t.Optional[str] = None
class CollectionDetailError(ApplicationError):
@ -279,12 +279,12 @@ def get_collection_detail(python: PythonConfig) -> CollectionDetail:
def run_playbook(
args, # type: EnvironmentConfig
inventory_path, # type: str
args: EnvironmentConfig,
inventory_path: str,
playbook, # type: str
capture, # type: bool
variables=None, # type: t.Optional[t.Dict[str, t.Any]]
): # type: (...) -> None
capture: bool,
variables: t.Optional[t.Dict[str, t.Any]] = None,
) -> None:
"""Run the specified playbook using the given inventory file and playbook variables."""
playbook_path = os.path.join(ANSIBLE_TEST_DATA_ROOT, 'playbooks', playbook)
cmd = ['ansible-playbook', '-i', inventory_path, playbook_path]

@ -68,10 +68,10 @@ def categorize_changes(args: TestConfig, paths: t.List[str], verbose_command: t.
focused_commands = collections.defaultdict(set)
deleted_paths = set() # type: t.Set[str]
original_paths = set() # type: t.Set[str]
additional_paths = set() # type: t.Set[str]
no_integration_paths = set() # type: t.Set[str]
deleted_paths: t.Set[str] = set()
original_paths: t.Set[str] = set()
additional_paths: t.Set[str] = set()
no_integration_paths: t.Set[str] = set()
for path in paths:
if not os.path.exists(path):
@ -210,7 +210,7 @@ class PathMapper:
self.powershell_module_utils_imports = {} # type: t.Dict[str, t.Set[str]] # populated on first use to reduce overhead when not needed
self.csharp_module_utils_imports = {} # type: t.Dict[str, t.Set[str]] # populated on first use to reduce overhead when not needed
self.paths_to_dependent_targets = {} # type: t.Dict[str, t.Set[IntegrationTarget]]
self.paths_to_dependent_targets: t.Dict[str, t.Set[IntegrationTarget]] = {}
for target in self.integration_targets:
for path in target.needs_file:
@ -342,7 +342,7 @@ class PathMapper:
filename = os.path.basename(path)
name, ext = os.path.splitext(filename)
minimal = {} # type: t.Dict[str, str]
minimal: t.Dict[str, str] = {}
if os.path.sep not in path:
if filename in (
@ -631,7 +631,7 @@ class PathMapper:
filename = os.path.basename(path)
dummy, ext = os.path.splitext(filename)
minimal = {} # type: t.Dict[str, str]
minimal: t.Dict[str, str] = {}
if path.startswith('changelogs/'):
return minimal
@ -675,7 +675,7 @@ class PathMapper:
filename = os.path.basename(path)
name, ext = os.path.splitext(filename)
minimal = {} # type: t.Dict[str, str]
minimal: t.Dict[str, str] = {}
if path.startswith('bin/'):
return all_tests(self.args) # broad impact, run all tests

@ -38,7 +38,7 @@ def get_csharp_module_utils_imports(powershell_targets: t.List[TestTarget], csha
for target in csharp_targets:
imports_by_target_path[target.path] = extract_csharp_module_utils_imports(target.path, module_utils, True)
imports = {module_util: set() for module_util in module_utils} # type: t.Dict[str, t.Set[str]]
imports: t.Dict[str, t.Set[str]] = {module_util: set() for module_util in module_utils}
for target_path, modules in imports_by_target_path.items():
for module_util in modules:

@ -35,7 +35,7 @@ def get_powershell_module_utils_imports(powershell_targets: t.List[TestTarget])
for target in powershell_targets:
imports_by_target_path[target.path] = extract_powershell_module_utils_imports(target.path, module_utils)
imports = {module_util: set() for module_util in module_utils} # type: t.Dict[str, t.Set[str]]
imports: t.Dict[str, t.Set[str]] = {module_util: set() for module_util in module_utils}
for target_path, modules in imports_by_target_path.items():
for module_util in modules:

@ -102,7 +102,7 @@ def get_python_module_utils_imports(compile_targets: t.List[TestTarget]) -> t.Di
display.info('%s inherits import %s via %s' % (target_path, module_util_import, module_util), verbosity=6)
modules.add(module_util_import)
imports = {module_util: set() for module_util in module_utils | virtual_utils} # type: t.Dict[str, t.Set[str]]
imports: t.Dict[str, t.Set[str]] = {module_util: set() for module_util in module_utils | virtual_utils}
for target_path, modules in imports_by_target_path.items():
for module_util in modules:
@ -236,7 +236,7 @@ class ModuleUtilFinder(ast.NodeVisitor):
def __init__(self, path: str, module_utils: t.Set[str]) -> None:
self.path = path
self.module_utils = module_utils
self.imports = set() # type: t.Set[str]
self.imports: t.Set[str] = set()
# implicitly import parent package

@ -37,15 +37,15 @@ class RegisteredCompletionFinder(OptionCompletionFinder):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.registered_completions = None # type: t.Optional[t.List[str]]
self.registered_completions: t.Optional[t.List[str]] = None
def completer(
self,
prefix, # type: str
action, # type: argparse.Action
parsed_args, # type: argparse.Namespace
prefix: str,
action: argparse.Action,
parsed_args: argparse.Namespace,
**kwargs,
): # type: (...) -> t.List[str]
) -> t.List[str]:
"""
Return a list of completions for the specified prefix and action.
Use this as the completer function for argcomplete.
@ -64,10 +64,10 @@ class RegisteredCompletionFinder(OptionCompletionFinder):
@abc.abstractmethod
def get_completions(
self,
prefix, # type: str
action, # type: argparse.Action
parsed_args, # type: argparse.Namespace
): # type: (...) -> t.List[str]
prefix: str,
action: argparse.Action,
parsed_args: argparse.Namespace,
) -> t.List[str]:
"""
Return a list of completions for the specified prefix and action.
Called by the complete function.
@ -86,7 +86,7 @@ class RegisteredCompletionFinder(OptionCompletionFinder):
class CompositeAction(argparse.Action, metaclass=abc.ABCMeta):
"""Base class for actions that parse composite arguments."""
documentation_state = {} # type: t.Dict[t.Type[CompositeAction], DocumentationState]
documentation_state: t.Dict[t.Type[CompositeAction], DocumentationState] = {}
def __init__(
self,
@ -136,10 +136,10 @@ class CompositeActionCompletionFinder(RegisteredCompletionFinder):
"""Completion finder with support for composite argument parsing."""
def get_completions(
self,
prefix, # type: str
action, # type: argparse.Action
parsed_args, # type: argparse.Namespace
): # type: (...) -> t.List[str]
prefix: str,
action: argparse.Action,
parsed_args: argparse.Namespace,
) -> t.List[str]:
"""Return a list of completions appropriate for the given prefix and action, taking into account the arguments that have already been parsed."""
assert isinstance(action, CompositeAction)
@ -232,9 +232,9 @@ def detect_false_file_completion(value: str, mode: ParserMode) -> bool:
def complete(
completer, # type: Parser
state, # type: ParserState
): # type: (...) -> Completion
completer: Parser,
state: ParserState,
) -> Completion:
"""Perform argument completion using the given completer and return the completion result."""
value = state.remainder

@ -9,7 +9,7 @@ import typing as t
class EnumAction(argparse.Action):
"""Parse an enum using the lowercase enum names."""
def __init__(self, **kwargs: t.Any) -> None:
self.enum_type = kwargs.pop('type', None) # type: t.Type[enum.Enum]
self.enum_type: t.Type[enum.Enum] = kwargs.pop('type', None)
kwargs.setdefault('choices', tuple(e.name.lower() for e in self.enum_type))
super().__init__(**kwargs)

@ -44,9 +44,9 @@ from .units import (
def do_commands(
parent, # type: argparse.ArgumentParser
completer, # type: CompositeActionCompletionFinder
): # type: (...) -> None
parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder,
) -> None:
"""Command line parsing for all commands."""
common = argparse.ArgumentParser(add_help=False)

@ -38,9 +38,9 @@ from .xml import (
def do_coverage(
subparsers,
parent, # type: argparse.ArgumentParser
completer, # type: CompositeActionCompletionFinder
): # type: (...) -> None
parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder,
) -> None:
"""Command line parsing for all `coverage` commands."""
coverage_common = argparse.ArgumentParser(add_help=False, parents=[parent])
@ -61,7 +61,7 @@ def do_coverage(
def add_coverage_common(
parser, # type: argparse.ArgumentParser
parser: argparse.ArgumentParser,
):
"""Add common coverage arguments."""
parser.add_argument(

@ -14,9 +14,9 @@ from ....environments import (
def do_analyze(
subparsers,
parent, # type: argparse.ArgumentParser
completer, # type: CompositeActionCompletionFinder
): # type: (...) -> None
parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder,
) -> None:
"""Command line parsing for all `coverage analyze` commands."""
parser = subparsers.add_parser(
'analyze',

@ -30,9 +30,9 @@ from .missing import (
def do_targets(
subparsers,
parent, # type: argparse.ArgumentParser
completer, # type: CompositeActionCompletionFinder
): # type: (...) -> None
parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder,
) -> None:
"""Command line parsing for all `coverage analyze targets` commands."""
targets = subparsers.add_parser(
'targets',

@ -18,8 +18,8 @@ from .....environments import (
def do_combine(
subparsers,
parent, # type: argparse.ArgumentParser
completer, # type: CompositeActionCompletionFinder
parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder,
):
"""Command line parsing for the `coverage analyze targets combine` command."""
parser = subparsers.add_parser(

@ -18,8 +18,8 @@ from .....environments import (
def do_expand(
subparsers,
parent, # type: argparse.ArgumentParser
completer, # type: CompositeActionCompletionFinder
parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder,
):
"""Command line parsing for the `coverage analyze targets expand` command."""
parser = subparsers.add_parser(

@ -18,8 +18,8 @@ from .....environments import (
def do_filter(
subparsers,
parent, # type: argparse.ArgumentParser
completer, # type: CompositeActionCompletionFinder
parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder,
):
"""Command line parsing for the `coverage analyze targets filter` command."""
parser = subparsers.add_parser(

@ -18,8 +18,8 @@ from .....environments import (
def do_generate(
subparsers,
parent, # type: argparse.ArgumentParser
completer, # type: CompositeActionCompletionFinder
parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder,
):
"""Command line parsing for the `coverage analyze targets generate` command."""
parser = subparsers.add_parser(

@ -18,8 +18,8 @@ from .....environments import (
def do_missing(
subparsers,
parent, # type: argparse.ArgumentParser
completer, # type: CompositeActionCompletionFinder
parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder,
):
"""Command line parsing for the `coverage analyze targets missing` command."""
parser = subparsers.add_parser(

@ -19,10 +19,10 @@ from ...environments import (
def do_combine(
subparsers,
parent, # type: argparse.ArgumentParser
add_coverage_common, # type: t.Callable[[argparse.ArgumentParser], None]
completer, # type: CompositeActionCompletionFinder
): # type: (...) -> None
parent: argparse.ArgumentParser,
add_coverage_common: t.Callable[[argparse.ArgumentParser], None],
completer: CompositeActionCompletionFinder,
) -> None:
"""Command line parsing for the `coverage combine` command."""
parser = subparsers.add_parser(
'combine',

@ -18,9 +18,9 @@ from ...environments import (
def do_erase(
subparsers,
parent, # type: argparse.ArgumentParser
completer, # type: CompositeActionCompletionFinder
): # type: (...) -> None
parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder,
) -> None:
"""Command line parsing for the `coverage erase` command."""
parser = subparsers.add_parser(
'erase',

@ -19,10 +19,10 @@ from ...environments import (
def do_html(
subparsers,
parent, # type: argparse.ArgumentParser
add_coverage_common, # type: t.Callable[[argparse.ArgumentParser], None]
completer, # type: CompositeActionCompletionFinder
): # type: (...) -> None
parent: argparse.ArgumentParser,
add_coverage_common: t.Callable[[argparse.ArgumentParser], None],
completer: CompositeActionCompletionFinder,
) -> None:
"""Command line parsing for the `coverage html` command."""
parser = subparsers.add_parser(
'html',

@ -19,10 +19,10 @@ from ...environments import (
def do_report(
subparsers,
parent, # type: argparse.ArgumentParser
add_coverage_common, # type: t.Callable[[argparse.ArgumentParser], None]
completer, # type: CompositeActionCompletionFinder
): # type: (...) -> None
parent: argparse.ArgumentParser,
add_coverage_common: t.Callable[[argparse.ArgumentParser], None],
completer: CompositeActionCompletionFinder,
) -> None:
"""Command line parsing for the `coverage report` command."""
parser = subparsers.add_parser(
'report',

@ -19,10 +19,10 @@ from ...environments import (
def do_xml(
subparsers,
parent, # type: argparse.ArgumentParser
add_coverage_common, # type: t.Callable[[argparse.ArgumentParser], None]
completer, # type: CompositeActionCompletionFinder
): # type: (...) -> None
parent: argparse.ArgumentParser,
add_coverage_common: t.Callable[[argparse.ArgumentParser], None],
completer: CompositeActionCompletionFinder,
) -> None:
"""Command line parsing for the `coverage xml` command."""
parser = subparsers.add_parser(
'xml',

@ -18,8 +18,8 @@ from ..environments import (
def do_env(
subparsers,
parent, # type: argparse.ArgumentParser
completer, # type: CompositeActionCompletionFinder
parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder,
):
"""Command line parsing for the `env` command."""
parser = subparsers.add_parser(

@ -27,8 +27,8 @@ from .windows import (
def do_integration(
subparsers,
parent, # type: argparse.ArgumentParser
completer, # type: CompositeActionCompletionFinder
parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder,
):
"""Command line parsing for all integration commands."""
parser = argparse.ArgumentParser(
@ -42,7 +42,7 @@ def do_integration(
def add_integration_common(
parser, # type: argparse.ArgumentParser
parser: argparse.ArgumentParser,
):
"""Add common integration arguments."""
register_completer(parser.add_argument(

@ -35,9 +35,9 @@ from ...completers import (
def do_network_integration(
subparsers,
parent, # type: argparse.ArgumentParser
add_integration_common, # type: t.Callable[[argparse.ArgumentParser], None]
completer, # type: CompositeActionCompletionFinder
parent: argparse.ArgumentParser,
add_integration_common: t.Callable[[argparse.ArgumentParser], None],
completer: CompositeActionCompletionFinder,
):
"""Command line parsing for the `network-integration` command."""
parser = subparsers.add_parser(

@ -26,9 +26,9 @@ from ...environments import (
def do_posix_integration(
subparsers,
parent, # type: argparse.ArgumentParser
add_integration_common, # type: t.Callable[[argparse.ArgumentParser], None]
completer, # type: CompositeActionCompletionFinder
parent: argparse.ArgumentParser,
add_integration_common: t.Callable[[argparse.ArgumentParser], None],
completer: CompositeActionCompletionFinder,
):
"""Command line parsing for the `integration` command."""
parser = subparsers.add_parser(

@ -26,9 +26,9 @@ from ...environments import (
def do_windows_integration(
subparsers,
parent, # type: argparse.ArgumentParser
add_integration_common, # type: t.Callable[[argparse.ArgumentParser], None]
completer, # type: CompositeActionCompletionFinder
parent: argparse.ArgumentParser,
add_integration_common: t.Callable[[argparse.ArgumentParser], None],
completer: CompositeActionCompletionFinder,
):
"""Command line parsing for the `windows-integration` command."""
parser = subparsers.add_parser(

@ -30,8 +30,8 @@ from ..environments import (
def do_sanity(
subparsers,
parent, # type: argparse.ArgumentParser
completer, # type: CompositeActionCompletionFinder
parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder,
):
"""Command line parsing for the `sanity` command."""
parser = subparsers.add_parser(

@ -21,8 +21,8 @@ from ..environments import (
def do_shell(
subparsers,
parent, # type: argparse.ArgumentParser
completer, # type: CompositeActionCompletionFinder
parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder,
):
"""Command line parsing for the `shell` command."""
parser = subparsers.add_parser(

@ -25,8 +25,8 @@ from ..environments import (
def do_units(
subparsers,
parent, # type: argparse.ArgumentParser
completer, # type: CompositeActionCompletionFinder
parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder,
):
"""Command line parsing for the `units` command."""
parser = subparsers.add_parser(

@ -146,14 +146,14 @@ class LegacyHostOptions:
@staticmethod
def purge_args(args: t.List[str]) -> t.List[str]:
"""Purge legacy host options from the given command line arguments."""
fields = dataclasses.fields(LegacyHostOptions) # type: t.Tuple[dataclasses.Field, ...]
filters = {get_option_name(field.name): 0 if field.type is t.Optional[bool] else 1 for field in fields} # type: t.Dict[str, int]
fields: t.Tuple[dataclasses.Field, ...] = dataclasses.fields(LegacyHostOptions)
filters: t.Dict[str, int] = {get_option_name(field.name): 0 if field.type is t.Optional[bool] else 1 for field in fields}
return filter_args(args, filters)
def get_options_used(self) -> t.Tuple[str, ...]:
"""Return a tuple of the command line options used."""
fields = dataclasses.fields(self) # type: t.Tuple[dataclasses.Field, ...]
fields: t.Tuple[dataclasses.Field, ...] = dataclasses.fields(self)
options = tuple(sorted(get_option_name(field.name) for field in fields if getattr(self, field.name)))
return options
@ -190,10 +190,10 @@ class TargetMode(enum.Enum):
def convert_legacy_args(
argv, # type: t.List[str]
args, # type: t.Union[argparse.Namespace, types.SimpleNamespace]
mode, # type: TargetMode
): # type: (...) -> HostSettings
argv: t.List[str],
args: t.Union[argparse.Namespace, types.SimpleNamespace],
mode: TargetMode,
) -> HostSettings:
"""Convert pre-split host arguments in the given namespace to their split counterparts."""
old_options = LegacyHostOptions.create(args)
old_options.purge_namespace(args)
@ -261,10 +261,10 @@ def convert_legacy_args(
def controller_targets(
mode, # type: TargetMode
options, # type: LegacyHostOptions
controller, # type: ControllerHostConfig
): # type: (...) -> t.List[HostConfig]
mode: TargetMode,
options: LegacyHostOptions,
controller: ControllerHostConfig,
) -> t.List[HostConfig]:
"""Return the configuration for controller targets."""
python = native_python(options)
@ -287,9 +287,9 @@ def native_python(options: LegacyHostOptions) -> t.Optional[NativePythonConfig]:
def get_legacy_host_config(
mode, # type: TargetMode
options, # type: LegacyHostOptions
): # type: (...) -> t.Tuple[ControllerHostConfig, t.List[HostConfig], t.Optional[FallbackDetail]]
mode: TargetMode,
options: LegacyHostOptions,
) -> t.Tuple[ControllerHostConfig, t.List[HostConfig], t.Optional[FallbackDetail]]:
"""
Returns controller and target host configs derived from the provided legacy host options.
The goal is to match the original behavior, by using non-split testing whenever possible.
@ -300,7 +300,7 @@ def get_legacy_host_config(
docker_fallback = 'default'
remote_fallback = get_fallback_remote_controller()
controller_fallback = None # type: t.Optional[t.Tuple[str, str, FallbackReason]]
controller_fallback: t.Optional[t.Tuple[str, str, FallbackReason]] = None
controller: t.Optional[ControllerHostConfig]
targets: t.List[HostConfig]
@ -453,10 +453,10 @@ def get_legacy_host_config(
def handle_non_posix_targets(
mode, # type: TargetMode
options, # type: LegacyHostOptions
targets, # type: t.List[HostConfig]
): # type: (...) -> t.List[HostConfig]
mode: TargetMode,
options: LegacyHostOptions,
targets: t.List[HostConfig],
) -> t.List[HostConfig]:
"""Return a list of non-POSIX targets if the target mode is non-POSIX."""
if mode == TargetMode.WINDOWS_INTEGRATION:
if options.windows:
@ -486,9 +486,9 @@ def handle_non_posix_targets(
def default_targets(
mode, # type: TargetMode
controller, # type: ControllerHostConfig
): # type: (...) -> t.List[HostConfig]
mode: TargetMode,
controller: ControllerHostConfig,
) -> t.List[HostConfig]:
"""Return a list of default targets for the given target mode."""
targets: t.List[HostConfig]

@ -81,11 +81,11 @@ class ControllerMode(enum.Enum):
def add_environments(
parser, # type: argparse.ArgumentParser
completer, # type: CompositeActionCompletionFinder
controller_mode, # type: ControllerMode
target_mode, # type: TargetMode
): # type: (...) -> None
parser: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder,
controller_mode: ControllerMode,
target_mode: TargetMode,
) -> None:
"""Add arguments for the environments used to run ansible-test and commands it invokes."""
no_environment = controller_mode == ControllerMode.NO_DELEGATION and target_mode == TargetMode.NO_TARGETS
@ -114,8 +114,8 @@ def add_environments(
def add_global_options(
parser, # type: argparse.ArgumentParser
controller_mode, # type: ControllerMode
parser: argparse.ArgumentParser,
controller_mode: ControllerMode,
):
"""Add global options for controlling the test environment that work with both the legacy and composite options."""
global_parser = t.cast(argparse.ArgumentParser, parser.add_argument_group(title='global environment arguments'))
@ -156,11 +156,11 @@ def add_global_options(
def add_composite_environment_options(
parser, # type: argparse.ArgumentParser
completer, # type: CompositeActionCompletionFinder
controller_mode, # type: ControllerMode
target_mode, # type: TargetMode
): # type: (...) -> t.List[t.Type[CompositeAction]]
parser: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder,
controller_mode: ControllerMode,
target_mode: TargetMode,
) -> t.List[t.Type[CompositeAction]]:
"""Add composite options for controlling the test environment."""
composite_parser = t.cast(argparse.ArgumentParser, parser.add_argument_group(
title='composite environment arguments (mutually exclusive with "environment arguments" above)'))
@ -170,7 +170,7 @@ def add_composite_environment_options(
help=argparse.SUPPRESS,
)
action_types = [] # type: t.List[t.Type[CompositeAction]]
action_types: t.List[t.Type[CompositeAction]] = []
def register_action_type(action_type: t.Type[CompositeAction]) -> t.Type[CompositeAction]:
"""Register the provided composite action type and return it."""
@ -246,9 +246,9 @@ def add_composite_environment_options(
def add_legacy_environment_options(
parser, # type: argparse.ArgumentParser
controller_mode, # type: ControllerMode
target_mode, # type: TargetMode
parser: argparse.ArgumentParser,
controller_mode: ControllerMode,
target_mode: TargetMode,
):
"""Add legacy options for controlling the test environment."""
environment: argparse.ArgumentParser = parser.add_argument_group( # type: ignore[assignment] # real type private
@ -259,9 +259,9 @@ def add_legacy_environment_options(
def add_environments_python(
environments_parser, # type: argparse.ArgumentParser
target_mode, # type: TargetMode
): # type: (...) -> None
environments_parser: argparse.ArgumentParser,
target_mode: TargetMode,
) -> None:
"""Add environment arguments to control the Python version(s) used."""
python_versions: t.Tuple[str, ...]
@ -285,10 +285,10 @@ def add_environments_python(
def add_environments_host(
environments_parser, # type: argparse.ArgumentParser
controller_mode, # type: ControllerMode
environments_parser: argparse.ArgumentParser,
controller_mode: ControllerMode,
target_mode # type: TargetMode
): # type: (...) -> None
) -> None:
"""Add environment arguments for the given host and argument modes."""
environments_exclusive_group: argparse.ArgumentParser = environments_parser.add_mutually_exclusive_group() # type: ignore[assignment] # real type private
@ -307,8 +307,8 @@ def add_environments_host(
def add_environment_network(
environments_parser, # type: argparse.ArgumentParser
): # type: (...) -> None
environments_parser: argparse.ArgumentParser,
) -> None:
"""Add environment arguments for running on a windows host."""
register_completer(environments_parser.add_argument(
'--platform',
@ -341,8 +341,8 @@ def add_environment_network(
def add_environment_windows(
environments_parser, # type: argparse.ArgumentParser
): # type: (...) -> None
environments_parser: argparse.ArgumentParser,
) -> None:
"""Add environment arguments for running on a windows host."""
register_completer(environments_parser.add_argument(
'--windows',
@ -359,8 +359,8 @@ def add_environment_windows(
def add_environment_local(
exclusive_parser, # type: argparse.ArgumentParser
): # type: (...) -> None
exclusive_parser: argparse.ArgumentParser,
) -> None:
"""Add environment arguments for running on the local (origin) host."""
exclusive_parser.add_argument(
'--local',
@ -370,9 +370,9 @@ def add_environment_local(
def add_environment_venv(
exclusive_parser, # type: argparse.ArgumentParser
environments_parser, # type: argparse.ArgumentParser
): # type: (...) -> None
exclusive_parser: argparse.ArgumentParser,
environments_parser: argparse.ArgumentParser,
) -> None:
"""Add environment arguments for running in ansible-test managed virtual environments."""
exclusive_parser.add_argument(
'--venv',
@ -387,9 +387,9 @@ def add_environment_venv(
def add_global_docker(
parser, # type: argparse.ArgumentParser
controller_mode, # type: ControllerMode
): # type: (...) -> None
parser: argparse.ArgumentParser,
controller_mode: ControllerMode,
) -> None:
"""Add global options for Docker."""
if controller_mode != ControllerMode.DELEGATED:
parser.set_defaults(
@ -430,10 +430,10 @@ def add_global_docker(
def add_environment_docker(
exclusive_parser, # type: argparse.ArgumentParser
environments_parser, # type: argparse.ArgumentParser
target_mode, # type: TargetMode
): # type: (...) -> None
exclusive_parser: argparse.ArgumentParser,
environments_parser: argparse.ArgumentParser,
target_mode: TargetMode,
) -> None:
"""Add environment arguments for running in docker containers."""
if target_mode in (TargetMode.POSIX_INTEGRATION, TargetMode.SHELL):
docker_images = sorted(filter_completion(docker_completion()))
@ -470,9 +470,9 @@ def add_environment_docker(
def add_global_remote(
parser, # type: argparse.ArgumentParser
controller_mode, # type: ControllerMode
): # type: (...) -> None
parser: argparse.ArgumentParser,
controller_mode: ControllerMode,
) -> None:
"""Add global options for remote instances."""
if controller_mode != ControllerMode.DELEGATED:
parser.set_defaults(
@ -509,10 +509,10 @@ def add_global_remote(
def add_environment_remote(
exclusive_parser, # type: argparse.ArgumentParser
environments_parser, # type: argparse.ArgumentParser
target_mode, # type: TargetMode
): # type: (...) -> None
exclusive_parser: argparse.ArgumentParser,
environments_parser: argparse.ArgumentParser,
target_mode: TargetMode,
) -> None:
"""Add environment arguments for running in ansible-core-ci provisioned remote virtual machines."""
if target_mode == TargetMode.POSIX_INTEGRATION:
remote_platforms = get_remote_platform_choices()

@ -142,7 +142,7 @@ class WindowsTargetParser(TargetsNamespaceParser, TypeParser):
def get_internal_parsers(self, targets: t.List[WindowsConfig]) -> t.Dict[str, Parser]:
"""Return a dictionary of type names and type parsers."""
parsers = {} # type: t.Dict[str, Parser]
parsers: t.Dict[str, Parser] = {}
if self.allow_inventory and not targets:
parsers.update(
@ -184,7 +184,7 @@ class NetworkTargetParser(TargetsNamespaceParser, TypeParser):
def get_internal_parsers(self, targets: t.List[NetworkConfig]) -> t.Dict[str, Parser]:
"""Return a dictionary of type names and type parsers."""
parsers = {} # type: t.Dict[str, Parser]
parsers: t.Dict[str, Parser] = {}
if self.allow_inventory and not targets:
parsers.update(

@ -117,7 +117,7 @@ class DockerParser(PairParser):
def parse(self, state: ParserState) -> t.Any:
"""Parse the input from the given state and return the result."""
value = super().parse(state) # type: DockerConfig
value: DockerConfig = super().parse(state)
if not value.python and not get_docker_pythons(value.name, self.controller, True):
raise ParserError(f'Python version required for docker image: {value.name}')
@ -159,7 +159,7 @@ class PosixRemoteParser(PairParser):
def parse(self, state: ParserState) -> t.Any:
"""Parse the input from the given state and return the result."""
value = super().parse(state) # type: PosixRemoteConfig
value: PosixRemoteConfig = super().parse(state)
if not value.python and not get_remote_pythons(value.name, self.controller, True):
raise ParserError(f'Python version required for remote: {value.name}')

@ -60,10 +60,10 @@ class PythonParser(Parser):
The origin host and unknown environments assume all relevant Python versions are available.
"""
def __init__(self,
versions, # type: t.Sequence[str]
versions: t.Sequence[str],
*,
allow_default, # type: bool
allow_venv, # type: bool
allow_default: bool,
allow_venv: bool,
):
version_choices = list(versions)

@ -161,12 +161,12 @@ def get_python_modules() -> t.Dict[str, str]:
def enumerate_python_arcs(
path, # type: str
coverage, # type: coverage_module
modules, # type: t.Dict[str, str]
collection_search_re, # type: t.Optional[t.Pattern]
collection_sub_re, # type: t.Optional[t.Pattern]
): # type: (...) -> t.Generator[t.Tuple[str, t.Set[t.Tuple[int, int]]], None, None]
path: str,
coverage: coverage_module,
modules: t.Dict[str, str],
collection_search_re: t.Optional[t.Pattern],
collection_sub_re: t.Optional[t.Pattern],
) -> t.Generator[t.Tuple[str, t.Set[t.Tuple[int, int]]], None, None]:
"""Enumerate Python code coverage arcs in the given file."""
if os.path.getsize(path) == 0:
display.warning('Empty coverage file: %s' % path, verbosity=2)
@ -238,10 +238,10 @@ def read_python_coverage_legacy(path: str) -> PythonArcs:
def enumerate_powershell_lines(
path, # type: str
collection_search_re, # type: t.Optional[t.Pattern]
collection_sub_re, # type: t.Optional[t.Pattern]
): # type: (...) -> t.Generator[t.Tuple[str, t.Dict[int, int]], None, None]
path: str,
collection_search_re: t.Optional[t.Pattern],
collection_sub_re: t.Optional[t.Pattern],
) -> t.Generator[t.Tuple[str, t.Dict[int, int]], None, None]:
"""Enumerate PowerShell code coverage lines in the given file."""
if os.path.getsize(path) == 0:
display.warning('Empty coverage file: %s' % path, verbosity=2)
@ -277,11 +277,11 @@ def enumerate_powershell_lines(
def sanitize_filename(
filename, # type: str
modules=None, # type: t.Optional[t.Dict[str, str]]
collection_search_re=None, # type: t.Optional[t.Pattern]
collection_sub_re=None, # type: t.Optional[t.Pattern]
): # type: (...) -> t.Optional[str]
filename: str,
modules: t.Optional[t.Dict[str, str]] = None,
collection_search_re: t.Optional[t.Pattern] = None,
collection_sub_re: t.Optional[t.Pattern] = None,
) -> t.Optional[str]:
"""Convert the given code coverage path to a local absolute path and return its, or None if the path is not valid."""
ansible_path = os.path.abspath('lib/ansible/') + '/'
root_path = data_context().content.root + '/'
@ -346,7 +346,7 @@ class PathChecker:
def __init__(self, args: CoverageConfig, collection_search_re: t.Optional[t.Pattern] = None) -> None:
self.args = args
self.collection_search_re = collection_search_re
self.invalid_paths = [] # type: t.List[str]
self.invalid_paths: t.List[str] = []
self.invalid_path_chars = 0
def check_path(self, path: str) -> bool:

@ -33,7 +33,7 @@ class CoverageAnalyzeTargetsConfig(CoverageAnalyzeConfig):
def make_report(target_indexes: TargetIndexes, arcs: Arcs, lines: Lines) -> t.Dict[str, t.Any]:
"""Condense target indexes, arcs and lines into a compact report."""
set_indexes = {} # type: TargetSetIndexes
set_indexes: TargetSetIndexes = {}
arc_refs = dict((path, dict((format_arc(arc), get_target_set_index(indexes, set_indexes)) for arc, indexes in data.items())) for path, data in arcs.items())
line_refs = dict((path, dict((line, get_target_set_index(indexes, set_indexes)) for line, indexes in data.items())) for path, data in lines.items())
@ -50,10 +50,10 @@ def make_report(target_indexes: TargetIndexes, arcs: Arcs, lines: Lines) -> t.Di
def load_report(report): # type: (t.Dict[str, t.Any]) -> t.Tuple[t.List[str], Arcs, Lines]
"""Extract target indexes, arcs and lines from an existing report."""
try:
target_indexes = report['targets'] # type: t.List[str]
target_sets = report['target_sets'] # type: t.List[t.List[int]]
arc_data = report['arcs'] # type: t.Dict[str, t.Dict[str, int]]
line_data = report['lines'] # type: t.Dict[str, t.Dict[int, int]]
target_indexes: t.List[str] = report['targets']
target_sets: t.List[t.List[int]] = report['target_sets']
arc_data: t.Dict[str, t.Dict[str, int]] = report['arcs']
line_data: t.Dict[str, t.Dict[int, int]] = report['lines']
except KeyError as ex:
raise ApplicationError('Document is missing key "%s".' % ex.args)
except TypeError:
@ -117,12 +117,12 @@ def get_target_index(name: str, target_indexes: TargetIndexes) -> int:
def expand_indexes(
source_data, # type: IndexedPoints
source_index, # type: t.List[str]
format_func, # type: t.Callable[[TargetKey], str]
): # type: (...) -> NamedPoints
source_data: IndexedPoints,
source_index: t.List[str],
format_func: t.Callable[[TargetKey], str],
) -> NamedPoints:
"""Expand indexes from the source into target names for easier processing of the data (arcs or lines)."""
combined_data = {} # type: t.Dict[str, t.Dict[t.Any, t.Set[str]]]
combined_data: t.Dict[str, t.Dict[t.Any, t.Set[str]]] = {}
for covered_path, covered_points in source_data.items():
combined_points = combined_data.setdefault(covered_path, {})
@ -138,7 +138,7 @@ def expand_indexes(
def generate_indexes(target_indexes: TargetIndexes, data: NamedPoints) -> IndexedPoints:
"""Return an indexed version of the given data (arcs or points)."""
results = {} # type: IndexedPoints
results: IndexedPoints = {}
for path, points in data.items():
result_points = results[path] = {}

@ -31,8 +31,8 @@ class CoverageAnalyzeTargetsCombineConfig(CoverageAnalyzeTargetsConfig):
def __init__(self, args: t.Any) -> None:
super().__init__(args)
self.input_files = args.input_file # type: t.List[str]
self.output_file = args.output_file # type: str
self.input_files: t.List[str] = args.input_file
self.output_file: str = args.output_file
def command_coverage_analyze_targets_combine(args: CoverageAnalyzeTargetsCombineConfig) -> None:
@ -42,9 +42,9 @@ def command_coverage_analyze_targets_combine(args: CoverageAnalyzeTargetsCombine
if args.delegate:
raise Delegate(host_state=host_state)
combined_target_indexes = {} # type: TargetIndexes
combined_path_arcs = {} # type: Arcs
combined_path_lines = {} # type: Lines
combined_target_indexes: TargetIndexes = {}
combined_path_arcs: Arcs = {}
combined_path_lines: Lines = {}
for report_path in args.input_files:
covered_targets, covered_path_arcs, covered_path_lines = read_report(report_path)
@ -58,11 +58,11 @@ def command_coverage_analyze_targets_combine(args: CoverageAnalyzeTargetsCombine
def merge_indexes(
source_data, # type: IndexedPoints
source_index, # type: t.List[str]
combined_data, # type: IndexedPoints
combined_index, # type: TargetIndexes
): # type: (...) -> None
source_data: IndexedPoints,
source_index: t.List[str],
combined_data: IndexedPoints,
combined_index: TargetIndexes,
) -> None:
"""Merge indexes from the source into the combined data set (arcs or lines)."""
for covered_path, covered_points in source_data.items():
combined_points = combined_data.setdefault(covered_path, {})

@ -29,8 +29,8 @@ class CoverageAnalyzeTargetsExpandConfig(CoverageAnalyzeTargetsConfig):
def __init__(self, args: t.Any) -> None:
super().__init__(args)
self.input_file = args.input_file # type: str
self.output_file = args.output_file # type: str
self.input_file: str = args.input_file
self.output_file: str = args.output_file
def command_coverage_analyze_targets_expand(args: CoverageAnalyzeTargetsExpandConfig) -> None:

@ -32,12 +32,12 @@ class CoverageAnalyzeTargetsFilterConfig(CoverageAnalyzeTargetsConfig):
def __init__(self, args: t.Any) -> None:
super().__init__(args)
self.input_file = args.input_file # type: str
self.output_file = args.output_file # type: str
self.include_targets = args.include_targets # type: t.List[str]
self.exclude_targets = args.exclude_targets # type: t.List[str]
self.include_path = args.include_path # type: t.Optional[str]
self.exclude_path = args.exclude_path # type: t.Optional[str]
self.input_file: str = args.input_file
self.output_file: str = args.output_file
self.include_targets: t.List[str] = args.include_targets
self.exclude_targets: t.List[str] = args.exclude_targets
self.include_path: t.Optional[str] = args.include_path
self.exclude_path: t.Optional[str] = args.exclude_path
def command_coverage_analyze_targets_filter(args: CoverageAnalyzeTargetsFilterConfig) -> None:
@ -81,7 +81,7 @@ def command_coverage_analyze_targets_filter(args: CoverageAnalyzeTargetsFilterCo
filtered_path_arcs = filter_data(filtered_path_arcs, path_filter_func, target_filter_func)
filtered_path_lines = filter_data(filtered_path_lines, path_filter_func, target_filter_func)
target_indexes = {} # type: TargetIndexes
target_indexes: TargetIndexes = {}
indexed_path_arcs = generate_indexes(target_indexes, filtered_path_arcs)
indexed_path_lines = generate_indexes(target_indexes, filtered_path_lines)
@ -91,12 +91,12 @@ def command_coverage_analyze_targets_filter(args: CoverageAnalyzeTargetsFilterCo
def filter_data(
data, # type: NamedPoints
path_filter_func, # type: t.Callable[[str], bool]
target_filter_func, # type: t.Callable[[t.Set[str]], t.Set[str]]
): # type: (...) -> NamedPoints
data: NamedPoints,
path_filter_func: t.Callable[[str], bool],
target_filter_func: t.Callable[[t.Set[str]], t.Set[str]],
) -> NamedPoints:
"""Filter the data set using the specified filter function."""
result = {} # type: NamedPoints
result: NamedPoints = {}
for src_path, src_points in data.items():
if not path_filter_func(src_path):

@ -55,8 +55,8 @@ class CoverageAnalyzeTargetsGenerateConfig(CoverageAnalyzeTargetsConfig):
def __init__(self, args: t.Any) -> None:
super().__init__(args)
self.input_dir = args.input_dir or ResultType.COVERAGE.path # type: str
self.output_file = args.output_file # type: str
self.input_dir: str = args.input_dir or ResultType.COVERAGE.path
self.output_file: str = args.output_file
def command_coverage_analyze_targets_generate(args: CoverageAnalyzeTargetsGenerateConfig) -> None:
@ -67,7 +67,7 @@ def command_coverage_analyze_targets_generate(args: CoverageAnalyzeTargetsGenera
raise Delegate(host_state)
root = data_context().content.root
target_indexes = {} # type: TargetIndexes
target_indexes: TargetIndexes = {}
arcs = dict((os.path.relpath(path, root), data) for path, data in analyze_python_coverage(args, host_state, args.input_dir, target_indexes).items())
lines = dict((os.path.relpath(path, root), data) for path, data in analyze_powershell_coverage(args, args.input_dir, target_indexes).items())
report = make_report(target_indexes, arcs, lines)
@ -75,13 +75,13 @@ def command_coverage_analyze_targets_generate(args: CoverageAnalyzeTargetsGenera
def analyze_python_coverage(
args, # type: CoverageAnalyzeTargetsGenerateConfig
host_state, # type: HostState
path, # type: str
target_indexes, # type: TargetIndexes
): # type: (...) -> Arcs
args: CoverageAnalyzeTargetsGenerateConfig,
host_state: HostState,
path: str,
target_indexes: TargetIndexes,
) -> Arcs:
"""Analyze Python code coverage."""
results = {} # type: Arcs
results: Arcs = {}
collection_search_re, collection_sub_re = get_collection_path_regexes()
modules = get_python_modules()
python_files = get_python_coverage_files(path)
@ -107,12 +107,12 @@ def analyze_python_coverage(
def analyze_powershell_coverage(
args, # type: CoverageAnalyzeTargetsGenerateConfig
path, # type: str
target_indexes, # type: TargetIndexes
): # type: (...) -> Lines
args: CoverageAnalyzeTargetsGenerateConfig,
path: str,
target_indexes: TargetIndexes,
) -> Lines:
"""Analyze PowerShell code coverage"""
results = {} # type: Lines
results: Lines = {}
collection_search_re, collection_sub_re = get_collection_path_regexes()
powershell_files = get_powershell_coverage_files(path)
@ -136,10 +136,10 @@ def analyze_powershell_coverage(
def prune_invalid_filenames(
args, # type: CoverageAnalyzeTargetsGenerateConfig
results, # type: t.Dict[str, t.Any]
collection_search_re=None, # type: t.Optional[t.Pattern]
): # type: (...) -> None
args: CoverageAnalyzeTargetsGenerateConfig,
results: t.Dict[str, t.Any],
collection_search_re: t.Optional[t.Pattern] = None,
) -> None:
"""Remove invalid filenames from the given result set."""
path_checker = PathChecker(args, collection_search_re)

@ -35,12 +35,12 @@ class CoverageAnalyzeTargetsMissingConfig(CoverageAnalyzeTargetsConfig):
def __init__(self, args: t.Any) -> None:
super().__init__(args)
self.from_file = args.from_file # type: str
self.to_file = args.to_file # type: str
self.output_file = args.output_file # type: str
self.from_file: str = args.from_file
self.to_file: str = args.to_file
self.output_file: str = args.output_file
self.only_gaps = args.only_gaps # type: bool
self.only_exists = args.only_exists # type: bool
self.only_gaps: bool = args.only_gaps
self.only_exists: bool = args.only_exists
def command_coverage_analyze_targets_missing(args: CoverageAnalyzeTargetsMissingConfig) -> None:
@ -52,7 +52,7 @@ def command_coverage_analyze_targets_missing(args: CoverageAnalyzeTargetsMissing
from_targets, from_path_arcs, from_path_lines = read_report(args.from_file)
to_targets, to_path_arcs, to_path_lines = read_report(args.to_file)
target_indexes = {} # type: TargetIndexes
target_indexes: TargetIndexes = {}
if args.only_gaps:
arcs = find_gaps(from_path_arcs, from_targets, to_path_arcs, target_indexes, args.only_exists)
@ -66,14 +66,14 @@ def command_coverage_analyze_targets_missing(args: CoverageAnalyzeTargetsMissing
def find_gaps(
from_data, # type: IndexedPoints
from_index, # type: t.List[str]
to_data, # type: IndexedPoints
target_indexes, # type: TargetIndexes
only_exists, # type: bool
): # type: (...) -> IndexedPoints
from_data: IndexedPoints,
from_index: t.List[str],
to_data: IndexedPoints,
target_indexes: TargetIndexes,
only_exists: bool,
) -> IndexedPoints:
"""Find gaps in coverage between the from and to data sets."""
target_data = {} # type: IndexedPoints
target_data: IndexedPoints = {}
for from_path, from_points in from_data.items():
if only_exists and not os.path.isfile(to_bytes(from_path)):
@ -91,15 +91,15 @@ def find_gaps(
def find_missing(
from_data, # type: IndexedPoints
from_index, # type: t.List[str]
to_data, # type: IndexedPoints
to_index, # type: t.List[str]
target_indexes, # type: TargetIndexes
only_exists, # type: bool
): # type: (...) -> IndexedPoints
from_data: IndexedPoints,
from_index: t.List[str],
to_data: IndexedPoints,
to_index: t.List[str],
target_indexes: TargetIndexes,
only_exists: bool,
) -> IndexedPoints:
"""Find coverage in from_data not present in to_data (arcs or lines)."""
target_data = {} # type: IndexedPoints
target_data: IndexedPoints = {}
for from_path, from_points in from_data.items():
if only_exists and not os.path.isfile(to_bytes(from_path)):

@ -353,9 +353,9 @@ class CoverageCombineConfig(CoverageConfig):
def __init__(self, args: t.Any) -> None:
super().__init__(args)
self.group_by = frozenset(args.group_by) if args.group_by else frozenset() # type: t.FrozenSet[str]
self.all = args.all # type: bool
self.stub = args.stub # type: bool
self.group_by: t.FrozenSet[str] = frozenset(args.group_by) if args.group_by else frozenset()
self.all: bool = args.all
self.stub: bool = args.stub
# only available to coverage combine
self.export = args.export if 'export' in args else False # type: str
self.export: str = args.export if 'export' in args else False

@ -147,6 +147,6 @@ class CoverageReportConfig(CoverageCombineConfig):
def __init__(self, args: t.Any) -> None:
super().__init__(args)
self.show_missing = args.show_missing # type: bool
self.include = args.include # type: str
self.omit = args.omit # type: str
self.show_missing: bool = args.show_missing
self.include: str = args.include
self.omit: str = args.omit

@ -76,7 +76,7 @@ def _generate_powershell_xml(coverage_file: str) -> Element:
content_root = data_context().content.root
is_ansible = data_context().content.is_ansible
packages = {} # type: t.Dict[str, t.Dict[str, t.Dict[str, int]]]
packages: t.Dict[str, t.Dict[str, t.Dict[str, int]]] = {}
for path, results in coverage_info.items():
filename = os.path.splitext(os.path.basename(path))[0]

@ -134,7 +134,7 @@ def generate_dependency_map(integration_targets: t.List[IntegrationTarget]) -> t
"""Analyze the given list of integration test targets and return a dictionary expressing target names and the targets on which they depend."""
targets_dict = dict((target.name, target) for target in integration_targets)
target_dependencies = analyze_integration_target_dependencies(integration_targets)
dependency_map = {} # type: t.Dict[str, t.Set[IntegrationTarget]]
dependency_map: t.Dict[str, t.Set[IntegrationTarget]] = {}
invalid_targets = set()
@ -159,7 +159,7 @@ def generate_dependency_map(integration_targets: t.List[IntegrationTarget]) -> t
def get_files_needed(target_dependencies: t.List[IntegrationTarget]) -> t.List[str]:
"""Return a list of files needed by the given list of target dependencies."""
files_needed = [] # type: t.List[str]
files_needed: t.List[str] = []
for target_dependency in target_dependencies:
files_needed += target_dependency.needs_file
@ -238,10 +238,10 @@ def delegate_inventory(args: IntegrationConfig, inventory_path_src: str) -> None
@contextlib.contextmanager
def integration_test_environment(
args, # type: IntegrationConfig
target, # type: IntegrationTarget
inventory_path_src, # type: str
): # type: (...) -> t.Iterator[IntegrationEnvironment]
args: IntegrationConfig,
target: IntegrationTarget,
inventory_path_src: str,
) -> t.Iterator[IntegrationEnvironment]:
"""Context manager that prepares the integration test environment and cleans it up."""
ansible_config_src = args.get_ansible_config()
ansible_config_relative = os.path.join(data_context().content.integration_path, '%s.cfg' % args.command)
@ -341,10 +341,10 @@ def integration_test_environment(
@contextlib.contextmanager
def integration_test_config_file(
args, # type: IntegrationConfig
env_config, # type: CloudEnvironmentConfig
integration_dir, # type: str
): # type: (...) -> t.Iterator[t.Optional[str]]
args: IntegrationConfig,
env_config: CloudEnvironmentConfig,
integration_dir: str,
) -> t.Iterator[t.Optional[str]]:
"""Context manager that provides a config file for integration tests, if needed."""
if not env_config:
yield None
@ -370,11 +370,11 @@ def integration_test_config_file(
def create_inventory(
args, # type: IntegrationConfig
host_state, # type: HostState
inventory_path, # type: str
target, # type: IntegrationTarget
): # type: (...) -> None
args: IntegrationConfig,
host_state: HostState,
inventory_path: str,
target: IntegrationTarget,
) -> None:
"""Create inventory."""
if isinstance(args, PosixIntegrationConfig):
if target.target_type == IntegrationTargetType.CONTROLLER:
@ -396,13 +396,13 @@ def create_inventory(
def command_integration_filtered(
args, # type: IntegrationConfig
host_state, # type: HostState
targets, # type: t.Tuple[IntegrationTarget, ...]
all_targets, # type: t.Tuple[IntegrationTarget, ...]
inventory_path, # type: str
pre_target=None, # type: t.Optional[t.Callable[[IntegrationTarget], None]]
post_target=None, # type: t.Optional[t.Callable[[IntegrationTarget], None]]
args: IntegrationConfig,
host_state: HostState,
targets: t.Tuple[IntegrationTarget, ...],
all_targets: t.Tuple[IntegrationTarget, ...],
inventory_path: str,
pre_target: t.Optional[t.Callable[[IntegrationTarget], None]] = None,
post_target: t.Optional[t.Callable[[IntegrationTarget], None]] = None,
):
"""Run integration tests for the specified targets."""
found = False
@ -413,7 +413,7 @@ def command_integration_filtered(
all_targets_dict = dict((target.name, target) for target in all_targets)
setup_errors = []
setup_targets_executed = set() # type: t.Set[str]
setup_targets_executed: t.Set[str] = set()
for target in all_targets:
for setup_target in target.setup_once + target.setup_always:
@ -571,12 +571,12 @@ def command_integration_filtered(
def command_integration_script(
args, # type: IntegrationConfig
host_state, # type: HostState
target, # type: IntegrationTarget
test_dir, # type: str
inventory_path, # type: str
coverage_manager, # type: CoverageManager
args: IntegrationConfig,
host_state: HostState,
target: IntegrationTarget,
test_dir: str,
inventory_path: str,
coverage_manager: CoverageManager,
):
"""Run an integration test script."""
display.info('Running %s integration test script' % target.name)
@ -623,13 +623,13 @@ def command_integration_script(
def command_integration_role(
args, # type: IntegrationConfig
host_state, # type: HostState
target, # type: IntegrationTarget
start_at_task, # type: t.Optional[str]
test_dir, # type: str
inventory_path, # type: str
coverage_manager, # type: CoverageManager
args: IntegrationConfig,
host_state: HostState,
target: IntegrationTarget,
start_at_task: t.Optional[str],
test_dir: str,
inventory_path: str,
coverage_manager: CoverageManager,
):
"""Run an integration test role."""
display.info('Running %s integration test role' % target.name)
@ -742,15 +742,15 @@ def command_integration_role(
def run_setup_targets(
args, # type: IntegrationConfig
host_state, # type: HostState
test_dir, # type: str
target_names, # type: t.Sequence[str]
targets_dict, # type: t.Dict[str, IntegrationTarget]
targets_executed, # type: t.Set[str]
inventory_path, # type: str
coverage_manager, # type: CoverageManager
always, # type: bool
args: IntegrationConfig,
host_state: HostState,
test_dir: str,
target_names: t.Sequence[str],
targets_dict: t.Dict[str, IntegrationTarget],
targets_executed: t.Set[str],
inventory_path: str,
coverage_manager: CoverageManager,
always: bool,
):
"""Run setup targets."""
for target_name in target_names:
@ -773,14 +773,14 @@ def run_setup_targets(
def integration_environment(
args, # type: IntegrationConfig
target, # type: IntegrationTarget
test_dir, # type: str
inventory_path, # type: str
ansible_config, # type: t.Optional[str]
env_config, # type: t.Optional[CloudEnvironmentConfig]
test_env, # type: IntegrationEnvironment
): # type: (...) -> t.Dict[str, str]
args: IntegrationConfig,
target: IntegrationTarget,
test_dir: str,
inventory_path: str,
ansible_config: t.Optional[str],
env_config: t.Optional[CloudEnvironmentConfig],
test_env: IntegrationEnvironment,
) -> t.Dict[str, str]:
"""Return a dictionary of environment variables to use when running the given integration test target."""
env = ansible_environment(args, ansible_config=ansible_config)
@ -881,7 +881,7 @@ If necessary, context can be controlled by adding entries to the "aliases" file
else:
display.warning(f'Unable to determine context for the following test targets, they will be run on the target host: {", ".join(invalid_targets)}')
exclude = set() # type: t.Set[str]
exclude: t.Set[str] = set()
controller_targets = [target for target in targets if target.target_type == IntegrationTargetType.CONTROLLER]
target_targets = [target for target in targets if target.target_type == IntegrationTargetType.TARGET]
@ -896,8 +896,8 @@ If necessary, context can be controlled by adding entries to the "aliases" file
def command_integration_filter(args, # type: TIntegrationConfig
targets, # type: t.Iterable[TIntegrationTarget]
): # type: (...) -> t.Tuple[HostState, t.Tuple[TIntegrationTarget, ...]]
targets: t.Iterable[TIntegrationTarget],
) -> t.Tuple[HostState, t.Tuple[TIntegrationTarget, ...]]:
"""Filter the given integration test targets."""
targets = tuple(target for target in targets if 'hidden/' not in target.aliases)
changes = get_changes_filter(args)

@ -59,8 +59,8 @@ def get_cloud_plugins() -> t.Tuple[t.Dict[str, t.Type[CloudProvider]], t.Dict[st
"""Import cloud plugins and load them into the plugin dictionaries."""
import_plugins('commands/integration/cloud')
providers = {} # type: t.Dict[str, t.Type[CloudProvider]]
environments = {} # type: t.Dict[str, t.Type[CloudEnvironment]]
providers: t.Dict[str, t.Type[CloudProvider]] = {}
environments: t.Dict[str, t.Type[CloudEnvironment]] = {}
load_plugins(CloudProvider, providers)
load_plugins(CloudEnvironment, environments)
@ -134,7 +134,7 @@ def cloud_filter(args, targets): # type: (IntegrationConfig, t.Tuple[Integratio
if args.metadata.cloud_config is not None:
return [] # cloud filter already performed prior to delegation
exclude = [] # type: t.List[str]
exclude: t.List[str] = []
for provider in get_cloud_providers(args, targets):
provider.filter(targets, exclude)
@ -378,10 +378,10 @@ class CloudEnvironment(CloudBase):
class CloudEnvironmentConfig:
"""Configuration for the environment."""
def __init__(self,
env_vars=None, # type: t.Optional[t.Dict[str, str]]
ansible_vars=None, # type: t.Optional[t.Dict[str, t.Any]]
module_defaults=None, # type: t.Optional[t.Dict[str, t.Dict[str, t.Any]]]
callback_plugins=None, # type: t.Optional[t.List[str]]
env_vars: t.Optional[t.Dict[str, str]] = None,
ansible_vars: t.Optional[t.Dict[str, t.Any]] = None,
module_defaults: t.Optional[t.Dict[str, t.Dict[str, t.Any]]] = None,
callback_plugins: t.Optional[t.List[str]] = None,
):
self.env_vars = env_vars
self.ansible_vars = ansible_vars

@ -34,7 +34,7 @@ class AzureCloudProvider(CloudProvider):
def __init__(self, args: IntegrationConfig) -> None:
super().__init__(args)
self.aci = None # type: t.Optional[AnsibleCoreCI]
self.aci: t.Optional[AnsibleCoreCI] = None
self.uses_config = True

@ -68,12 +68,12 @@ class TargetFilter(t.Generic[THostConfig], metaclass=abc.ABCMeta):
def skip(
self,
skip, # type: str
reason, # type: str
targets, # type: t.List[IntegrationTarget]
exclude, # type: t.Set[str]
override=None, # type: t.Optional[t.List[str]]
): # type: (...) -> None
skip: str,
reason: str,
targets: t.List[IntegrationTarget],
exclude: t.Set[str],
override: t.Optional[t.List[str]] = None,
) -> None:
"""Apply the specified skip rule to the given targets by updating the provided exclude list."""
if skip.startswith('skip/'):
skipped = [target.name for target in targets if skip in target.skips and (not override or target.name not in override)]
@ -174,7 +174,7 @@ class RemoteTargetFilter(TargetFilter[TRemoteConfig]):
skipped_profiles = [profile for profile in profiles if any(skip in target.skips for skip in get_remote_skip_aliases(profile.config))]
if skipped_profiles:
configs = [profile.config for profile in skipped_profiles] # type: t.List[TRemoteConfig]
configs: t.List[TRemoteConfig] = [profile.config for profile in skipped_profiles]
display.warning(f'Excluding skipped hosts from inventory: {", ".join(config.name for config in configs)}')
profiles = [profile for profile in profiles if profile not in skipped_profiles]

@ -128,7 +128,7 @@ DOCUMENTABLE_PLUGINS = (
'become', 'cache', 'callback', 'cliconf', 'connection', 'httpapi', 'inventory', 'lookup', 'netconf', 'modules', 'shell', 'strategy', 'vars'
)
created_venvs = [] # type: t.List[str]
created_venvs: t.List[str] = []
def command_sanity(args: SanityConfig) -> None:
@ -136,7 +136,7 @@ def command_sanity(args: SanityConfig) -> None:
create_result_directories(args)
target_configs = t.cast(t.List[PosixConfig], args.targets)
target_versions = {target.python.version: target for target in target_configs} # type: t.Dict[str, PosixConfig]
target_versions: t.Dict[str, PosixConfig] = {target.python.version: target for target in target_configs}
handle_layout_messages(data_context().content.sanity_messages)
@ -172,7 +172,7 @@ def command_sanity(args: SanityConfig) -> None:
if disabled:
display.warning('Skipping tests disabled by default without --allow-disabled: %s' % ', '.join(sorted(disabled)))
target_profiles = {profile.config.python.version: profile for profile in host_state.targets(PosixProfile)} # type: t.Dict[str, PosixProfile]
target_profiles: t.Dict[str, PosixProfile] = {profile.config.python.version: profile for profile in host_state.targets(PosixProfile)}
total = 0
failed = []
@ -339,19 +339,19 @@ class SanityIgnoreParser:
self.args = args
self.relative_path = os.path.join(data_context().content.sanity_path, file_name)
self.path = os.path.join(data_context().content.root, self.relative_path)
self.ignores = collections.defaultdict(lambda: collections.defaultdict(dict)) # type: t.Dict[str, t.Dict[str, t.Dict[str, int]]]
self.skips = collections.defaultdict(lambda: collections.defaultdict(int)) # type: t.Dict[str, t.Dict[str, int]]
self.parse_errors = [] # type: t.List[t.Tuple[int, int, str]]
self.file_not_found_errors = [] # type: t.List[t.Tuple[int, str]]
self.ignores: t.Dict[str, t.Dict[str, t.Dict[str, int]]] = collections.defaultdict(lambda: collections.defaultdict(dict))
self.skips: t.Dict[str, t.Dict[str, int]] = collections.defaultdict(lambda: collections.defaultdict(int))
self.parse_errors: t.List[t.Tuple[int, int, str]] = []
self.file_not_found_errors: t.List[t.Tuple[int, str]] = []
lines = read_lines_without_comments(self.path, optional=True)
targets = SanityTargets.get_targets()
paths = set(target.path for target in targets)
tests_by_name = {} # type: t.Dict[str, SanityTest]
versioned_test_names = set() # type: t.Set[str]
unversioned_test_names = {} # type: t.Dict[str, str]
tests_by_name: t.Dict[str, SanityTest] = {}
versioned_test_names: t.Set[str] = set()
unversioned_test_names: t.Dict[str, str] = {}
directories = paths_to_dirs(list(paths))
paths_by_test = {} # type: t.Dict[str, t.Set[str]]
paths_by_test: t.Dict[str, t.Set[str]] = {}
display.info('Read %d sanity test ignore line(s) for %s from: %s' % (len(lines), ansible_label, self.relative_path), verbosity=1)
@ -526,10 +526,10 @@ class SanityIgnoreParser:
class SanityIgnoreProcessor:
"""Processor for sanity test ignores for a single run of one sanity test."""
def __init__(self,
args, # type: SanityConfig
test, # type: SanityTest
python_version, # type: t.Optional[str]
): # type: (...) -> None
args: SanityConfig,
test: SanityTest,
python_version: t.Optional[str],
) -> None:
name = test.name
code = test.error_code
@ -544,7 +544,7 @@ class SanityIgnoreProcessor:
self.parser = SanityIgnoreParser.load(args)
self.ignore_entries = self.parser.ignores.get(full_name, {})
self.skip_entries = self.parser.skips.get(full_name, {})
self.used_line_numbers = set() # type: t.Set[int]
self.used_line_numbers: t.Set[int] = set()
def filter_skipped_targets(self, targets: t.List[TestTarget]) -> t.List[TestTarget]:
"""Return the given targets, with any skipped paths filtered out."""
@ -583,11 +583,11 @@ class SanityIgnoreProcessor:
def get_errors(self, paths: t.List[str]) -> t.List[SanityMessage]:
"""Return error messages related to issues with the file."""
messages = [] # type: t.List[SanityMessage]
messages: t.List[SanityMessage] = []
# unused errors
unused = [] # type: t.List[t.Tuple[int, str, str]]
unused: t.List[t.Tuple[int, str, str]] = []
if self.test.no_targets or self.test.all_targets:
# tests which do not accept a target list, or which use all targets, always return all possible errors, so all ignores can be checked
@ -631,11 +631,11 @@ class SanityFailure(TestFailure):
"""Sanity test failure."""
def __init__(
self,
test, # type: str
python_version=None, # type: t.Optional[str]
messages=None, # type: t.Optional[t.Sequence[SanityMessage]]
summary=None, # type: t.Optional[str]
): # type: (...) -> None
test: str,
python_version: t.Optional[str] = None,
messages: t.Optional[t.Sequence[SanityMessage]] = None,
summary: t.Optional[str] = None,
) -> None:
super().__init__(COMMAND, test, python_version, messages, summary)
@ -709,7 +709,7 @@ class SanityTest(metaclass=abc.ABCMeta):
# Because these errors can be unpredictable they behave differently than normal error codes:
# * They are not reported by default. The `--enable-optional-errors` option must be used to display these errors.
# * They cannot be ignored. This is done to maintain the integrity of the ignore system.
self.optional_error_codes = set() # type: t.Set[str]
self.optional_error_codes: t.Set[str] = set()
@property
def error_code(self) -> t.Optional[str]:
@ -842,29 +842,29 @@ class SanityCodeSmellTest(SanitySingleVersion):
if self.config:
self.enabled = not self.config.get('disabled')
self.output = self.config.get('output') # type: t.Optional[str]
self.extensions = self.config.get('extensions') # type: t.List[str]
self.prefixes = self.config.get('prefixes') # type: t.List[str]
self.files = self.config.get('files') # type: t.List[str]
self.text = self.config.get('text') # type: t.Optional[bool]
self.ignore_self = self.config.get('ignore_self') # type: bool
self.minimum_python_version = self.config.get('minimum_python_version') # type: t.Optional[str]
self.maximum_python_version = self.config.get('maximum_python_version') # type: t.Optional[str]
self.__all_targets = self.config.get('all_targets') # type: bool
self.__no_targets = self.config.get('no_targets') # type: bool
self.__include_directories = self.config.get('include_directories') # type: bool
self.__include_symlinks = self.config.get('include_symlinks') # type: bool
self.output: t.Optional[str] = self.config.get('output')
self.extensions: t.List[str] = self.config.get('extensions')
self.prefixes: t.List[str] = self.config.get('prefixes')
self.files: t.List[str] = self.config.get('files')
self.text: t.Optional[bool] = self.config.get('text')
self.ignore_self: bool = self.config.get('ignore_self')
self.minimum_python_version: t.Optional[str] = self.config.get('minimum_python_version')
self.maximum_python_version: t.Optional[str] = self.config.get('maximum_python_version')
self.__all_targets: bool = self.config.get('all_targets')
self.__no_targets: bool = self.config.get('no_targets')
self.__include_directories: bool = self.config.get('include_directories')
self.__include_symlinks: bool = self.config.get('include_symlinks')
self.__py2_compat = self.config.get('py2_compat', False) # type: bool
else:
self.output = None
self.extensions = []
self.prefixes = []
self.files = []
self.text = None # type: t.Optional[bool]
self.text: t.Optional[bool] = None
self.ignore_self = False
self.minimum_python_version = None # type: t.Optional[str]
self.maximum_python_version = None # type: t.Optional[str]
self.minimum_python_version: t.Optional[str] = None
self.maximum_python_version: t.Optional[str] = None
self.__all_targets = False
self.__no_targets = True
@ -1087,7 +1087,7 @@ class SanityMultipleVersion(SanityTest, metaclass=abc.ABCMeta):
def sanity_get_tests() -> t.Tuple[SanityTest, ...]:
"""Return a tuple of the available sanity tests."""
import_plugins('commands/sanity')
sanity_plugins = {} # type: t.Dict[str, t.Type[SanityTest]]
sanity_plugins: t.Dict[str, t.Type[SanityTest]] = {}
load_plugins(SanityTest, sanity_plugins)
sanity_plugins.pop('sanity') # SanityCodeSmellTest
sanity_tests = tuple(plugin() for plugin in sanity_plugins.values() if data_context().content.is_ansible or not plugin.ansible_only)
@ -1096,12 +1096,12 @@ def sanity_get_tests() -> t.Tuple[SanityTest, ...]:
def create_sanity_virtualenv(
args, # type: SanityConfig
python, # type: PythonConfig
name, # type: str
coverage=False, # type: bool
minimize=False, # type: bool
): # type: (...) -> t.Optional[VirtualPythonConfig]
args: SanityConfig,
python: PythonConfig,
name: str,
coverage: bool = False,
minimize: bool = False,
) -> t.Optional[VirtualPythonConfig]:
"""Return an existing sanity virtual environment matching the requested parameters or create a new one."""
commands = collect_requirements( # create_sanity_virtualenv()
python=python,

@ -64,8 +64,8 @@ class AnsibleDocTest(SanitySingleVersion):
paths = [target.path for target in targets.include]
doc_targets = collections.defaultdict(list) # type: t.Dict[str, t.List[str]]
target_paths = collections.defaultdict(dict) # type: t.Dict[str, t.Dict[str, str]]
doc_targets: t.Dict[str, t.List[str]] = collections.defaultdict(list)
target_paths: t.Dict[str, t.Dict[str, str]] = collections.defaultdict(dict)
remap_types = dict(
modules='module',
@ -84,7 +84,7 @@ class AnsibleDocTest(SanitySingleVersion):
target_paths[plugin_type][data_context().content.prefix + plugin_name] = plugin_file_path
env = ansible_environment(args, color=False)
error_messages = [] # type: t.List[SanityMessage]
error_messages: t.List[SanityMessage] = []
for doc_type in sorted(doc_targets):
for format_option in [None, '--json']:

@ -56,7 +56,7 @@ class BinSymlinksTest(SanityVersionNeutral):
bin_names = os.listdir(bin_root)
bin_paths = sorted(os.path.join(bin_root, path) for path in bin_names)
errors = [] # type: t.List[t.Tuple[str, str]]
errors: t.List[t.Tuple[str, str]] = []
symlink_map_path = os.path.relpath(symlink_map_full_path, data_context().content.root)

@ -39,7 +39,7 @@ class IgnoresTest(SanityVersionNeutral):
def test(self, args: SanityConfig, targets: SanityTargets) -> TestResult:
sanity_ignore = SanityIgnoreParser.load(args)
messages = [] # type: t.List[SanityMessage]
messages: t.List[SanityMessage] = []
# parse errors

@ -106,8 +106,8 @@ class IntegrationAliasesTest(SanitySingleVersion):
def __init__(self):
super().__init__()
self._ci_config = {} # type: t.Dict[str, t.Any]
self._ci_test_groups = {} # type: t.Dict[str, t.List[int]]
self._ci_config: t.Dict[str, t.Any] = {}
self._ci_test_groups: t.Dict[str, t.List[int]] = {}
@property
def can_ignore(self) -> bool:
@ -130,7 +130,7 @@ class IntegrationAliasesTest(SanitySingleVersion):
def ci_test_groups(self) -> t.Dict[str, t.List[int]]:
"""Return a dictionary of CI test names and their group(s)."""
if not self._ci_test_groups:
test_groups = {} # type: t.Dict[str, t.Set[int]]
test_groups: t.Dict[str, t.Set[int]] = {}
for stage in self._ci_config['stages']:
for job in stage['jobs']:
@ -321,10 +321,10 @@ class IntegrationAliasesTest(SanitySingleVersion):
def check_ci_group(
self,
targets, # type: t.Tuple[CompletionTarget, ...]
find, # type: str
find_incidental=None, # type: t.Optional[t.List[str]]
): # type: (...) -> t.List[SanityMessage]
targets: t.Tuple[CompletionTarget, ...],
find: str,
find_incidental: t.Optional[t.List[str]] = None,
) -> t.List[SanityMessage]:
"""Check the CI groups set in the provided targets and return a list of messages with any issues found."""
all_paths = set(target.path for target in targets)
supported_paths = set(target.path for target in filter_targets(targets, [find], directories=False, errors=False))

@ -111,7 +111,7 @@ class MypyTest(SanityMultipleVersion):
MyPyContext('modules', ['lib/ansible/modules/', 'lib/ansible/module_utils/'], remote_only_python_versions),
)
unfiltered_messages = [] # type: t.List[SanityMessage]
unfiltered_messages: t.List[SanityMessage] = []
for context in contexts:
if python.version not in context.python_versions:
@ -170,12 +170,12 @@ class MypyTest(SanityMultipleVersion):
@staticmethod
def test_context(
args, # type: SanityConfig
virtualenv_python, # type: VirtualPythonConfig
python, # type: PythonConfig
context, # type: MyPyContext
paths, # type: t.List[str]
): # type: (...) -> t.List[SanityMessage]
args: SanityConfig,
virtualenv_python: VirtualPythonConfig,
python: PythonConfig,
context: MyPyContext,
paths: t.List[str],
) -> t.List[SanityMessage]:
"""Run mypy tests for the specified context."""
context_paths = [path for path in paths if any(is_subdir(path, match_path) for match_path in context.paths)]

@ -198,14 +198,14 @@ class PylintTest(SanitySingleVersion):
@staticmethod
def pylint(
args, # type: SanityConfig
context, # type: str
paths, # type: t.List[str]
plugin_dir, # type: str
plugin_names, # type: t.List[str]
python, # type: PythonConfig
collection_detail, # type: CollectionDetail
): # type: (...) -> t.List[t.Dict[str, str]]
args: SanityConfig,
context: str,
paths: t.List[str],
plugin_dir: str,
plugin_names: t.List[str],
python: PythonConfig,
collection_detail: CollectionDetail,
) -> t.List[t.Dict[str, str]]:
"""Run pylint using the config specified by the context on the specified paths."""
rcfile = os.path.join(SANITY_ROOT, 'pylint', 'config', context.split('/')[0] + '.cfg')

@ -85,7 +85,7 @@ class ShellcheckTest(SanityVersionNeutral):
return SanitySuccess(self.name)
# json output is missing file paths in older versions of shellcheck, so we'll use xml instead
root = fromstring(stdout) # type: Element
root: Element = fromstring(stdout)
results = []

@ -66,7 +66,7 @@ def command_shell(args: ShellConfig) -> None:
if isinstance(target_profile, ControllerProfile):
# run the shell locally unless a target was requested
con = LocalConnection(args) # type: Connection
con: Connection = LocalConnection(args)
if args.export:
display.info('Configuring controller inventory.', verbosity=1)
@ -90,7 +90,7 @@ def command_shell(args: ShellConfig) -> None:
return
if isinstance(con, SshConnection) and args.raw:
cmd = [] # type: t.List[str]
cmd: t.List[str] = []
elif isinstance(target_profile, PosixProfile):
cmd = []

@ -129,7 +129,7 @@ def command_units(args: UnitsConfig) -> None:
raise AllTargetsSkipped()
targets = t.cast(t.List[PosixConfig], args.targets)
target_versions = {target.python.version: target for target in targets} # type: t.Dict[str, PosixConfig]
target_versions: t.Dict[str, PosixConfig] = {target.python.version: target for target in targets}
skipped_versions = args.host_settings.skipped_python_versions
warn_versions = []
@ -221,7 +221,7 @@ def command_units(args: UnitsConfig) -> None:
display.warning("Skipping unit tests on Python %s because it could not be found." % version)
continue
target_profiles = {profile.config.python.version: profile for profile in host_state.targets(PosixProfile)} # type: t.Dict[str, PosixProfile]
target_profiles: t.Dict[str, PosixProfile] = {profile.config.python.version: profile for profile in host_state.targets(PosixProfile)}
target_profile = target_profiles[version]
final_candidates = [(test_context, target_profile.python, paths, env) for test_context, paths, env in test_candidates]

@ -9,8 +9,8 @@ try:
version,
)
SpecifierSet = specifiers.SpecifierSet # type: t.Optional[t.Type[specifiers.SpecifierSet]]
Version = version.Version # type: t.Optional[t.Type[version.Version]]
SpecifierSet: t.Optional[t.Type[specifiers.SpecifierSet]] = specifiers.SpecifierSet
Version: t.Optional[t.Type[version.Version]] = version.Version
PACKAGING_IMPORT_ERROR = None
except ImportError as ex:
SpecifierSet = None # pylint: disable=invalid-name

@ -15,7 +15,7 @@ except ImportError as ex:
YAML_IMPORT_ERROR = ex
else:
try:
_SafeLoader = _yaml.CSafeLoader # type: t.Union[t.Type[_yaml.CSafeLoader], t.Type[_yaml.SafeLoader]]
_SafeLoader: t.Union[t.Type[_yaml.CSafeLoader], t.Type[_yaml.SafeLoader]] = _yaml.CSafeLoader
except AttributeError:
_SafeLoader = _yaml.SafeLoader

@ -227,10 +227,10 @@ def parse_completion_entry(value: str) -> t.Tuple[str, t.Dict[str, str]]:
def filter_completion(
completion, # type: t.Dict[str, TCompletionConfig]
controller_only=False, # type: bool
include_defaults=False, # type: bool
): # type: (...) -> t.Dict[str, TCompletionConfig]
completion: t.Dict[str, TCompletionConfig],
controller_only: bool = False,
include_defaults: bool = False,
) -> t.Dict[str, TCompletionConfig]:
"""Return the given completion dictionary, filtering out configs which do not support the controller if controller_only is specified."""
if controller_only:
completion = {name: config for name, config in completion.items() if isinstance(config, PosixCompletionConfig) and config.controller_supported}

@ -70,19 +70,19 @@ class EnvironmentConfig(CommonConfig):
def __init__(self, args: t.Any, command: str) -> None:
super().__init__(args, command)
self.host_settings = args.host_settings # type: HostSettings
self.host_path = args.host_path # type: t.Optional[str]
self.containers = args.containers # type: t.Optional[str]
self.pypi_proxy = args.pypi_proxy # type: bool
self.pypi_endpoint = args.pypi_endpoint # type: t.Optional[str]
self.host_settings: HostSettings = args.host_settings
self.host_path: t.Optional[str] = args.host_path
self.containers: t.Optional[str] = args.containers
self.pypi_proxy: bool = args.pypi_proxy
self.pypi_endpoint: t.Optional[str] = args.pypi_endpoint
# Populated by content_config.get_content_config on the origin.
# Serialized and passed to delegated instances to avoid parsing a second time.
self.content_config = None # type: t.Optional[ContentConfig]
self.content_config: t.Optional[ContentConfig] = None
# Set by check_controller_python once HostState has been created by prepare_profiles.
# This is here for convenience, to avoid needing to pass HostState to some functions which already have access to EnvironmentConfig.
self.controller_python = None # type: t.Optional[PythonConfig]
self.controller_python: t.Optional[PythonConfig] = None
"""
The Python interpreter used by the controller.
Only available after delegation has been performed or skipped (if delegation is not required).
@ -98,18 +98,18 @@ class EnvironmentConfig(CommonConfig):
or bool(verify_sys_executable(self.controller.python.path))
)
self.docker_network = args.docker_network # type: t.Optional[str]
self.docker_terminate = args.docker_terminate # type: t.Optional[TerminateMode]
self.docker_network: t.Optional[str] = args.docker_network
self.docker_terminate: t.Optional[TerminateMode] = args.docker_terminate
self.remote_endpoint = args.remote_endpoint # type: t.Optional[str]
self.remote_stage = args.remote_stage # type: t.Optional[str]
self.remote_terminate = args.remote_terminate # type: t.Optional[TerminateMode]
self.remote_endpoint: t.Optional[str] = args.remote_endpoint
self.remote_stage: t.Optional[str] = args.remote_stage
self.remote_terminate: t.Optional[TerminateMode] = args.remote_terminate
self.prime_containers = args.prime_containers # type: bool
self.prime_containers: bool = args.prime_containers
self.requirements = args.requirements # type: bool
self.requirements: bool = args.requirements
self.delegate_args = [] # type: t.List[str]
self.delegate_args: t.List[str] = []
def host_callback(files): # type: (t.List[t.Tuple[str, str]]) -> None
"""Add the host files to the payload file list."""
@ -196,28 +196,28 @@ class TestConfig(EnvironmentConfig):
def __init__(self, args: t.Any, command: str) -> None:
super().__init__(args, command)
self.coverage = args.coverage # type: bool
self.coverage_check = args.coverage_check # type: bool
self.include = args.include or [] # type: t.List[str]
self.exclude = args.exclude or [] # type: t.List[str]
self.require = args.require or [] # type: t.List[str]
self.changed = args.changed # type: bool
self.tracked = args.tracked # type: bool
self.untracked = args.untracked # type: bool
self.committed = args.committed # type: bool
self.staged = args.staged # type: bool
self.unstaged = args.unstaged # type: bool
self.changed_from = args.changed_from # type: str
self.changed_path = args.changed_path # type: t.List[str]
self.base_branch = args.base_branch # type: str
self.lint = getattr(args, 'lint', False) # type: bool
self.junit = getattr(args, 'junit', False) # type: bool
self.failure_ok = getattr(args, 'failure_ok', False) # type: bool
self.coverage: bool = args.coverage
self.coverage_check: bool = args.coverage_check
self.include: t.List[str] = args.include or []
self.exclude: t.List[str] = args.exclude or []
self.require: t.List[str] = args.require or []
self.changed: bool = args.changed
self.tracked: bool = args.tracked
self.untracked: bool = args.untracked
self.committed: bool = args.committed
self.staged: bool = args.staged
self.unstaged: bool = args.unstaged
self.changed_from: str = args.changed_from
self.changed_path: t.List[str] = args.changed_path
self.base_branch: str = args.base_branch
self.lint: bool = getattr(args, 'lint', False)
self.junit: bool = getattr(args, 'junit', False)
self.failure_ok: bool = getattr(args, 'failure_ok', False)
self.metadata = Metadata.from_file(args.metadata) if args.metadata else Metadata()
self.metadata_path = None # type: t.Optional[str]
self.metadata_path: t.Optional[str] = None
if self.coverage_check:
self.coverage = True
@ -237,11 +237,11 @@ class ShellConfig(EnvironmentConfig):
def __init__(self, args: t.Any) -> None:
super().__init__(args, 'shell')
self.cmd = args.cmd # type: t.List[str]
self.raw = args.raw # type: bool
self.cmd: t.List[str] = args.cmd
self.raw: bool = args.raw
self.check_layout = self.delegate # allow shell to be used without a valid layout as long as no delegation is required
self.interactive = sys.stdin.isatty() and not args.cmd # delegation should only be interactive when stdin is a TTY and no command was given
self.export = args.export # type: t.Optional[str]
self.export: t.Optional[str] = args.export
self.display_stderr = True
@ -250,13 +250,13 @@ class SanityConfig(TestConfig):
def __init__(self, args: t.Any) -> None:
super().__init__(args, 'sanity')
self.test = args.test # type: t.List[str]
self.skip_test = args.skip_test # type: t.List[str]
self.list_tests = args.list_tests # type: bool
self.allow_disabled = args.allow_disabled # type: bool
self.enable_optional_errors = args.enable_optional_errors # type: bool
self.keep_git = args.keep_git # type: bool
self.prime_venvs = args.prime_venvs # type: bool
self.test: t.List[str] = args.test
self.skip_test: t.List[str] = args.skip_test
self.list_tests: bool = args.list_tests
self.allow_disabled: bool = args.allow_disabled
self.enable_optional_errors: bool = args.enable_optional_errors
self.keep_git: bool = args.keep_git
self.prime_venvs: bool = args.prime_venvs
self.display_stderr = self.lint or self.list_tests
@ -275,25 +275,25 @@ class IntegrationConfig(TestConfig):
def __init__(self, args: t.Any, command: str) -> None:
super().__init__(args, command)
self.start_at = args.start_at # type: str
self.start_at_task = args.start_at_task # type: str
self.allow_destructive = args.allow_destructive # type: bool
self.allow_root = args.allow_root # type: bool
self.allow_disabled = args.allow_disabled # type: bool
self.allow_unstable = args.allow_unstable # type: bool
self.allow_unstable_changed = args.allow_unstable_changed # type: bool
self.allow_unsupported = args.allow_unsupported # type: bool
self.retry_on_error = args.retry_on_error # type: bool
self.continue_on_error = args.continue_on_error # type: bool
self.debug_strategy = args.debug_strategy # type: bool
self.changed_all_target = args.changed_all_target # type: str
self.changed_all_mode = args.changed_all_mode # type: str
self.list_targets = args.list_targets # type: bool
self.start_at: str = args.start_at
self.start_at_task: str = args.start_at_task
self.allow_destructive: bool = args.allow_destructive
self.allow_root: bool = args.allow_root
self.allow_disabled: bool = args.allow_disabled
self.allow_unstable: bool = args.allow_unstable
self.allow_unstable_changed: bool = args.allow_unstable_changed
self.allow_unsupported: bool = args.allow_unsupported
self.retry_on_error: bool = args.retry_on_error
self.continue_on_error: bool = args.continue_on_error
self.debug_strategy: bool = args.debug_strategy
self.changed_all_target: str = args.changed_all_target
self.changed_all_mode: str = args.changed_all_mode
self.list_targets: bool = args.list_targets
self.tags = args.tags
self.skip_tags = args.skip_tags
self.diff = args.diff
self.no_temp_workdir = args.no_temp_workdir # type: bool
self.no_temp_unicode = args.no_temp_unicode # type: bool
self.no_temp_workdir: bool = args.no_temp_workdir
self.no_temp_unicode: bool = args.no_temp_unicode
if self.list_targets:
self.explain = True
@ -331,7 +331,7 @@ class NetworkIntegrationConfig(IntegrationConfig):
def __init__(self, args: t.Any) -> None:
super().__init__(args, 'network-integration')
self.testcase = args.testcase # type: str
self.testcase: str = args.testcase
class UnitsConfig(TestConfig):
@ -339,10 +339,10 @@ class UnitsConfig(TestConfig):
def __init__(self, args: t.Any) -> None:
super().__init__(args, 'units')
self.collect_only = args.collect_only # type: bool
self.num_workers = args.num_workers # type: int
self.collect_only: bool = args.collect_only
self.num_workers: int = args.num_workers
self.requirements_mode = getattr(args, 'requirements_mode', '') # type: str
self.requirements_mode: str = getattr(args, 'requirements_mode', '')
if self.requirements_mode == 'only':
self.requirements = True

@ -45,19 +45,19 @@ class Connection(metaclass=abc.ABCMeta):
"""Base class for connecting to a host."""
@abc.abstractmethod
def run(self,
command, # type: t.List[str]
capture, # type: bool
interactive=False, # type: bool
data=None, # type: t.Optional[str]
stdin=None, # type: t.Optional[t.IO[bytes]]
stdout=None, # type: t.Optional[t.IO[bytes]]
output_stream=None, # type: t.Optional[OutputStream]
): # type: (...) -> t.Tuple[t.Optional[str], t.Optional[str]]
command: t.List[str],
capture: bool,
interactive: bool = False,
data: t.Optional[str] = None,
stdin: t.Optional[t.IO[bytes]] = None,
stdout: t.Optional[t.IO[bytes]] = None,
output_stream: t.Optional[OutputStream] = None,
) -> t.Tuple[t.Optional[str], t.Optional[str]]:
"""Run the specified command and return the result."""
def extract_archive(self,
chdir, # type: str
src, # type: t.IO[bytes]
chdir: str,
src: t.IO[bytes],
):
"""Extract the given archive file stream in the specified directory."""
tar_cmd = ['tar', 'oxzf', '-', '-C', chdir]
@ -65,10 +65,10 @@ class Connection(metaclass=abc.ABCMeta):
retry(lambda: self.run(tar_cmd, stdin=src, capture=True))
def create_archive(self,
chdir, # type: str
name, # type: str
dst, # type: t.IO[bytes]
exclude=None, # type: t.Optional[str]
chdir: str,
name: str,
dst: t.IO[bytes],
exclude: t.Optional[str] = None,
):
"""Create the specified archive file stream from the specified directory, including the given name and optionally excluding the given name."""
tar_cmd = ['tar', 'cf', '-', '-C', chdir]
@ -93,14 +93,14 @@ class LocalConnection(Connection):
self.args = args
def run(self,
command, # type: t.List[str]
capture, # type: bool
interactive=False, # type: bool
data=None, # type: t.Optional[str]
stdin=None, # type: t.Optional[t.IO[bytes]]
stdout=None, # type: t.Optional[t.IO[bytes]]
output_stream=None, # type: t.Optional[OutputStream]
): # type: (...) -> t.Tuple[t.Optional[str], t.Optional[str]]
command: t.List[str],
capture: bool,
interactive: bool = False,
data: t.Optional[str] = None,
stdin: t.Optional[t.IO[bytes]] = None,
stdout: t.Optional[t.IO[bytes]] = None,
output_stream: t.Optional[OutputStream] = None,
) -> t.Tuple[t.Optional[str], t.Optional[str]]:
"""Run the specified command and return the result."""
return run_command(
args=self.args,
@ -135,14 +135,14 @@ class SshConnection(Connection):
self.options.extend(['-o', f'{ssh_option}={ssh_options[ssh_option]}'])
def run(self,
command, # type: t.List[str]
capture, # type: bool
interactive=False, # type: bool
data=None, # type: t.Optional[str]
stdin=None, # type: t.Optional[t.IO[bytes]]
stdout=None, # type: t.Optional[t.IO[bytes]]
output_stream=None, # type: t.Optional[OutputStream]
): # type: (...) -> t.Tuple[t.Optional[str], t.Optional[str]]
command: t.List[str],
capture: bool,
interactive: bool = False,
data: t.Optional[str] = None,
stdin: t.Optional[t.IO[bytes]] = None,
stdout: t.Optional[t.IO[bytes]] = None,
output_stream: t.Optional[OutputStream] = None,
) -> t.Tuple[t.Optional[str], t.Optional[str]]:
"""Run the specified command and return the result."""
options = list(self.options)
@ -214,17 +214,17 @@ class DockerConnection(Connection):
def __init__(self, args: EnvironmentConfig, container_id: str, user: t.Optional[str] = None) -> None:
self.args = args
self.container_id = container_id
self.user = user # type: t.Optional[str]
self.user: t.Optional[str] = user
def run(self,
command, # type: t.List[str]
capture, # type: bool
interactive=False, # type: bool
data=None, # type: t.Optional[str]
stdin=None, # type: t.Optional[t.IO[bytes]]
stdout=None, # type: t.Optional[t.IO[bytes]]
output_stream=None, # type: t.Optional[OutputStream]
): # type: (...) -> t.Tuple[t.Optional[str], t.Optional[str]]
command: t.List[str],
capture: bool,
interactive: bool = False,
data: t.Optional[str] = None,
stdin: t.Optional[t.IO[bytes]] = None,
stdout: t.Optional[t.IO[bytes]] = None,
output_stream: t.Optional[OutputStream] = None,
) -> t.Tuple[t.Optional[str], t.Optional[str]]:
"""Run the specified command and return the result."""
options = []

@ -81,7 +81,7 @@ from .connections import (
)
# information about support containers provisioned by the current ansible-test instance
support_containers = {} # type: t.Dict[str, ContainerDescriptor]
support_containers: t.Dict[str, ContainerDescriptor] = {}
support_containers_mutex = threading.Lock()
@ -100,20 +100,20 @@ class CleanupMode(enum.Enum):
def run_support_container(
args, # type: EnvironmentConfig
context, # type: str
image, # type: str
name, # type: str
ports, # type: t.List[int]
aliases=None, # type: t.Optional[t.List[str]]
start=True, # type: bool
allow_existing=False, # type: bool
cleanup=None, # type: t.Optional[CleanupMode]
cmd=None, # type: t.Optional[t.List[str]]
env=None, # type: t.Optional[t.Dict[str, str]]
options=None, # type: t.Optional[t.List[str]]
publish_ports=True, # type: bool
): # type: (...) -> t.Optional[ContainerDescriptor]
args: EnvironmentConfig,
context: str,
image: str,
name: str,
ports: t.List[int],
aliases: t.Optional[t.List[str]] = None,
start: bool = True,
allow_existing: bool = False,
cleanup: t.Optional[CleanupMode] = None,
cmd: t.Optional[t.List[str]] = None,
env: t.Optional[t.Dict[str, str]] = None,
options: t.Optional[t.List[str]] = None,
publish_ports: bool = True,
) -> t.Optional[ContainerDescriptor]:
"""
Start a container used to support tests, but not run them.
Containers created this way will be accessible from tests.
@ -344,9 +344,9 @@ def root_ssh(ssh: SshConnection) -> SshConnectionDetail:
def create_container_database(args: EnvironmentConfig) -> ContainerDatabase:
"""Create and return a container database with information necessary for all test hosts to make use of relevant support containers."""
origin = {} # type: t.Dict[str, t.Dict[str, ContainerAccess]]
control = {} # type: t.Dict[str, t.Dict[str, ContainerAccess]]
managed = {} # type: t.Dict[str, t.Dict[str, ContainerAccess]]
origin: t.Dict[str, t.Dict[str, ContainerAccess]] = {}
control: t.Dict[str, t.Dict[str, ContainerAccess]] = {}
managed: t.Dict[str, t.Dict[str, ContainerAccess]] = {}
for name, container in support_containers.items():
if container.details.published_ports:
@ -459,9 +459,9 @@ class SupportContainerContext:
@contextlib.contextmanager
def support_container_context(
args, # type: EnvironmentConfig
ssh, # type: t.Optional[SshConnectionDetail]
): # type: (...) -> t.Iterator[t.Optional[ContainerDatabase]]
args: EnvironmentConfig,
ssh: t.Optional[SshConnectionDetail],
) -> t.Iterator[t.Optional[ContainerDatabase]]:
"""Create a context manager for integration tests that use support containers."""
if not isinstance(args, (IntegrationConfig, UnitsConfig, SanityConfig, ShellConfig)):
yield None # containers are only needed for commands that have targets (hosts or pythons)
@ -482,17 +482,17 @@ def support_container_context(
def create_support_container_context(
args, # type: EnvironmentConfig
ssh, # type: t.Optional[SshConnectionDetail]
containers, # type: ContainerDatabase
): # type: (...) -> SupportContainerContext
args: EnvironmentConfig,
ssh: t.Optional[SshConnectionDetail],
containers: ContainerDatabase,
) -> SupportContainerContext:
"""Context manager that provides SSH port forwards. Returns updated container metadata."""
host_type = HostType.control
revised = ContainerDatabase(containers.data.copy())
source = revised.data.pop(HostType.origin, None)
container_map = {} # type: t.Dict[t.Tuple[str, int], t.Tuple[str, str, int]]
container_map: t.Dict[t.Tuple[str, int], t.Tuple[str, str, int]] = {}
if host_type not in revised.data:
if not source:
@ -518,7 +518,7 @@ def create_support_container_context(
try:
port_forwards = process.collect_port_forwards()
contexts = {} # type: t.Dict[str, t.Dict[str, ContainerAccess]]
contexts: t.Dict[str, t.Dict[str, ContainerAccess]] = {}
for forward, forwarded_port in port_forwards.items():
access_host, access_port = forward
@ -544,18 +544,18 @@ def create_support_container_context(
class ContainerDescriptor:
"""Information about a support container."""
def __init__(self,
image, # type: str
context, # type: str
name, # type: str
container_id, # type: str
ports, # type: t.List[int]
aliases, # type: t.List[str]
publish_ports, # type: bool
running, # type: bool
existing, # type: bool
cleanup, # type: CleanupMode
env, # type: t.Optional[t.Dict[str, str]]
): # type: (...) -> None
image: str,
context: str,
name: str,
container_id: str,
ports: t.List[int],
aliases: t.List[str],
publish_ports: bool,
running: bool,
existing: bool,
cleanup: CleanupMode,
env: t.Optional[t.Dict[str, str]],
) -> None:
self.image = image
self.context = context
self.name = name
@ -567,7 +567,7 @@ class ContainerDescriptor:
self.existing = existing
self.cleanup = cleanup
self.env = env
self.details = None # type: t.Optional[SupportContainer]
self.details: t.Optional[SupportContainer] = None
def start(self, args: EnvironmentConfig) -> None:
"""Start the container. Used for containers which are created, but not started."""
@ -623,22 +623,22 @@ class ContainerDescriptor:
class SupportContainer:
"""Information about a running support container available for use by tests."""
def __init__(self,
container, # type: DockerInspect
container_ip, # type: str
published_ports, # type: t.Dict[int, int]
): # type: (...) -> None
container: DockerInspect,
container_ip: str,
published_ports: t.Dict[int, int],
) -> None:
self.container = container
self.container_ip = container_ip
self.published_ports = published_ports
def wait_for_file(args, # type: EnvironmentConfig
container_name, # type: str
path, # type: str
sleep, # type: int
tries, # type: int
check=None, # type: t.Optional[t.Callable[[str], bool]]
): # type: (...) -> str
container_name: str,
path: str,
sleep: int,
tries: int,
check: t.Optional[t.Callable[[str], bool]] = None,
) -> str:
"""Wait for the specified file to become available in the requested container and return its contents."""
display.info('Waiting for container "%s" to provide file: %s' % (container_name, path))
@ -684,10 +684,10 @@ def create_hosts_entries(context): # type: (t.Dict[str, ContainerAccess]) -> t.
def create_container_hooks(
args, # type: IntegrationConfig
control_connections, # type: t.List[SshConnectionDetail]
managed_connections, # type: t.Optional[t.List[SshConnectionDetail]]
): # type: (...) -> t.Tuple[t.Optional[t.Callable[[IntegrationTarget], None]], t.Optional[t.Callable[[IntegrationTarget], None]]]
args: IntegrationConfig,
control_connections: t.List[SshConnectionDetail],
managed_connections: t.Optional[t.List[SshConnectionDetail]],
) -> t.Tuple[t.Optional[t.Callable[[IntegrationTarget], None]], t.Optional[t.Callable[[IntegrationTarget], None]]]:
"""Return pre and post target callbacks for enabling and disabling container access for each test target."""
containers = get_container_database(args)
@ -706,8 +706,8 @@ def create_container_hooks(
else:
managed_type = 'posix'
control_state = {} # type: t.Dict[str, t.Tuple[t.List[str], t.List[SshProcess]]]
managed_state = {} # type: t.Dict[str, t.Tuple[t.List[str], t.List[SshProcess]]]
control_state: t.Dict[str, t.Tuple[t.List[str], t.List[SshProcess]]] = {}
managed_state: t.Dict[str, t.Tuple[t.List[str], t.List[SshProcess]]] = {}
def pre_target(target):
"""Configure hosts for SSH port forwarding required by the specified target."""
@ -726,7 +726,7 @@ def create_container_hooks(
def create_managed_contexts(control_contexts): # type: (t.Dict[str, t.Dict[str, ContainerAccess]]) -> t.Dict[str, t.Dict[str, ContainerAccess]]
"""Create managed contexts from the given control contexts."""
managed_contexts = {} # type: t.Dict[str, t.Dict[str, ContainerAccess]]
managed_contexts: t.Dict[str, t.Dict[str, ContainerAccess]] = {}
for context_name, control_context in control_contexts.items():
managed_context = managed_contexts[context_name] = {}
@ -738,14 +738,14 @@ def create_managed_contexts(control_contexts): # type: (t.Dict[str, t.Dict[str,
def forward_ssh_ports(
args, # type: IntegrationConfig
ssh_connections, # type: t.Optional[t.List[SshConnectionDetail]]
playbook, # type: str
target_state, # type: t.Dict[str, t.Tuple[t.List[str], t.List[SshProcess]]]
target, # type: IntegrationTarget
host_type, # type: str
contexts, # type: t.Dict[str, t.Dict[str, ContainerAccess]]
): # type: (...) -> None
args: IntegrationConfig,
ssh_connections: t.Optional[t.List[SshConnectionDetail]],
playbook: str,
target_state: t.Dict[str, t.Tuple[t.List[str], t.List[SshProcess]]],
target: IntegrationTarget,
host_type: str,
contexts: t.Dict[str, t.Dict[str, ContainerAccess]],
) -> None:
"""Configure port forwarding using SSH and write hosts file entries."""
if ssh_connections is None:
return
@ -768,7 +768,7 @@ def forward_ssh_ports(
raise Exception('The %s host was not pre-configured for container access and SSH forwarding is not available.' % host_type)
redirects = [] # type: t.List[t.Tuple[int, str, int]]
redirects: t.List[t.Tuple[int, str, int]] = []
messages = []
for container_name, container in test_context.items():
@ -796,7 +796,7 @@ def forward_ssh_ports(
with named_temporary_file(args, 'ssh-inventory-', '.json', None, inventory) as inventory_path: # type: str
run_playbook(args, inventory_path, playbook, capture=False, variables=dict(hosts_entries=hosts_entries))
ssh_processes = [] # type: t.List[SshProcess]
ssh_processes: t.List[SshProcess] = []
if redirects:
for ssh in ssh_connections:
@ -809,13 +809,13 @@ def forward_ssh_ports(
def cleanup_ssh_ports(
args, # type: IntegrationConfig
ssh_connections, # type: t.List[SshConnectionDetail]
playbook, # type: str
target_state, # type: t.Dict[str, t.Tuple[t.List[str], t.List[SshProcess]]]
target, # type: IntegrationTarget
host_type, # type: str
): # type: (...) -> None
args: IntegrationConfig,
ssh_connections: t.List[SshConnectionDetail],
playbook: str,
target_state: t.Dict[str, t.Tuple[t.List[str], t.List[SshProcess]]],
target: IntegrationTarget,
host_type: str,
) -> None:
"""Stop previously configured SSH port forwarding and remove previously written hosts file entries."""
state = target_state.pop(target.name, None)

@ -115,10 +115,10 @@ class AnsibleCoreCI:
def __init__(
self,
args, # type: EnvironmentConfig
resource, # type: Resource
load=True, # type: bool
): # type: (...) -> None
args: EnvironmentConfig,
resource: Resource,
load: bool = True,
) -> None:
self.args = args
self.resource = resource
self.platform, self.version, self.arch, self.provider = self.resource.as_tuple()
@ -162,7 +162,7 @@ class AnsibleCoreCI:
self._clear()
if self.instance_id:
self.started = True # type: bool
self.started: bool = True
else:
self.started = False
self.instance_id = str(uuid.uuid4())
@ -531,13 +531,13 @@ class SshKey:
class InstanceConnection:
"""Container for remote instance status and connection details."""
def __init__(self,
running, # type: bool
hostname=None, # type: t.Optional[str]
port=None, # type: t.Optional[int]
username=None, # type: t.Optional[str]
password=None, # type: t.Optional[str]
response_json=None, # type: t.Optional[t.Dict[str, t.Any]]
): # type: (...) -> None
running: bool,
hostname: t.Optional[str] = None,
port: t.Optional[int] = None,
username: t.Optional[str] = None,
password: t.Optional[str] = None,
response_json: t.Optional[t.Dict[str, t.Any]] = None,
) -> None:
self.running = running
self.hostname = hostname
self.port = port

@ -139,15 +139,15 @@ def get_sqlite_schema_version(path: str) -> int:
def cover_python(
args, # type: TestConfig
python, # type: PythonConfig
cmd, # type: t.List[str]
target_name, # type: str
env, # type: t.Dict[str, str]
capture, # type: bool
data=None, # type: t.Optional[str]
cwd=None, # type: t.Optional[str]
): # type: (...) -> t.Tuple[t.Optional[str], t.Optional[str]]
args: TestConfig,
python: PythonConfig,
cmd: t.List[str],
target_name: str,
env: t.Dict[str, str],
capture: bool,
data: t.Optional[str] = None,
cwd: t.Optional[str] = None,
) -> t.Tuple[t.Optional[str], t.Optional[str]]:
"""Run a command while collecting Python code coverage."""
if args.coverage:
env.update(get_coverage_environment(args, target_name, python.version))
@ -172,10 +172,10 @@ def get_coverage_platform(config: HostConfig) -> str:
def get_coverage_environment(
args, # type: TestConfig
target_name, # type: str
version, # type: str
): # type: (...) -> t.Dict[str, str]
args: TestConfig,
target_name: str,
version: str,
) -> t.Dict[str, str]:
"""Return environment variables needed to collect code coverage."""
# unit tests, sanity tests and other special cases (localhost only)
# config is in a temporary directory

@ -60,9 +60,9 @@ class DataContext:
self.__layout_providers = layout_providers
self.__source_providers = source_providers
self.__ansible_source = None # type: t.Optional[t.Tuple[t.Tuple[str, str], ...]]
self.__ansible_source: t.Optional[t.Tuple[t.Tuple[str, str], ...]] = None
self.payload_callbacks = [] # type: t.List[t.Callable[[t.List[t.Tuple[str, str]]], None]]
self.payload_callbacks: t.List[t.Callable[[t.List[t.Tuple[str, str]]], None]] = []
if content_path:
content = self.__create_content_layout(layout_providers, source_providers, content_path, False)
@ -71,7 +71,7 @@ class DataContext:
else:
content = self.__create_content_layout(layout_providers, source_providers, current_path, True)
self.content = content # type: ContentLayout
self.content: ContentLayout = content
def create_collection_layouts(self) -> t.List[ContentLayout]:
"""
@ -113,10 +113,10 @@ class DataContext:
@staticmethod
def __create_content_layout(layout_providers, # type: t.List[t.Type[LayoutProvider]]
source_providers, # type: t.List[t.Type[SourceProvider]]
root, # type: str
walk, # type: bool
): # type: (...) -> ContentLayout
source_providers: t.List[t.Type[SourceProvider]],
root: str,
walk: bool,
) -> ContentLayout:
"""Create a content layout using the given providers and root path."""
try:
layout_provider = find_path_provider(LayoutProvider, layout_providers, root, walk)
@ -129,7 +129,7 @@ class DataContext:
# Doing so allows support for older git versions for which it is difficult to distinguish between a super project and a sub project.
# It also provides a better user experience, since the solution for the user would effectively be the same -- to remove the nested version control.
if isinstance(layout_provider, UnsupportedLayout):
source_provider = UnsupportedSource(layout_provider.root) # type: SourceProvider
source_provider: SourceProvider = UnsupportedSource(layout_provider.root)
else:
source_provider = find_path_provider(SourceProvider, source_providers, layout_provider.root, walk)
except ProviderNotFoundForPath:
@ -249,7 +249,7 @@ def content_plugins():
Analyze content.
The primary purpose of this analysis is to facilitate mapping of integration tests to the plugin(s) they are intended to test.
"""
plugins = {} # type: t.Dict[str, t.Dict[str, PluginInfo]]
plugins: t.Dict[str, t.Dict[str, PluginInfo]] = {}
for plugin_type, plugin_directory in data_context().content.plugin_paths.items():
plugin_paths = sorted(data_context().content.walk_files(plugin_directory))

@ -254,13 +254,13 @@ def download_results(args: EnvironmentConfig, con: Connection, content_root: str
def generate_command(
args, # type: EnvironmentConfig
python, # type: PythonConfig
ansible_bin_path, # type: str
content_root, # type: str
exclude, # type: t.List[str]
require, # type: t.List[str]
): # type: (...) -> t.List[str]
args: EnvironmentConfig,
python: PythonConfig,
ansible_bin_path: str,
content_root: str,
exclude: t.List[str],
require: t.List[str],
) -> t.List[str]:
"""Generate the command necessary to delegate ansible-test."""
cmd = [os.path.join(ansible_bin_path, 'ansible-test')]
cmd = [python.path] + cmd
@ -306,11 +306,11 @@ def generate_command(
def filter_options(
args, # type: EnvironmentConfig
argv, # type: t.List[str]
exclude, # type: t.List[str]
require, # type: t.List[str]
): # type: (...) -> t.Iterable[str]
args: EnvironmentConfig,
argv: t.List[str],
exclude: t.List[str],
require: t.List[str],
) -> t.Iterable[str]:
"""Return an iterable that filters out unwanted CLI options and injects new ones as requested."""
replace: list[tuple[str, int, t.Optional[t.Union[bool, str, list[str]]]]] = [
('--docker-no-pull', 0, False),

@ -21,7 +21,7 @@ class FileDiff:
def __init__(self, old_path: str, new_path: str) -> None:
self.old = DiffSide(old_path, new=False)
self.new = DiffSide(new_path, new=True)
self.headers = [] # type: t.List[str]
self.headers: t.List[str] = []
self.binary = False
def append_header(self, line: str) -> None:
@ -43,9 +43,9 @@ class DiffSide:
self.eof_newline = True
self.exists = True
self.lines = [] # type: t.List[t.Tuple[int, str]]
self.lines_and_context = [] # type: t.List[t.Tuple[int, str]]
self.ranges = [] # type: t.List[t.Tuple[int, int]]
self.lines: t.List[t.Tuple[int, str]] = []
self.lines_and_context: t.List[t.Tuple[int, str]] = []
self.ranges: t.List[t.Tuple[int, int]] = []
self._next_line_number = 0
self._lines_remaining = 0
@ -111,13 +111,13 @@ class DiffParser:
"""Parse diff lines."""
def __init__(self, lines: t.List[str]) -> None:
self.lines = lines
self.files = [] # type: t.List[FileDiff]
self.files: t.List[FileDiff] = []
self.action = self.process_start
self.line_number = 0
self.previous_line = None # type: t.Optional[str]
self.line = None # type: t.Optional[str]
self.file = None # type: t.Optional[FileDiff]
self.previous_line: t.Optional[str] = None
self.line: t.Optional[str] = None
self.file: t.Optional[FileDiff] = None
for self.line in self.lines:
self.line_number += 1

@ -285,12 +285,12 @@ def docker_cp_to(args: EnvironmentConfig, container_id: str, src: str, dst: str)
def docker_run(
args, # type: EnvironmentConfig
image, # type: str
options, # type: t.Optional[t.List[str]]
cmd=None, # type: t.Optional[t.List[str]]
create_only=False, # type: bool
): # type: (...) -> str
args: EnvironmentConfig,
image: str,
options: t.Optional[t.List[str]],
cmd: t.Optional[t.List[str]] = None,
create_only: bool = False,
) -> str:
"""Run a container using the given docker image."""
if not options:
options = []
@ -509,17 +509,17 @@ def docker_image_exists(args: EnvironmentConfig, image: str) -> bool:
def docker_exec(
args, # type: EnvironmentConfig
container_id, # type: str
cmd, # type: t.List[str]
capture, # type: bool
options=None, # type: t.Optional[t.List[str]]
stdin=None, # type: t.Optional[t.IO[bytes]]
stdout=None, # type: t.Optional[t.IO[bytes]]
interactive=False, # type: bool
output_stream=None, # type: t.Optional[OutputStream]
data=None, # type: t.Optional[str]
): # type: (...) -> t.Tuple[t.Optional[str], t.Optional[str]]
args: EnvironmentConfig,
container_id: str,
cmd: t.List[str],
capture: bool,
options: t.Optional[t.List[str]] = None,
stdin: t.Optional[t.IO[bytes]] = None,
stdout: t.Optional[t.IO[bytes]] = None,
interactive: bool = False,
output_stream: t.Optional[OutputStream] = None,
data: t.Optional[str] = None,
) -> t.Tuple[t.Optional[str], t.Optional[str]]:
"""Execute the given command in the specified container."""
if not options:
options = []
@ -544,16 +544,16 @@ def docker_version(args: CommonConfig) -> t.Dict[str, t.Any]:
def docker_command(
args, # type: CommonConfig
cmd, # type: t.List[str]
capture, # type: bool
stdin=None, # type: t.Optional[t.IO[bytes]]
stdout=None, # type: t.Optional[t.IO[bytes]]
interactive=False, # type: bool
output_stream=None, # type: t.Optional[OutputStream]
always=False, # type: bool
data=None, # type: t.Optional[str]
): # type: (...) -> t.Tuple[t.Optional[str], t.Optional[str]]
args: CommonConfig,
cmd: t.List[str],
capture: bool,
stdin: t.Optional[t.IO[bytes]] = None,
stdout: t.Optional[t.IO[bytes]] = None,
interactive: bool = False,
output_stream: t.Optional[OutputStream] = None,
always: bool = False,
data: t.Optional[str] = None,
) -> t.Tuple[t.Optional[str], t.Optional[str]]:
"""Run the specified docker command."""
env = docker_environment()
command = [require_docker().command]

@ -159,18 +159,18 @@ class HostProfile(t.Generic[THostConfig], metaclass=abc.ABCMeta):
"""Base class for host profiles."""
def __init__(self,
*,
args, # type: EnvironmentConfig
config, # type: THostConfig
targets, # type: t.Optional[t.List[HostConfig]]
): # type: (...) -> None
args: EnvironmentConfig,
config: THostConfig,
targets: t.Optional[t.List[HostConfig]],
) -> None:
self.args = args
self.config = config
self.controller = bool(targets)
self.targets = targets or []
self.state = {} # type: t.Dict[str, t.Any]
self.state: t.Dict[str, t.Any] = {}
"""State that must be persisted across delegation."""
self.cache = {} # type: t.Dict[str, t.Any]
self.cache: t.Dict[str, t.Any] = {}
"""Cache that must not be persisted across delegation."""
def provision(self) -> None:
@ -572,7 +572,7 @@ class PosixRemoteProfile(ControllerHostProfile[PosixRemoteConfig], RemoteProfile
)
if settings.user == 'root':
become = None # type: t.Optional[Become]
become: t.Optional[Become] = None
elif self.config.become:
become = SUPPORTED_BECOME_METHODS[self.config.become]()
else:
@ -755,10 +755,10 @@ def get_config_profile_type_map() -> t.Dict[t.Type[HostConfig], t.Type[HostProfi
def create_host_profile(
args, # type: EnvironmentConfig
config, # type: HostConfig
controller, # type: bool
): # type: (...) -> HostProfile
args: EnvironmentConfig,
config: HostConfig,
controller: bool,
) -> HostProfile:
"""Create and return a host profile from the given host configuration."""
profile_type = get_config_profile_type_map()[type(config)]
profile = profile_type(args=args, config=config, targets=args.targets if controller else None)

@ -94,7 +94,7 @@ def create_network_inventory(args: EnvironmentConfig, path: str, target_hosts: t
return
target_hosts = t.cast(t.List[NetworkRemoteProfile], target_hosts)
host_groups = {target_host.config.platform: {} for target_host in target_hosts} # type: t.Dict[str, t.Dict[str, t.Dict[str, t.Union[str, int]]]]
host_groups: t.Dict[str, t.Dict[str, t.Dict[str, t.Union[str, int]]]] = {target_host.config.platform: {} for target_host in target_hosts}
for target_host in target_hosts:
host_groups[target_host.config.platform][sanitize_host_name(target_host.config.name)] = target_host.get_inventory_variables()

@ -40,11 +40,11 @@ def make_dirs(path: str) -> None:
def write_json_file(path, # type: str
content, # type: t.Any
create_directories=False, # type: bool
formatted=True, # type: bool
encoder=None, # type: t.Optional[t.Type[json.JSONEncoder]]
): # type: (...) -> str
content: t.Any,
create_directories: bool = False,
formatted: bool = True,
encoder: t.Optional[t.Type[json.JSONEncoder]] = None,
) -> str:
"""Write the given json content to the specified path, optionally creating missing directories."""
text_content = json.dumps(content,
sort_keys=formatted,

@ -21,15 +21,15 @@ class Metadata:
"""Metadata object for passing data to delegated tests."""
def __init__(self):
"""Initialize metadata."""
self.changes = {} # type: t.Dict[str, t.Tuple[t.Tuple[int, int], ...]]
self.cloud_config = None # type: t.Optional[t.Dict[str, t.Dict[str, t.Union[int, str, bool]]]]
self.change_description = None # type: t.Optional[ChangeDescription]
self.ci_provider = None # type: t.Optional[str]
self.changes: t.Dict[str, t.Tuple[t.Tuple[int, int], ...]] = {}
self.cloud_config: t.Optional[t.Dict[str, t.Dict[str, t.Union[int, str, bool]]]] = None
self.change_description: t.Optional[ChangeDescription] = None
self.ci_provider: t.Optional[str] = None
def populate_changes(self, diff: t.Optional[t.List[str]]) -> None:
"""Populate the changeset using the given diff."""
patches = parse_diff(diff)
patches = sorted(patches, key=lambda k: k.new.path) # type: t.List[FileDiff]
patches: t.List[FileDiff] = sorted(patches, key=lambda k: k.new.path)
self.changes = dict((patch.new.path, tuple(patch.new.ranges)) for patch in patches)
@ -83,12 +83,12 @@ class Metadata:
class ChangeDescription:
"""Description of changes."""
def __init__(self):
self.command = '' # type: str
self.changed_paths = [] # type: t.List[str]
self.deleted_paths = [] # type: t.List[str]
self.regular_command_targets = {} # type: t.Dict[str, t.List[str]]
self.focused_command_targets = {} # type: t.Dict[str, t.List[str]]
self.no_integration_paths = [] # type: t.List[str]
self.command: str = ''
self.changed_paths: t.List[str] = []
self.deleted_paths: t.List[str] = []
self.regular_command_targets: t.Dict[str, t.List[str]] = {}
self.focused_command_targets: t.Dict[str, t.List[str]] = {}
self.no_integration_paths: t.List[str] = []
@property
def targets(self) -> t.Optional[t.List[str]]:

@ -69,8 +69,8 @@ def create_payload(args: CommonConfig, dst_path: str) -> None:
collection_layouts = data_context().create_collection_layouts()
content_files = [] # type: t.List[t.Tuple[str, str]]
extra_files = [] # type: t.List[t.Tuple[str, str]]
content_files: t.List[t.Tuple[str, str]] = []
extra_files: t.List[t.Tuple[str, str]] = []
for layout in collection_layouts:
if layout == data_context().content:

@ -17,10 +17,10 @@ def get_path_provider_classes(provider_type: t.Type[TPathProvider]) -> t.List[t.
def find_path_provider(provider_type, # type: t.Type[TPathProvider]
provider_classes, # type: t.List[t.Type[TPathProvider]]
path, # type: str
walk, # type: bool
): # type: (...) -> TPathProvider
provider_classes: t.List[t.Type[TPathProvider]],
path: str,
walk: bool,
) -> TPathProvider:
"""Return the first found path provider of the given type for the given path."""
sequences = sorted(set(pc.sequence for pc in provider_classes if pc.sequence > 0))

@ -18,9 +18,9 @@ from .. import (
class Layout:
"""Description of content locations and helper methods to access content."""
def __init__(self,
root, # type: str
paths, # type: t.List[str]
): # type: (...) -> None
root: str,
paths: t.List[str],
) -> None:
self.root = root
self.__paths = paths # contains both file paths and symlinked directory paths (ending with os.path.sep)
@ -75,24 +75,24 @@ class Layout:
class ContentLayout(Layout):
"""Information about the current Ansible content being tested."""
def __init__(self,
root, # type: str
paths, # type: t.List[str]
plugin_paths, # type: t.Dict[str, str]
collection, # type: t.Optional[CollectionDetail]
test_path, # type: str
results_path, # type: str
sanity_path, # type: str
sanity_messages, # type: t.Optional[LayoutMessages]
integration_path, # type: str
integration_targets_path, # type: str
integration_vars_path, # type: str
integration_messages, # type: t.Optional[LayoutMessages]
unit_path, # type: str
unit_module_path, # type: str
unit_module_utils_path, # type: str
unit_messages, # type: t.Optional[LayoutMessages]
unsupported=False, # type: bool
): # type: (...) -> None
root: str,
paths: t.List[str],
plugin_paths: t.Dict[str, str],
collection: t.Optional[CollectionDetail],
test_path: str,
results_path: str,
sanity_path: str,
sanity_messages: t.Optional[LayoutMessages],
integration_path: str,
integration_targets_path: str,
integration_vars_path: str,
integration_messages: t.Optional[LayoutMessages],
unit_path: str,
unit_module_path: str,
unit_module_utils_path: str,
unit_messages: t.Optional[LayoutMessages],
unsupported: bool = False,
) -> None:
super().__init__(root, paths)
self.plugin_paths = plugin_paths
@ -151,18 +151,18 @@ class ContentLayout(Layout):
class LayoutMessages:
"""Messages generated during layout creation that should be deferred for later display."""
def __init__(self):
self.info = [] # type: t.List[str]
self.warning = [] # type: t.List[str]
self.error = [] # type: t.List[str]
self.info: t.List[str] = []
self.warning: t.List[str] = []
self.error: t.List[str] = []
class CollectionDetail:
"""Details about the layout of the current collection."""
def __init__(self,
name, # type: str
namespace, # type: str
root, # type: str
): # type: (...) -> None
name: str,
namespace: str,
root: str,
) -> None:
self.name = name
self.namespace = namespace
self.root = root
@ -206,7 +206,7 @@ class LayoutProvider(PathProvider):
def paths_to_tree(paths: t.List[str]) -> t.Tuple[t.Dict[str, t.Any], t.List[str]]:
"""Return a filesystem tree from the given list of paths."""
tree = {}, [] # type: t.Tuple[t.Dict[str, t.Any], t.List[str]]
tree: t.Tuple[t.Dict[str, t.Any], t.List[str]] = {}, []
for path in paths:
parts = path.split(os.path.sep)

@ -70,7 +70,7 @@ class HostState:
def deserialize(args: EnvironmentConfig, path: str) -> HostState:
"""Deserialize host state from the given args and path."""
with open_binary_file(path) as state_file:
host_state = pickle.load(state_file) # type: HostState
host_state: HostState = pickle.load(state_file)
host_state.controller_profile.args = args
@ -95,11 +95,11 @@ class HostState:
def prepare_profiles(
args, # type: TEnvironmentConfig
targets_use_pypi=False, # type: bool
skip_setup=False, # type: bool
requirements=None, # type: t.Optional[t.Callable[[TEnvironmentConfig, HostState], None]]
): # type: (...) -> HostState
args: TEnvironmentConfig,
targets_use_pypi: bool = False,
skip_setup: bool = False,
requirements: t.Optional[t.Callable[[TEnvironmentConfig, HostState], None]] = None,
) -> HostState:
"""
Create new profiles, or load existing ones, and return them.
If a requirements callback was provided, it will be used before configuring hosts if delegation has already been performed.

@ -122,15 +122,15 @@ class PipBootstrap(PipCommand):
def install_requirements(
args, # type: EnvironmentConfig
python, # type: PythonConfig
ansible=False, # type: bool
command=False, # type: bool
coverage=False, # type: bool
virtualenv=False, # type: bool
controller=True, # type: bool
connection=None, # type: t.Optional[Connection]
): # type: (...) -> None
args: EnvironmentConfig,
python: PythonConfig,
ansible: bool = False,
command: bool = False,
coverage: bool = False,
virtualenv: bool = False,
controller: bool = True,
connection: t.Optional[Connection] = None,
) -> None:
"""Install requirements for the given Python using the specified arguments."""
create_result_directories(args)
@ -197,18 +197,18 @@ def collect_bootstrap(python: PythonConfig) -> t.List[PipCommand]:
def collect_requirements(
python, # type: PythonConfig
controller, # type: bool
ansible, # type: bool
cryptography, # type: bool
coverage, # type: bool
virtualenv, # type: bool
minimize, # type: bool
command, # type: t.Optional[str]
sanity, # type: t.Optional[str]
): # type: (...) -> t.List[PipCommand]
python: PythonConfig,
controller: bool,
ansible: bool,
cryptography: bool,
coverage: bool,
virtualenv: bool,
minimize: bool,
command: t.Optional[str],
sanity: t.Optional[str],
) -> t.List[PipCommand]:
"""Collect requirements for the given Python using the specified arguments."""
commands = [] # type: t.List[PipCommand]
commands: t.List[PipCommand] = []
if virtualenv:
# sanity tests on Python 2.x install virtualenv when it is too old or is not already installed and the `--requirements` option is given
@ -252,11 +252,11 @@ def collect_requirements(
def run_pip(
args, # type: EnvironmentConfig
python, # type: PythonConfig
commands, # type: t.List[PipCommand]
connection, # type: t.Optional[Connection]
): # type: (...) -> None
args: EnvironmentConfig,
python: PythonConfig,
commands: t.List[PipCommand],
connection: t.Optional[Connection],
) -> None:
"""Run the specified pip commands for the given Python, and optionally the specified host."""
connection = connection or LocalConnection(args)
script = prepare_pip_script(commands)
@ -280,12 +280,12 @@ def run_pip(
def collect_general_install(
command=None, # type: t.Optional[str]
ansible=False, # type: bool
): # type: (...) -> t.List[PipInstall]
command: t.Optional[str] = None,
ansible: bool = False,
) -> t.List[PipInstall]:
"""Return details necessary for the specified general-purpose pip install(s)."""
requirements_paths = [] # type: t.List[t.Tuple[str, str]]
constraints_paths = [] # type: t.List[t.Tuple[str, str]]
requirements_paths: t.List[t.Tuple[str, str]] = []
constraints_paths: t.List[t.Tuple[str, str]] = []
if ansible:
path = os.path.join(ANSIBLE_TEST_DATA_ROOT, 'requirements', 'ansible.txt')
@ -305,8 +305,8 @@ def collect_package_install(packages: t.List[str], constraints: bool = True) ->
def collect_sanity_install(sanity: str) -> t.List[PipInstall]:
"""Return the details necessary for the specified sanity pip install(s)."""
requirements_paths = [] # type: t.List[t.Tuple[str, str]]
constraints_paths = [] # type: t.List[t.Tuple[str, str]]
requirements_paths: t.List[t.Tuple[str, str]] = []
constraints_paths: t.List[t.Tuple[str, str]] = []
path = os.path.join(ANSIBLE_TEST_DATA_ROOT, 'requirements', f'sanity.{sanity}.txt')
requirements_paths.append((ANSIBLE_TEST_DATA_ROOT, path))
@ -320,8 +320,8 @@ def collect_sanity_install(sanity: str) -> t.List[PipInstall]:
def collect_units_install() -> t.List[PipInstall]:
"""Return details necessary for the specified units pip install(s)."""
requirements_paths = [] # type: t.List[t.Tuple[str, str]]
constraints_paths = [] # type: t.List[t.Tuple[str, str]]
requirements_paths: t.List[t.Tuple[str, str]] = []
constraints_paths: t.List[t.Tuple[str, str]] = []
path = os.path.join(data_context().content.unit_path, 'requirements.txt')
requirements_paths.append((data_context().content.root, path))
@ -334,8 +334,8 @@ def collect_units_install() -> t.List[PipInstall]:
def collect_integration_install(command: str, controller: bool) -> t.List[PipInstall]:
"""Return details necessary for the specified integration pip install(s)."""
requirements_paths = [] # type: t.List[t.Tuple[str, str]]
constraints_paths = [] # type: t.List[t.Tuple[str, str]]
requirements_paths: t.List[t.Tuple[str, str]] = []
constraints_paths: t.List[t.Tuple[str, str]] = []
# Support for prefixed files was added to ansible-test in ansible-core 2.12 when split controller/target testing was implemented.
# Previous versions of ansible-test only recognize non-prefixed files.
@ -367,10 +367,10 @@ def collect_integration_install(command: str, controller: bool) -> t.List[PipIns
def collect_install(
requirements_paths, # type: t.List[t.Tuple[str, str]]
constraints_paths, # type: t.List[t.Tuple[str, str]]
packages=None, # type: t.Optional[t.List[str]]
constraints=True, # type: bool
requirements_paths: t.List[t.Tuple[str, str]],
constraints_paths: t.List[t.Tuple[str, str]],
packages: t.Optional[t.List[str]] = None,
constraints: bool = True,
) -> t.List[PipInstall]:
"""Build a pip install list from the given requirements, constraints and packages."""
# listing content constraints first gives them priority over constraints provided by ansible-test

@ -47,9 +47,9 @@ class SshProcess:
"""Wrapper around an SSH process."""
def __init__(self, process: t.Optional[subprocess.Popen]) -> None:
self._process = process
self.pending_forwards = None # type: t.Optional[t.List[t.Tuple[str, int]]]
self.pending_forwards: t.Optional[t.List[t.Tuple[str, int]]] = None
self.forwards = {} # type: t.Dict[t.Tuple[str, int], int]
self.forwards: t.Dict[t.Tuple[str, int], int] = {}
def terminate(self) -> None:
"""Terminate the SSH process."""
@ -71,7 +71,7 @@ class SshProcess:
def collect_port_forwards(self): # type: (SshProcess) -> t.Dict[t.Tuple[str, int], int]
"""Collect port assignments for dynamic SSH port forwards."""
errors = [] # type: t.List[str]
errors: t.List[str] = []
display.info('Collecting %d SSH port forward(s).' % len(self.pending_forwards), verbosity=2)
@ -120,11 +120,11 @@ class SshProcess:
def create_ssh_command(
ssh, # type: SshConnectionDetail
options=None, # type: t.Optional[t.Dict[str, t.Union[str, int]]]
cli_args=None, # type: t.List[str]
command=None, # type: t.Optional[str]
): # type: (...) -> t.List[str]
ssh: SshConnectionDetail,
options: t.Optional[t.Dict[str, t.Union[str, int]]] = None,
cli_args: t.List[str] = None,
command: t.Optional[str] = None,
) -> t.List[str]:
"""Create an SSH command using the specified options."""
cmd = [
'ssh',
@ -166,12 +166,12 @@ def create_ssh_command(
def run_ssh_command(
args, # type: EnvironmentConfig
ssh, # type: SshConnectionDetail
options=None, # type: t.Optional[t.Dict[str, t.Union[str, int]]]
cli_args=None, # type: t.List[str]
command=None, # type: t.Optional[str]
): # type: (...) -> SshProcess
args: EnvironmentConfig,
ssh: SshConnectionDetail,
options: t.Optional[t.Dict[str, t.Union[str, int]]] = None,
cli_args: t.List[str] = None,
command: t.Optional[str] = None,
) -> SshProcess:
"""Run the specified SSH command, returning the created SshProcess instance created."""
cmd = create_ssh_command(ssh, options, cli_args, command)
env = common_environment()
@ -192,10 +192,10 @@ def run_ssh_command(
def create_ssh_port_forwards(
args, # type: EnvironmentConfig
ssh, # type: SshConnectionDetail
forwards, # type: t.List[t.Tuple[str, int]]
): # type: (...) -> SshProcess
args: EnvironmentConfig,
ssh: SshConnectionDetail,
forwards: t.List[t.Tuple[str, int]],
) -> SshProcess:
"""
Create SSH port forwards using the provided list of tuples (target_host, target_port).
Port bindings will be automatically assigned by SSH and must be collected with a subsequent call to collect_port_forwards.
@ -216,12 +216,12 @@ def create_ssh_port_forwards(
def create_ssh_port_redirects(
args, # type: EnvironmentConfig
ssh, # type: SshConnectionDetail
redirects, # type: t.List[t.Tuple[int, str, int]]
): # type: (...) -> SshProcess
args: EnvironmentConfig,
ssh: SshConnectionDetail,
redirects: t.List[t.Tuple[int, str, int]],
) -> SshProcess:
"""Create SSH port redirections using the provided list of tuples (bind_port, target_host, target_port)."""
options = {} # type: t.Dict[str, t.Union[str, int]]
options: t.Dict[str, t.Union[str, int]] = {}
cli_args = []
for bind_port, target_host, target_port in redirects:

@ -64,11 +64,11 @@ def walk_completion_targets(targets: t.Iterable[CompletionTarget], prefix: str,
def walk_internal_targets(
targets, # type: t.Iterable[TCompletionTarget]
includes=None, # type: t.Optional[t.List[str]]
excludes=None, # type: t.Optional[t.List[str]]
requires=None, # type: t.Optional[t.List[str]]
): # type: (...) -> t.Tuple[TCompletionTarget, ...]
targets: t.Iterable[TCompletionTarget],
includes: t.Optional[t.List[str]] = None,
excludes: t.Optional[t.List[str]] = None,
requires: t.Optional[t.List[str]] = None,
) -> t.Tuple[TCompletionTarget, ...]:
"""Return a tuple of matching completion targets."""
targets = tuple(targets)
@ -86,11 +86,11 @@ def walk_internal_targets(
def filter_targets(targets, # type: t.Iterable[TCompletionTarget]
patterns, # type: t.List[str]
include=True, # type: bool
directories=True, # type: bool
errors=True, # type: bool
): # type: (...) -> t.Iterable[TCompletionTarget]
patterns: t.List[str],
include: bool = True,
directories: bool = True,
errors: bool = True,
) -> t.Iterable[TCompletionTarget]:
"""Iterate over the given targets and filter them based on the supplied arguments."""
unmatched = set(patterns or ())
compiled_patterns = dict((p, re.compile('^%s$' % p)) for p in patterns) if patterns else None
@ -263,14 +263,14 @@ def load_integration_prefixes():
def walk_test_targets(
path=None, # type: t.Optional[str]
module_path=None, # type: t.Optional[str]
extensions=None, # type: t.Optional[t.Tuple[str, ...]]
prefix=None, # type: t.Optional[str]
extra_dirs=None, # type: t.Optional[t.Tuple[str, ...]]
include_symlinks=False, # type: bool
include_symlinked_directories=False, # type: bool
): # type: (...) -> t.Iterable[TestTarget]
path: t.Optional[str] = None,
module_path: t.Optional[str] = None,
extensions: t.Optional[t.Tuple[str, ...]] = None,
prefix: t.Optional[str] = None,
extra_dirs: t.Optional[t.Tuple[str, ...]] = None,
include_symlinks: bool = False,
include_symlinked_directories: bool = False,
) -> t.Iterable[TestTarget]:
"""Iterate over available test targets."""
if path:
file_paths = data_context().content.walk_files(path, include_symlinked_directories=include_symlinked_directories)
@ -454,11 +454,11 @@ class TestTarget(CompletionTarget):
"""Generic test target."""
def __init__(
self,
path, # type: str
module_path, # type: t.Optional[str]
module_prefix, # type: t.Optional[str]
base_path, # type: str
symlink=None, # type: t.Optional[bool]
path: str,
module_path: t.Optional[str],
module_prefix: t.Optional[str],
base_path: str,
symlink: t.Optional[bool] = None,
):
super().__init__()
@ -502,7 +502,7 @@ class IntegrationTargetType(enum.Enum):
def extract_plugin_references(name: str, aliases: t.List[str]) -> t.List[t.Tuple[str, str]]:
"""Return a list of plugin references found in the given integration test target name and aliases."""
plugins = content_plugins()
found = [] # type: t.List[t.Tuple[str, str]]
found: t.List[t.Tuple[str, str]] = []
for alias in [name] + aliases:
plugin_type = 'modules'

@ -192,7 +192,7 @@ class TestSkipped(TestResult):
def __init__(self, command: str, test: str, python_version: t.Optional[str] = None) -> None:
super().__init__(command, test, python_version)
self.reason = None # type: t.Optional[str]
self.reason: t.Optional[str] = None
def write_console(self) -> None:
"""Write results to console."""
@ -216,11 +216,11 @@ class TestFailure(TestResult):
"""Test failure."""
def __init__(
self,
command, # type: str
test, # type: str
python_version=None, # type: t.Optional[str]
messages=None, # type: t.Optional[t.Sequence[TestMessage]]
summary=None, # type: t.Optional[str]
command: str,
test: str,
python_version: t.Optional[str] = None,
messages: t.Optional[t.Sequence[TestMessage]] = None,
summary: t.Optional[str] = None,
):
super().__init__(command, test, python_version)
@ -393,13 +393,13 @@ class TestMessage:
"""Single test message for one file."""
def __init__(
self,
message, # type: str
path, # type: str
line=0, # type: int
column=0, # type: int
level='error', # type: str
code=None, # type: t.Optional[str]
confidence=None, # type: t.Optional[int]
message: str,
path: str,
line: int = 0,
column: int = 0,
level: str = 'error',
code: t.Optional[str] = None,
confidence: t.Optional[int] = None,
):
self.__path = path
self.__line = line

@ -15,7 +15,7 @@ class WrappedThread(threading.Thread):
"""Wrapper around Thread which captures results and exceptions."""
def __init__(self, action): # type: (t.Callable[[], t.Any]) -> None
super().__init__()
self._result = queue.Queue() # type: queue.Queue[t.Any]
self._result: queue.Queue[t.Any] = queue.Queue()
self.action = action
self.result = None

@ -62,7 +62,7 @@ TBase = t.TypeVar('TBase')
TKey = t.TypeVar('TKey')
TValue = t.TypeVar('TValue')
PYTHON_PATHS = {} # type: t.Dict[str, str]
PYTHON_PATHS: t.Dict[str, str] = {}
COVERAGE_CONFIG_NAME = 'coveragerc'
@ -143,7 +143,7 @@ def is_valid_identifier(value: str) -> bool:
def cache(func): # type: (t.Callable[[], TValue]) -> t.Callable[[], TValue]
"""Enforce exclusive access on a decorated function and cache the result."""
storage = {} # type: t.Dict[None, TValue]
storage: t.Dict[None, TValue] = {}
sentinel = object()
@functools.wraps(func)
@ -345,20 +345,20 @@ def get_available_python_versions() -> t.Dict[str, str]:
def raw_command(
cmd, # type: t.Iterable[str]
capture, # type: bool
env=None, # type: t.Optional[t.Dict[str, str]]
data=None, # type: t.Optional[str]
cwd=None, # type: t.Optional[str]
explain=False, # type: bool
stdin=None, # type: t.Optional[t.Union[t.IO[bytes], int]]
stdout=None, # type: t.Optional[t.Union[t.IO[bytes], int]]
interactive=False, # type: bool
output_stream=None, # type: t.Optional[OutputStream]
cmd_verbosity=1, # type: int
str_errors='strict', # type: str
error_callback=None, # type: t.Optional[t.Callable[[SubprocessError], None]]
): # type: (...) -> t.Tuple[t.Optional[str], t.Optional[str]]
cmd: t.Iterable[str],
capture: bool,
env: t.Optional[t.Dict[str, str]] = None,
data: t.Optional[str] = None,
cwd: t.Optional[str] = None,
explain: bool = False,
stdin: t.Optional[t.Union[t.IO[bytes], int]] = None,
stdout: t.Optional[t.Union[t.IO[bytes], int]] = None,
interactive: bool = False,
output_stream: t.Optional[OutputStream] = None,
cmd_verbosity: int = 1,
str_errors: str = 'strict',
error_callback: t.Optional[t.Callable[[SubprocessError], None]] = None,
) -> t.Tuple[t.Optional[str], t.Optional[str]]:
"""Run the specified command and return stdout and stderr as a tuple."""
output_stream = output_stream or OutputStream.AUTO
@ -577,7 +577,7 @@ class ReaderThread(WrappedThread, metaclass=abc.ABCMeta):
self.handle = handle
self.buffer = buffer
self.lines = [] # type: t.List[bytes]
self.lines: t.List[bytes] = []
@abc.abstractmethod
def _run(self) -> None:
@ -849,11 +849,11 @@ class Display:
def print_message( # pylint: disable=locally-disabled, invalid-name
self,
message, # type: str
color=None, # type: t.Optional[str]
stderr=False, # type: bool
truncate=False, # type: bool
): # type: (...) -> None
message: str,
color: t.Optional[str] = None,
stderr: bool = False,
truncate: bool = False,
) -> None:
"""Display a message."""
if self.redact and self.sensitive:
for item in self.sensitive:
@ -895,13 +895,13 @@ class SubprocessError(ApplicationError):
"""Error resulting from failed subprocess execution."""
def __init__(
self,
cmd, # type: t.List[str]
status=0, # type: int
stdout=None, # type: t.Optional[str]
stderr=None, # type: t.Optional[str]
runtime=None, # type: t.Optional[float]
error_callback=None, # type: t.Optional[t.Callable[[SubprocessError], None]]
): # type: (...) -> None
cmd: t.List[str],
status: int = 0,
stdout: t.Optional[str] = None,
stderr: t.Optional[str] = None,
runtime: t.Optional[float] = None,
error_callback: t.Optional[t.Callable[[SubprocessError], None]] = None,
) -> None:
message = 'Command "%s" returned exit status %s.\n' % (shlex.join(cmd), status)
if stderr:
@ -970,8 +970,8 @@ def parse_to_list_of_dict(pattern: str, value: str) -> t.List[t.Dict[str, str]]:
def get_subclasses(class_type: t.Type[C]) -> t.List[t.Type[C]]:
"""Returns a list of types that are concrete subclasses of the given type."""
subclasses = set() # type: t.Set[t.Type[C]]
queue = [class_type] # type: t.List[t.Type[C]]
subclasses: t.Set[t.Type[C]] = set()
queue: t.List[t.Type[C]] = [class_type]
while queue:
parent = queue.pop()
@ -1049,7 +1049,7 @@ def load_plugins(base_type, database): # type: (t.Type[C], t.Dict[str, t.Type[C
Load plugins of the specified type and track them in the specified database.
Only plugins which have already been imported will be loaded.
"""
plugins = dict((sc.__module__.rsplit('.', 1)[1], sc) for sc in get_subclasses(base_type)) # type: t.Dict[str, t.Type[C]]
plugins: t.Dict[str, t.Type[C]] = dict((sc.__module__.rsplit('.', 1)[1], sc) for sc in get_subclasses(base_type))
for plugin in plugins:
database[plugin] = plugins[plugin]

@ -58,7 +58,7 @@ from .host_configs import (
VirtualPythonConfig,
)
CHECK_YAML_VERSIONS = {} # type: t.Dict[str, t.Any]
CHECK_YAML_VERSIONS: t.Dict[str, t.Any] = {}
class ShellScriptTemplate:
@ -84,13 +84,13 @@ class ShellScriptTemplate:
class ResultType:
"""Test result type."""
BOT = None # type: ResultType
COVERAGE = None # type: ResultType
DATA = None # type: ResultType
JUNIT = None # type: ResultType
LOGS = None # type: ResultType
REPORTS = None # type: ResultType
TMP = None # type: ResultType
BOT: ResultType = None
COVERAGE: ResultType = None
DATA: ResultType = None
JUNIT: ResultType = None
LOGS: ResultType = None
REPORTS: ResultType = None
TMP: ResultType = None
@staticmethod
def _populate():
@ -129,20 +129,20 @@ class CommonConfig:
self.command = command
self.interactive = False
self.check_layout = True
self.success = None # type: t.Optional[bool]
self.success: t.Optional[bool] = None
self.color = args.color # type: bool
self.explain = args.explain # type: bool
self.verbosity = args.verbosity # type: int
self.debug = args.debug # type: bool
self.truncate = args.truncate # type: int
self.redact = args.redact # type: bool
self.color: bool = args.color
self.explain: bool = args.explain
self.verbosity: int = args.verbosity
self.debug: bool = args.debug
self.truncate: int = args.truncate
self.redact: bool = args.redact
self.display_stderr = False # type: bool
self.display_stderr: bool = False
self.session_name = generate_name()
self.cache = {} # type: t.Dict[str, t.Any]
self.cache: t.Dict[str, t.Any] = {}
def get_ansible_config(self) -> str:
"""Return the path to the Ansible config for the given config."""
@ -210,11 +210,11 @@ def named_temporary_file(args: CommonConfig, prefix: str, suffix: str, directory
def write_json_test_results(category, # type: ResultType
name, # type: str
content, # type: t.Union[t.List[t.Any], t.Dict[str, t.Any]]
formatted=True, # type: bool
encoder=None, # type: t.Optional[t.Type[json.JSONEncoder]]
): # type: (...) -> None
name: str,
content: t.Union[t.List[t.Any], t.Dict[str, t.Any]],
formatted: bool = True,
encoder: t.Optional[t.Type[json.JSONEncoder]] = None,
) -> None:
"""Write the given json content to the specified test results path, creating directories as needed."""
path = os.path.join(category.path, name)
write_json_file(path, content, create_directories=True, formatted=formatted, encoder=encoder)
@ -368,15 +368,15 @@ def cleanup_python_paths():
def intercept_python(
args, # type: CommonConfig
python, # type: PythonConfig
cmd, # type: t.List[str]
env, # type: t.Dict[str, str]
capture, # type: bool
data=None, # type: t.Optional[str]
cwd=None, # type: t.Optional[str]
always=False, # type: bool
): # type: (...) -> t.Tuple[t.Optional[str], t.Optional[str]]
args: CommonConfig,
python: PythonConfig,
cmd: t.List[str],
env: t.Dict[str, str],
capture: bool,
data: t.Optional[str] = None,
cwd: t.Optional[str] = None,
always: bool = False,
) -> t.Tuple[t.Optional[str], t.Optional[str]]:
"""
Run a command while intercepting invocations of Python to control the version used.
If the specified Python is an ansible-test managed virtual environment, it will be added to PATH to activate it.
@ -400,21 +400,21 @@ def intercept_python(
def run_command(
args, # type: CommonConfig
cmd, # type: t.Iterable[str]
capture, # type: bool
env=None, # type: t.Optional[t.Dict[str, str]]
data=None, # type: t.Optional[str]
cwd=None, # type: t.Optional[str]
always=False, # type: bool
stdin=None, # type: t.Optional[t.IO[bytes]]
stdout=None, # type: t.Optional[t.IO[bytes]]
interactive=False, # type: bool
output_stream=None, # type: t.Optional[OutputStream]
cmd_verbosity=1, # type: int
str_errors='strict', # type: str
error_callback=None, # type: t.Optional[t.Callable[[SubprocessError], None]]
): # type: (...) -> t.Tuple[t.Optional[str], t.Optional[str]]
args: CommonConfig,
cmd: t.Iterable[str],
capture: bool,
env: t.Optional[t.Dict[str, str]] = None,
data: t.Optional[str] = None,
cwd: t.Optional[str] = None,
always: bool = False,
stdin: t.Optional[t.IO[bytes]] = None,
stdout: t.Optional[t.IO[bytes]] = None,
interactive: bool = False,
output_stream: t.Optional[OutputStream] = None,
cmd_verbosity: int = 1,
str_errors: str = 'strict',
error_callback: t.Optional[t.Callable[[SubprocessError], None]] = None,
) -> t.Tuple[t.Optional[str], t.Optional[str]]:
"""Run the specified command and return stdout and stderr as a tuple."""
explain = args.explain and not always
return raw_command(cmd, capture=capture, env=env, data=data, cwd=cwd, explain=explain, stdin=stdin, stdout=stdout, interactive=interactive,

@ -40,9 +40,9 @@ from .python_requirements import (
def get_virtual_python(
args, # type: EnvironmentConfig
python, # type: VirtualPythonConfig
): # type: (...) -> VirtualPythonConfig
args: EnvironmentConfig,
python: VirtualPythonConfig,
) -> VirtualPythonConfig:
"""Create a virtual environment for the given Python and return the path to its root."""
if python.system_site_packages:
suffix = '-ssp'
@ -78,11 +78,11 @@ def get_virtual_python(
def create_virtual_environment(args, # type: EnvironmentConfig
python, # type: PythonConfig
path, # type: str
system_site_packages=False, # type: bool
pip=False, # type: bool
): # type: (...) -> bool
python: PythonConfig,
path: str,
system_site_packages: bool = False,
pip: bool = False,
) -> bool:
"""Create a virtual environment using venv or virtualenv for the requested Python version."""
if not os.path.exists(python.path):
# the requested python version could not be found
@ -180,11 +180,11 @@ def get_python_real_prefix(python_path: str) -> t.Optional[str]:
def run_venv(args, # type: EnvironmentConfig
run_python, # type: str
system_site_packages, # type: bool
pip, # type: bool
path, # type: str
): # type: (...) -> bool
run_python: str,
system_site_packages: bool,
pip: bool,
path: str,
) -> bool:
"""Create a virtual environment using the 'venv' module. Not available on Python 2.x."""
cmd = [run_python, '-m', 'venv']
@ -210,12 +210,12 @@ def run_venv(args, # type: EnvironmentConfig
def run_virtualenv(args, # type: EnvironmentConfig
run_python, # type: str
env_python, # type: str
system_site_packages, # type: bool
pip, # type: bool
path, # type: str
): # type: (...) -> bool
run_python: str,
env_python: str,
system_site_packages: bool,
pip: bool,
path: str,
) -> bool:
"""Create a virtual environment using the 'virtualenv' module."""
# always specify which interpreter to use to guarantee the desired interpreter is provided
# otherwise virtualenv may select a different interpreter than the one running virtualenv

Loading…
Cancel
Save