ansible-test - Code cleanup and refactoring. (#77169)

* Remove unnecessary PyCharm ignores.
* Ignore intentional undefined attribute usage.
* Add missing type hints. Fix existing type hints.
* Fix docstrings and comments.
* Use function to register completion handler.
* Pass strings to display functions.
* Fix CompositeAction handling of dest argument.
* Use consistent types in expressions/assignments.
* Use custom function to keep linters happy.
* Add missing raise for custom exception.
* Clean up key/value type handling in cloud plugins.
* Use dataclass instead of dict for results.
* Add custom type_guard function to check lists.
* Ignore return type that can't be checked (yet).
* Avoid changing types on local variables.
pull/62824/head
Matt Clay 3 years ago committed by GitHub
parent 8291dbdf81
commit a06fa496d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -78,7 +78,6 @@ def main(cli_args=None): # type: (t.Optional[t.List[str]]) -> None
delegate_args = (ex.host_state, ex.exclude, ex.require)
if delegate_args:
# noinspection PyTypeChecker
delegate(config, *delegate_args)
if target_names:

@ -198,7 +198,7 @@ def get_ansible_python_path(args): # type: (CommonConfig) -> str
If a temporary directory is required, it will be cached for the lifetime of the process and cleaned up at exit.
"""
try:
return get_ansible_python_path.python_path
return get_ansible_python_path.python_path # type: ignore[attr-defined]
except AttributeError:
pass
@ -216,7 +216,7 @@ def get_ansible_python_path(args): # type: (CommonConfig) -> str
if not args.explain:
generate_egg_info(python_path)
get_ansible_python_path.python_path = python_path
get_ansible_python_path.python_path = python_path # type: ignore[attr-defined]
return python_path

@ -35,8 +35,8 @@ class Bootstrap:
"""The bootstrap type to pass to the bootstrapping script."""
return self.__class__.__name__.replace('Bootstrap', '').lower()
def get_variables(self): # type: () -> t.Dict[str, str]
"""The variables to template in the boostrapping script."""
def get_variables(self): # type: () -> t.Dict[str, t.Union[str, t.List[str]]]
"""The variables to template in the bootstrapping script."""
return dict(
bootstrap_type=self.bootstrap_type,
controller='yes' if self.controller else '',
@ -65,8 +65,8 @@ class Bootstrap:
@dataclasses.dataclass
class BootstrapDocker(Bootstrap):
"""Bootstrap docker instances."""
def get_variables(self): # type: () -> t.Dict[str, str]
"""The variables to template in the boostrapping script."""
def get_variables(self): # type: () -> t.Dict[str, t.Union[str, t.List[str]]]
"""The variables to template in the bootstrapping script."""
variables = super().get_variables()
variables.update(
@ -83,8 +83,8 @@ class BootstrapRemote(Bootstrap):
platform: str
platform_version: str
def get_variables(self): # type: () -> t.Dict[str, str]
"""The variables to template in the boostrapping script."""
def get_variables(self): # type: () -> t.Dict[str, t.Union[str, t.List[str]]]
"""The variables to template in the bootstrapping script."""
variables = super().get_variables()
variables.update(

@ -114,7 +114,7 @@ class AuthHelper(metaclass=abc.ABCMeta):
def initialize_private_key(self): # type: () -> str
"""
Initialize and publish a new key pair (if needed) and return the private key.
The private key is cached across ansible-test invocations so it is only generated and published once per CI job.
The private key is cached across ansible-test invocations, so it is only generated and published once per CI job.
"""
path = os.path.expanduser('~/.ansible-core-ci-private.key')
@ -166,14 +166,12 @@ class CryptographyAuthHelper(AuthHelper, metaclass=abc.ABCMeta):
private_key = ec.generate_private_key(ec.SECP384R1(), default_backend())
public_key = private_key.public_key()
# noinspection PyUnresolvedReferences
private_key_pem = to_text(private_key.private_bytes(
private_key_pem = to_text(private_key.private_bytes( # type: ignore[attr-defined] # documented method, but missing from type stubs
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
))
# noinspection PyTypeChecker
public_key_pem = to_text(public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,

@ -15,6 +15,7 @@ from ..target import (
walk_sanity_targets,
load_integration_prefixes,
analyze_integration_target_dependencies,
IntegrationTarget,
)
from ..util import (
@ -63,14 +64,14 @@ def categorize_changes(args, paths, verbose_command=None): # type: (TestConfig,
'integration': set(),
'windows-integration': set(),
'network-integration': set(),
}
} # type: t.Dict[str, t.Set[str]]
focused_commands = collections.defaultdict(set)
deleted_paths = set()
original_paths = set()
additional_paths = set()
no_integration_paths = set()
deleted_paths = set() # type: t.Set[str]
original_paths = set() # type: t.Set[str]
additional_paths = set() # type: t.Set[str]
no_integration_paths = set() # type: t.Set[str]
for path in paths:
if not os.path.exists(path):
@ -110,7 +111,7 @@ def categorize_changes(args, paths, verbose_command=None): # type: (TestConfig,
tests = all_tests(args) # not categorized, run all tests
display.warning('Path not categorized: %s' % path)
else:
focused_target = tests.pop(FOCUSED_TARGET, False) and path in original_paths
focused_target = bool(tests.pop(FOCUSED_TARGET, None)) and path in original_paths
tests = dict((key, value) for key, value in tests.items() if value)
@ -155,18 +156,18 @@ def categorize_changes(args, paths, verbose_command=None): # type: (TestConfig,
if any(target == 'all' for target in targets):
commands[command] = {'all'}
commands = dict((c, sorted(targets)) for c, targets in commands.items() if targets)
sorted_commands = dict((c, sorted(targets)) for c, targets in commands.items() if targets)
focused_commands = dict((c, sorted(targets)) for c, targets in focused_commands.items())
for command, targets in commands.items():
for command, targets in sorted_commands.items():
if targets == ['all']:
commands[command] = [] # changes require testing all targets, do not filter targets
sorted_commands[command] = [] # changes require testing all targets, do not filter targets
changes = ChangeDescription()
changes.command = verbose_command
changes.changed_paths = sorted(original_paths)
changes.deleted_paths = sorted(deleted_paths)
changes.regular_command_targets = commands
changes.regular_command_targets = sorted_commands
changes.focused_command_targets = focused_commands
changes.no_integration_paths = sorted(no_integration_paths)
@ -205,11 +206,11 @@ class PathMapper:
self.prefixes = load_integration_prefixes()
self.integration_dependencies = analyze_integration_target_dependencies(self.integration_targets)
self.python_module_utils_imports = {} # populated on first use to reduce overhead when not needed
self.powershell_module_utils_imports = {} # populated on first use to reduce overhead when not needed
self.csharp_module_utils_imports = {} # populated on first use to reduce overhead when not needed
self.python_module_utils_imports = {} # type: t.Dict[str, t.Set[str]] # populated on first use to reduce overhead when not needed
self.powershell_module_utils_imports = {} # type: t.Dict[str, t.Set[str]] # populated on first use to reduce overhead when not needed
self.csharp_module_utils_imports = {} # type: t.Dict[str, t.Set[str]] # populated on first use to reduce overhead when not needed
self.paths_to_dependent_targets = {}
self.paths_to_dependent_targets = {} # type: t.Dict[str, t.Set[IntegrationTarget]]
for target in self.integration_targets:
for path in target.needs_file:
@ -341,7 +342,7 @@ class PathMapper:
filename = os.path.basename(path)
name, ext = os.path.splitext(filename)
minimal = {}
minimal = {} # type: t.Dict[str, str]
if os.path.sep not in path:
if filename in (
@ -372,7 +373,7 @@ class PathMapper:
'integration': target.name if 'posix/' in target.aliases else None,
'windows-integration': target.name if 'windows/' in target.aliases else None,
'network-integration': target.name if 'network/' in target.aliases else None,
FOCUSED_TARGET: True,
FOCUSED_TARGET: target.name,
}
if is_subdir(path, data_context().content.integration_path):
@ -430,7 +431,7 @@ class PathMapper:
'integration': self.posix_integration_by_module.get(module_name) if ext == '.py' else None,
'windows-integration': self.windows_integration_by_module.get(module_name) if ext in ['.cs', '.ps1'] else None,
'network-integration': self.network_integration_by_module.get(module_name),
FOCUSED_TARGET: True,
FOCUSED_TARGET: module_name,
}
return minimal
@ -582,7 +583,7 @@ class PathMapper:
'windows-integration': target.name if target and 'windows/' in target.aliases else None,
'network-integration': target.name if target and 'network/' in target.aliases else None,
'units': units_path,
FOCUSED_TARGET: target is not None,
FOCUSED_TARGET: target.name if target else None,
}
if is_subdir(path, data_context().content.plugin_paths['filter']):
@ -630,7 +631,7 @@ class PathMapper:
filename = os.path.basename(path)
dummy, ext = os.path.splitext(filename)
minimal = {}
minimal = {} # type: t.Dict[str, str]
if path.startswith('changelogs/'):
return minimal
@ -674,7 +675,7 @@ class PathMapper:
filename = os.path.basename(path)
name, ext = os.path.splitext(filename)
minimal = {}
minimal = {} # type: t.Dict[str, str]
if path.startswith('bin/'):
return all_tests(self.args) # broad impact, run all tests
@ -721,7 +722,6 @@ class PathMapper:
if path.startswith('test/lib/ansible_test/config/'):
if name.startswith('cloud-config-'):
# noinspection PyTypeChecker
cloud_target = 'cloud/%s/' % name.split('-')[2].split('.')[0]
if cloud_target in self.integration_targets_by_alias:

@ -236,7 +236,7 @@ class ModuleUtilFinder(ast.NodeVisitor):
def __init__(self, path, module_utils): # type: (str, t.Set[str]) -> None
self.path = path
self.module_utils = module_utils
self.imports = set()
self.imports = set() # type: t.Set[str]
# implicitly import parent package
@ -276,7 +276,6 @@ class ModuleUtilFinder(ast.NodeVisitor):
# While that will usually be true, there are exceptions which will result in this resolution being incorrect.
self.module = path_to_module(os.path.join(data_context().content.collection.directory, self.path))
# noinspection PyPep8Naming
# pylint: disable=locally-disabled, invalid-name
def visit_Import(self, node): # type: (ast.Import) -> None
"""Visit an import node."""
@ -286,7 +285,6 @@ class ModuleUtilFinder(ast.NodeVisitor):
# import ansible_collections.{ns}.{col}.plugins.module_utils.module_utils.MODULE[.MODULE]
self.add_imports([alias.name for alias in node.names], node.lineno)
# noinspection PyPep8Naming
# pylint: disable=locally-disabled, invalid-name
def visit_ImportFrom(self, node): # type: (ast.ImportFrom) -> None
"""Visit an import from node."""

@ -37,7 +37,7 @@ class RegisteredCompletionFinder(OptionCompletionFinder):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.registered_completions = None # type: t.Optional[str]
self.registered_completions = None # type: t.Optional[t.List[str]]
def completer(
self,
@ -88,20 +88,18 @@ class CompositeAction(argparse.Action, metaclass=abc.ABCMeta):
"""Base class for actions that parse composite arguments."""
documentation_state = {} # type: t.Dict[t.Type[CompositeAction], DocumentationState]
# noinspection PyUnusedLocal
def __init__(
self,
*args,
dest, # type: str
**kwargs,
):
del dest
self.definition = self.create_parser()
self.documentation_state[type(self)] = documentation_state = DocumentationState()
self.definition.document(documentation_state)
super().__init__(*args, dest=self.definition.dest, **kwargs)
kwargs.update(dest=self.definition.dest)
super().__init__(*args, **kwargs)
register_safe_action(type(self))
@ -139,10 +137,12 @@ class CompositeActionCompletionFinder(RegisteredCompletionFinder):
def get_completions(
self,
prefix, # type: str
action, # type: CompositeAction
action, # type: argparse.Action
parsed_args, # type: argparse.Namespace
): # type: (...) -> t.List[str]
"""Return a list of completions appropriate for the given prefix and action, taking into account the arguments that have already been parsed."""
assert isinstance(action, CompositeAction)
state = ParserState(
mode=ParserMode.LIST if self.list_mode else ParserMode.COMPLETE,
remainder=prefix,
@ -238,6 +238,8 @@ def complete(
"""Perform argument completion using the given completer and return the completion result."""
value = state.remainder
answer: Completion
try:
completer.parse(state)
raise ParserError('completion expected')

@ -7,8 +7,8 @@ import typing as t
class EnumAction(argparse.Action):
"""Parse an enum using the lowercases enum names."""
def __init__(self, **kwargs): # type: (t.Dict[str, t.Any]) -> None
"""Parse an enum using the lowercase enum names."""
def __init__(self, **kwargs: t.Any) -> None:
self.enum_type = kwargs.pop('type', None) # type: t.Type[enum.Enum]
kwargs.setdefault('choices', tuple(e.name.lower() for e in self.enum_type))
super().__init__(**kwargs)

@ -173,7 +173,7 @@ class ParserState:
self.namespaces.append(namespace)
@contextlib.contextmanager
def delimit(self, delimiters, required=True): # type: (str, bool) -> t.ContextManager[ParserBoundary]
def delimit(self, delimiters, required=True): # type: (str, bool) -> t.Iterator[ParserBoundary]
"""Context manager for delimiting parsing of input."""
boundary = ParserBoundary(delimiters=delimiters, required=required)
@ -394,7 +394,7 @@ class FileParser(Parser):
else:
path = ''
with state.delimit(PATH_DELIMITER, required=False) as boundary:
with state.delimit(PATH_DELIMITER, required=False) as boundary: # type: ParserBoundary
while boundary.ready:
directory = path or '.'
@ -420,7 +420,7 @@ class AbsolutePathParser(Parser):
"""Parse the input from the given state and return the result."""
path = ''
with state.delimit(PATH_DELIMITER, required=False) as boundary:
with state.delimit(PATH_DELIMITER, required=False) as boundary: # type: ParserBoundary
while boundary.ready:
if path:
path += AnyParser(nothing=True).parse(state)
@ -506,7 +506,7 @@ class KeyValueParser(Parser, metaclass=abc.ABCMeta):
parsers = self.get_parsers(state)
keys = list(parsers)
with state.delimit(PAIR_DELIMITER, required=False) as pair:
with state.delimit(PAIR_DELIMITER, required=False) as pair: # type: ParserBoundary
while pair.ready:
with state.delimit(ASSIGNMENT_DELIMITER):
key = ChoicesParser(keys).parse(state)
@ -528,7 +528,7 @@ class PairParser(Parser, metaclass=abc.ABCMeta):
state.set_namespace(namespace)
with state.delimit(self.delimiter, self.required) as boundary:
with state.delimit(self.delimiter, self.required) as boundary: # type: ParserBoundary
choice = self.get_left_parser(state).parse(state)
if boundary.match:

@ -11,6 +11,7 @@ from ...util import (
from ..completers import (
complete_target,
register_completer,
)
from ..environments import (
@ -110,33 +111,33 @@ def do_commands(
testing = test.add_argument_group(title='common testing arguments')
testing.add_argument(
register_completer(testing.add_argument(
'include',
metavar='TARGET',
nargs='*',
help='test the specified target',
).completer = functools.partial(complete_target, completer)
), functools.partial(complete_target, completer))
testing.add_argument(
register_completer(testing.add_argument(
'--include',
metavar='TARGET',
action='append',
help='include the specified target',
).completer = functools.partial(complete_target, completer)
), functools.partial(complete_target, completer))
testing.add_argument(
register_completer(testing.add_argument(
'--exclude',
metavar='TARGET',
action='append',
help='exclude the specified target',
).completer = functools.partial(complete_target, completer)
), functools.partial(complete_target, completer))
testing.add_argument(
register_completer(testing.add_argument(
'--require',
metavar='TARGET',
action='append',
help='require the specified target',
).completer = functools.partial(complete_target, completer)
), functools.partial(complete_target, completer))
testing.add_argument(
'--coverage',

@ -5,6 +5,7 @@ import argparse
from ...completers import (
complete_target,
register_completer,
)
from ...environments import (
@ -43,12 +44,12 @@ def do_integration(
def add_integration_common(
parser, # type: argparse.ArgumentParser
):
"""Add common integration argumetns."""
parser.add_argument(
"""Add common integration arguments."""
register_completer(parser.add_argument(
'--start-at',
metavar='TARGET',
help='start at the specified target',
).completer = complete_target
), complete_target)
parser.add_argument(
'--start-at-task',

@ -28,6 +28,10 @@ from ...environments import (
add_environments,
)
from ...completers import (
register_completer,
)
def do_network_integration(
subparsers,
@ -51,11 +55,11 @@ def do_network_integration(
add_integration_common(network_integration)
network_integration.add_argument(
register_completer(network_integration.add_argument(
'--testcase',
metavar='TESTCASE',
help='limit a test to a specified testcase',
).completer = complete_network_testcase
), complete_network_testcase)
add_environments(parser, completer, ControllerMode.DELEGATED, TargetMode.NETWORK_INTEGRATION) # network-integration

@ -55,7 +55,7 @@ from ..data import (
)
def filter_python(version, versions): # type: (t.Optional[str], t.Optional[t.List[str]]) -> t.Optional[str]
def filter_python(version, versions): # type: (t.Optional[str], t.Optional[t.Sequence[str]]) -> t.Optional[str]
"""If a Python version is given and is in the given version list, return that Python version, otherwise return None."""
return version if version in versions else None
@ -237,8 +237,8 @@ def convert_legacy_args(
args.targets = targets
if used_default_pythons:
targets = t.cast(t.List[ControllerConfig], targets)
skipped_python_versions = sorted_versions(list(set(SUPPORTED_PYTHON_VERSIONS) - {target.python.version for target in targets}))
control_targets = t.cast(t.List[ControllerConfig], targets)
skipped_python_versions = sorted_versions(list(set(SUPPORTED_PYTHON_VERSIONS) - {target.python.version for target in control_targets}))
else:
skipped_python_versions = []
@ -260,10 +260,12 @@ def controller_targets(
mode, # type: TargetMode
options, # type: LegacyHostOptions
controller, # type: ControllerHostConfig
): # type: (...) -> t.List[ControllerConfig]
): # type: (...) -> t.List[HostConfig]
"""Return the configuration for controller targets."""
python = native_python(options)
targets: t.List[HostConfig]
if python:
targets = [ControllerConfig(python=python)]
else:
@ -283,7 +285,7 @@ def native_python(options): # type: (LegacyHostOptions) -> t.Optional[NativePyt
def get_legacy_host_config(
mode, # type: TargetMode
options, # type: LegacyHostOptions
): # type: (...) -> t.Tuple[HostConfig, t.List[HostConfig], t.Optional[FallbackDetail]]
): # type: (...) -> t.Tuple[ControllerHostConfig, t.List[HostConfig], t.Optional[FallbackDetail]]
"""
Returns controller and target host configs derived from the provided legacy host options.
The goal is to match the original behavior, by using non-split testing whenever possible.
@ -296,6 +298,9 @@ def get_legacy_host_config(
controller_fallback = None # type: t.Optional[t.Tuple[str, str, FallbackReason]]
controller: t.Optional[ControllerHostConfig]
targets: t.List[HostConfig]
if options.venv:
if controller_python(options.python) or not options.python:
controller = OriginConfig(python=VirtualPythonConfig(version=options.python or 'default', system_site_packages=options.venv_system_site_packages))
@ -304,14 +309,21 @@ def get_legacy_host_config(
controller = OriginConfig(python=VirtualPythonConfig(version='default', system_site_packages=options.venv_system_site_packages))
if mode in (TargetMode.SANITY, TargetMode.UNITS):
targets = controller_targets(mode, options, controller)
python = native_python(options)
if python:
control_targets = [ControllerConfig(python=python)]
else:
control_targets = controller.get_default_targets(HostContext(controller_config=controller))
# Target sanity tests either have no Python requirements or manage their own virtual environments.
# Thus there is no point in setting up virtual environments ahead of time for them.
# Thus, there is no point in setting up virtual environments ahead of time for them.
if mode == TargetMode.UNITS:
targets = [ControllerConfig(python=VirtualPythonConfig(version=target.python.version, path=target.python.path,
system_site_packages=options.venv_system_site_packages)) for target in targets]
system_site_packages=options.venv_system_site_packages)) for target in control_targets]
else:
targets = t.cast(t.List[HostConfig], control_targets)
else:
targets = [ControllerConfig(python=VirtualPythonConfig(version=options.python or 'default',
system_site_packages=options.venv_system_site_packages))]
@ -448,17 +460,19 @@ def handle_non_posix_targets(
targets = [WindowsInventoryConfig(path=options.inventory)]
elif mode == TargetMode.NETWORK_INTEGRATION:
if options.platform:
targets = [NetworkRemoteConfig(name=platform, provider=options.remote_provider) for platform in options.platform]
network_targets = [NetworkRemoteConfig(name=platform, provider=options.remote_provider) for platform in options.platform]
for platform, collection in options.platform_collection or []:
for entry in targets:
for entry in network_targets:
if entry.platform == platform:
entry.collection = collection
for platform, connection in options.platform_connection or []:
for entry in targets:
for entry in network_targets:
if entry.platform == platform:
entry.connection = connection
targets = t.cast(t.List[HostConfig], network_targets)
else:
targets = [NetworkInventoryConfig(path=options.inventory)]
@ -470,12 +484,14 @@ def default_targets(
controller, # type: ControllerHostConfig
): # type: (...) -> t.List[HostConfig]
"""Return a list of default targets for the given target mode."""
targets: t.List[HostConfig]
if mode == TargetMode.WINDOWS_INTEGRATION:
targets = [WindowsInventoryConfig(path=os.path.abspath(os.path.join(data_context().content.integration_path, 'inventory.winrm')))]
elif mode == TargetMode.NETWORK_INTEGRATION:
targets = [NetworkInventoryConfig(path=os.path.abspath(os.path.join(data_context().content.integration_path, 'inventory.networking')))]
elif mode.multiple_pythons:
targets = controller.get_default_targets(HostContext(controller_config=controller))
targets = t.cast(t.List[HostConfig], controller.get_default_targets(HostContext(controller_config=controller)))
else:
targets = [ControllerConfig()]

@ -24,3 +24,8 @@ def complete_choices(choices: t.List[str], prefix: str, **_) -> t.List[str]:
"""Perform completion using the provided choices."""
matches = [choice for choice in choices if choice.startswith(prefix)]
return matches
def register_completer(action: argparse.Action, completer) -> None:
    """Attach *completer* to *action* so the completion machinery can find it later."""
    # The completion framework reads a `completer` attribute that argparse.Action does not
    # declare; assigning via setattr makes the intentional dynamic attribute explicit.
    setattr(action, 'completer', completer)

@ -53,6 +53,7 @@ from ..config import (
from .completers import (
complete_choices,
register_completer,
)
from .converters import (
@ -175,40 +176,40 @@ def add_composite_environment_options(
if controller_mode == ControllerMode.NO_DELEGATION:
composite_parser.set_defaults(controller=None)
else:
composite_parser.add_argument(
register_completer(composite_parser.add_argument(
'--controller',
metavar='OPT',
action=register_action_type(DelegatedControllerAction if controller_mode == ControllerMode.DELEGATED else OriginControllerAction),
help='configuration for the controller',
).completer = completer.completer
), completer.completer)
if target_mode == TargetMode.NO_TARGETS:
composite_parser.set_defaults(targets=[])
elif target_mode == TargetMode.SHELL:
group = composite_parser.add_mutually_exclusive_group()
group.add_argument(
register_completer(group.add_argument(
'--target-posix',
metavar='OPT',
action=register_action_type(PosixSshTargetAction),
help='configuration for the target',
).completer = completer.completer
), completer.completer)
suppress = None if get_ci_provider().supports_core_ci_auth() else argparse.SUPPRESS
group.add_argument(
register_completer(group.add_argument(
'--target-windows',
metavar='OPT',
action=WindowsSshTargetAction if suppress else register_action_type(WindowsSshTargetAction),
help=suppress or 'configuration for the target',
).completer = completer.completer
), completer.completer)
group.add_argument(
register_completer(group.add_argument(
'--target-network',
metavar='OPT',
action=NetworkSshTargetAction if suppress else register_action_type(NetworkSshTargetAction),
help=suppress or 'configuration for the target',
).completer = completer.completer
), completer.completer)
else:
if target_mode.multiple_pythons:
target_option = '--target-python'
@ -230,12 +231,12 @@ def add_composite_environment_options(
target_action = target_actions[target_mode]
composite_parser.add_argument(
register_completer(composite_parser.add_argument(
target_option,
metavar='OPT',
action=register_action_type(target_action),
help=target_help,
).completer = completer.completer
), completer.completer)
return action_types
@ -246,9 +247,8 @@ def add_legacy_environment_options(
target_mode, # type: TargetMode
):
"""Add legacy options for controlling the test environment."""
# noinspection PyTypeChecker
environment = parser.add_argument_group(
title='environment arguments (mutually exclusive with "composite environment arguments" below)') # type: argparse.ArgumentParser
environment: argparse.ArgumentParser = parser.add_argument_group( # type: ignore[assignment] # real type private
title='environment arguments (mutually exclusive with "composite environment arguments" below)')
add_environments_python(environment, target_mode)
add_environments_host(environment, controller_mode, target_mode)
@ -259,6 +259,8 @@ def add_environments_python(
target_mode, # type: TargetMode
): # type: (...) -> None
"""Add environment arguments to control the Python version(s) used."""
python_versions: t.Tuple[str, ...]
if target_mode.has_python:
python_versions = SUPPORTED_PYTHON_VERSIONS
else:
@ -284,8 +286,7 @@ def add_environments_host(
target_mode # type: TargetMode
): # type: (...) -> None
"""Add environment arguments for the given host and argument modes."""
# noinspection PyTypeChecker
environments_exclusive_group = environments_parser.add_mutually_exclusive_group() # type: argparse.ArgumentParser
environments_exclusive_group: argparse.ArgumentParser = environments_parser.add_mutually_exclusive_group() # type: ignore[assignment] # real type private
add_environment_local(environments_exclusive_group)
add_environment_venv(environments_exclusive_group, environments_parser)
@ -305,28 +306,28 @@ def add_environment_network(
environments_parser, # type: argparse.ArgumentParser
): # type: (...) -> None
"""Add environment arguments for running on a windows host."""
environments_parser.add_argument(
register_completer(environments_parser.add_argument(
'--platform',
metavar='PLATFORM',
action='append',
help='network platform/version',
).completer = complete_network_platform
), complete_network_platform)
environments_parser.add_argument(
register_completer(environments_parser.add_argument(
'--platform-collection',
type=key_value_type,
metavar='PLATFORM=COLLECTION',
action='append',
help='collection used to test platform',
).completer = complete_network_platform_collection
), complete_network_platform_collection)
environments_parser.add_argument(
register_completer(environments_parser.add_argument(
'--platform-connection',
type=key_value_type,
metavar='PLATFORM=CONNECTION',
action='append',
help='connection used to test platform',
).completer = complete_network_platform_connection
), complete_network_platform_connection)
environments_parser.add_argument(
'--inventory',
@ -339,12 +340,12 @@ def add_environment_windows(
environments_parser, # type: argparse.ArgumentParser
): # type: (...) -> None
"""Add environment arguments for running on a windows host."""
environments_parser.add_argument(
register_completer(environments_parser.add_argument(
'--windows',
metavar='VERSION',
action='append',
help='windows version',
).completer = complete_windows
), complete_windows)
environments_parser.add_argument(
'--inventory',
@ -435,13 +436,13 @@ def add_environment_docker(
else:
docker_images = sorted(filter_completion(docker_completion(), controller_only=True))
exclusive_parser.add_argument(
register_completer(exclusive_parser.add_argument(
'--docker',
metavar='IMAGE',
nargs='?',
const='default',
help='run from a docker container',
).completer = functools.partial(complete_choices, docker_images)
), functools.partial(complete_choices, docker_images))
environments_parser.add_argument(
'--docker-privileged',
@ -480,12 +481,12 @@ def add_global_remote(
suppress = None if get_ci_provider().supports_core_ci_auth() else argparse.SUPPRESS
parser.add_argument(
register_completer(parser.add_argument(
'--remote-stage',
metavar='STAGE',
default='prod',
help=suppress or 'remote stage to use: prod, dev',
).completer = complete_remote_stage
), complete_remote_stage)
parser.add_argument(
'--remote-endpoint',
@ -518,11 +519,11 @@ def add_environment_remote(
suppress = None if get_ci_provider().supports_core_ci_auth() else argparse.SUPPRESS
exclusive_parser.add_argument(
register_completer(exclusive_parser.add_argument(
'--remote',
metavar='NAME',
help=suppress or 'run from a remote instance',
).completer = functools.partial(complete_choices, remote_platforms)
), functools.partial(complete_choices, remote_platforms))
environments_parser.add_argument(
'--remote-provider',

@ -142,7 +142,7 @@ class WindowsTargetParser(TargetsNamespaceParser, TypeParser):
def get_internal_parsers(self, targets): # type: (t.List[WindowsConfig]) -> t.Dict[str, Parser]
"""Return a dictionary of type names and type parsers."""
parsers = {}
parsers = {} # type: t.Dict[str, Parser]
if self.allow_inventory and not targets:
parsers.update(
@ -184,7 +184,7 @@ class NetworkTargetParser(TargetsNamespaceParser, TypeParser):
def get_internal_parsers(self, targets): # type: (t.List[NetworkConfig]) -> t.Dict[str, Parser]
"""Return a dictionary of type names and type parsers."""
parsers = {}
parsers = {} # type: t.Dict[str, Parser]
if self.allow_inventory and not targets:
parsers.update(

@ -27,7 +27,7 @@ def get_docker_pythons(name, controller, strict): # type: (str, bool, bool) ->
available_pythons = CONTROLLER_PYTHON_VERSIONS if controller else SUPPORTED_PYTHON_VERSIONS
if not image_config:
return [] if strict else available_pythons
return [] if strict else list(available_pythons)
supported_pythons = [python for python in image_config.supported_pythons if python in available_pythons]
@ -40,7 +40,7 @@ def get_remote_pythons(name, controller, strict): # type: (str, bool, bool) ->
available_pythons = CONTROLLER_PYTHON_VERSIONS if controller else SUPPORTED_PYTHON_VERSIONS
if not platform_config:
return [] if strict else available_pythons
return [] if strict else list(available_pythons)
supported_pythons = [python for python in platform_config.supported_pythons if python in available_pythons]
@ -54,6 +54,6 @@ def get_controller_pythons(controller_config, strict): # type: (HostConfig, boo
elif isinstance(controller_config, PosixRemoteConfig):
pythons = get_remote_pythons(controller_config.name, False, strict)
else:
pythons = SUPPORTED_PYTHON_VERSIONS
pythons = list(SUPPORTED_PYTHON_VERSIONS)
return pythons

@ -5,6 +5,7 @@ import typing as t
from ...host_configs import (
NativePythonConfig,
PythonConfig,
VirtualPythonConfig,
)
@ -18,6 +19,7 @@ from ..argparsing.parsers import (
Parser,
ParserError,
ParserState,
ParserBoundary,
)
@ -58,7 +60,7 @@ class PythonParser(Parser):
The origin host and unknown environments assume all relevant Python versions are available.
"""
def __init__(self,
versions, # type: t.List[str]
versions, # type: t.Sequence[str]
*,
allow_default, # type: bool
allow_venv, # type: bool
@ -85,9 +87,13 @@ class PythonParser(Parser):
def parse(self, state): # type: (ParserState) -> t.Any
"""Parse the input from the given state and return the result."""
boundary: ParserBoundary
with state.delimit('@/', required=False) as boundary:
version = ChoicesParser(self.first_choices).parse(state)
python: PythonConfig
if version == 'venv':
with state.delimit('@/', required=False) as boundary:
version = ChoicesParser(self.venv_choices).parse(state)
@ -156,7 +162,7 @@ class SshConnectionParser(Parser):
setattr(namespace, 'user', user)
with state.delimit(':', required=False) as colon:
with state.delimit(':', required=False) as colon: # type: ParserBoundary
host = AnyParser(no_match_message=f'Expected {{host}} from: {self.EXPECTED_FORMAT}').parse(state)
setattr(namespace, 'host', host)

@ -298,7 +298,7 @@ class PathChecker:
def __init__(self, args, collection_search_re=None): # type: (CoverageConfig, t.Optional[t.Pattern]) -> None
self.args = args
self.collection_search_re = collection_search_re
self.invalid_paths = []
self.invalid_paths = [] # type: t.List[str]
self.invalid_path_chars = 0
def check_path(self, path): # type: (str) -> bool

@ -38,7 +38,7 @@ class CoverageAnalyzeTargetsConfig(CoverageAnalyzeConfig):
def make_report(target_indexes, arcs, lines): # type: (TargetIndexes, Arcs, Lines) -> t.Dict[str, t.Any]
"""Condense target indexes, arcs and lines into a compact report."""
set_indexes = {}
set_indexes = {} # type: TargetSetIndexes
arc_refs = dict((path, dict((format_arc(arc), get_target_set_index(indexes, set_indexes)) for arc, indexes in data.items())) for path, data in arcs.items())
line_refs = dict((path, dict((line, get_target_set_index(indexes, set_indexes)) for line, indexes in data.items())) for path, data in lines.items())
@ -95,6 +95,11 @@ def write_report(args, report, path): # type: (CoverageAnalyzeTargetsConfig, t.
), verbosity=1)
def format_line(value): # type: (int) -> str
    """Format line as a string."""
    # A dedicated wrapper function (instead of passing str directly) keeps both pylint and mypy happy.
    return '{0}'.format(value)
def format_arc(value): # type: (t.Tuple[int, int]) -> str
    """Format an arc tuple as a string."""
    start, end = value
    return '%d:%d' % (start, end)

@ -19,6 +19,7 @@ from . import (
CoverageAnalyzeTargetsConfig,
expand_indexes,
format_arc,
format_line,
read_report,
)
@ -43,7 +44,7 @@ def command_coverage_analyze_targets_expand(args): # type: (CoverageAnalyzeTarg
report = dict(
arcs=expand_indexes(covered_path_arcs, covered_targets, format_arc),
lines=expand_indexes(covered_path_lines, covered_targets, str),
lines=expand_indexes(covered_path_lines, covered_targets, format_line),
)
if not args.explain:

@ -68,7 +68,7 @@ def command_coverage_analyze_targets_generate(args): # type: (CoverageAnalyzeTa
raise Delegate(host_state)
root = data_context().content.root
target_indexes = {}
target_indexes = {} # type: TargetIndexes
arcs = dict((os.path.relpath(path, root), data) for path, data in analyze_python_coverage(args, host_state, args.input_dir, target_indexes).items())
lines = dict((os.path.relpath(path, root), data) for path, data in analyze_powershell_coverage(args, args.input_dir, target_indexes).items())
report = make_report(target_indexes, arcs, lines)
@ -139,7 +139,7 @@ def analyze_powershell_coverage(
def prune_invalid_filenames(
args, # type: CoverageAnalyzeTargetsGenerateConfig
results, # type: t.Dict[str, t.Any]
collection_search_re=None, # type: t.Optional[str]
collection_search_re=None, # type: t.Optional[t.Pattern]
): # type: (...) -> None
"""Remove invalid filenames from the given result set."""
path_checker = PathChecker(args, collection_search_re)

@ -53,7 +53,7 @@ def command_coverage_analyze_targets_missing(args): # type: (CoverageAnalyzeTar
from_targets, from_path_arcs, from_path_lines = read_report(args.from_file)
to_targets, to_path_arcs, to_path_lines = read_report(args.to_file)
target_indexes = {}
target_indexes = {} # type: TargetIndexes
if args.only_gaps:
arcs = find_gaps(from_path_arcs, from_targets, to_path_arcs, target_indexes, args.only_exists)
@ -74,7 +74,7 @@ def find_gaps(
only_exists, # type: bool
): # type: (...) -> IndexedPoints
"""Find gaps in coverage between the from and to data sets."""
target_data = {}
target_data = {} # type: IndexedPoints
for from_path, from_points in from_data.items():
if only_exists and not os.path.isfile(to_bytes(from_path)):
@ -100,7 +100,7 @@ def find_missing(
only_exists, # type: bool
): # type: (...) -> IndexedPoints
"""Find coverage in from_data not present in to_data (arcs or lines)."""
target_data = {}
target_data = {} # type: IndexedPoints
for from_path, from_points in from_data.items():
if only_exists and not os.path.isfile(to_bytes(from_path)):

@ -315,7 +315,6 @@ def get_coverage_group(args, coverage_file): # type: (CoverageCombineConfig, st
"""Return the name of the coverage group for the specified coverage file, or None if no group was found."""
parts = os.path.basename(coverage_file).split('=', 4)
# noinspection PyTypeChecker
if len(parts) != 5 or not parts[4].startswith('coverage.'):
return None

@ -76,7 +76,7 @@ def _generate_powershell_xml(coverage_file): # type: (str) -> Element
content_root = data_context().content.root
is_ansible = data_context().content.is_ansible
packages = {}
packages = {} # type: t.Dict[str, t.Dict[str, t.Dict[str, int]]]
for path, results in coverage_info.items():
filename = os.path.splitext(os.path.basename(path))[0]

@ -166,7 +166,7 @@ def show_dict(data, verbose, root_verbosity=0, path=None): # type: (t.Dict[str,
display.info(indent + '%s: %s' % (key, value), verbosity=verbosity)
def get_docker_details(args): # type: (EnvConfig) -> t.Dict[str, str]
def get_docker_details(args): # type: (EnvConfig) -> t.Dict[str, t.Any]
"""Return details about docker."""
docker = get_docker_command()

@ -133,7 +133,7 @@ def generate_dependency_map(integration_targets): # type: (t.List[IntegrationTa
"""Analyze the given list of integration test targets and return a dictionary expressing target names and the targets on which they depend."""
targets_dict = dict((target.name, target) for target in integration_targets)
target_dependencies = analyze_integration_target_dependencies(integration_targets)
dependency_map = {}
dependency_map = {} # type: t.Dict[str, t.Set[IntegrationTarget]]
invalid_targets = set()
@ -158,7 +158,7 @@ def generate_dependency_map(integration_targets): # type: (t.List[IntegrationTa
def get_files_needed(target_dependencies): # type: (t.List[IntegrationTarget]) -> t.List[str]
"""Return a list of files needed by the given list of target dependencies."""
files_needed = []
files_needed = [] # type: t.List[str]
for target_dependency in target_dependencies:
files_needed += target_dependency.needs_file
@ -228,7 +228,7 @@ def integration_test_environment(
args, # type: IntegrationConfig
target, # type: IntegrationTarget
inventory_path_src, # type: str
): # type: (...) -> t.ContextManager[IntegrationEnvironment]
): # type: (...) -> t.Iterator[IntegrationEnvironment]
"""Context manager that prepares the integration test environment and cleans it up."""
ansible_config_src = args.get_ansible_config()
ansible_config_relative = os.path.join(data_context().content.integration_path, '%s.cfg' % args.command)
@ -311,8 +311,7 @@ def integration_test_environment(
display.info('Copying %s/ to %s/' % (dir_src, dir_dst), verbosity=2)
if not args.explain:
# noinspection PyTypeChecker
shutil.copytree(to_bytes(dir_src), to_bytes(dir_dst), symlinks=True)
shutil.copytree(to_bytes(dir_src), to_bytes(dir_dst), symlinks=True) # type: ignore[arg-type] # incorrect type stub omits bytes path support
for file_src, file_dst in file_copies:
display.info('Copying %s to %s' % (file_src, file_dst), verbosity=2)
@ -332,7 +331,7 @@ def integration_test_config_file(
args, # type: IntegrationConfig
env_config, # type: CloudEnvironmentConfig
integration_dir, # type: str
): # type: (...) -> t.ContextManager[t.Optional[str]]
): # type: (...) -> t.Iterator[t.Optional[str]]
"""Context manager that provides a config file for integration tests, if needed."""
if not env_config:
yield None
@ -349,7 +348,7 @@ def integration_test_config_file(
config_file = json.dumps(config_vars, indent=4, sort_keys=True)
with named_temporary_file(args, 'config-file-', '.json', integration_dir, config_file) as path:
with named_temporary_file(args, 'config-file-', '.json', integration_dir, config_file) as path: # type: str
filename = os.path.relpath(path, integration_dir)
display.info('>>> Config File: %s\n%s' % (filename, config_file), verbosity=3)
@ -386,8 +385,8 @@ def create_inventory(
def command_integration_filtered(
args, # type: IntegrationConfig
host_state, # type: HostState
targets, # type: t.Tuple[IntegrationTarget]
all_targets, # type: t.Tuple[IntegrationTarget]
targets, # type: t.Tuple[IntegrationTarget, ...]
all_targets, # type: t.Tuple[IntegrationTarget, ...]
inventory_path, # type: str
pre_target=None, # type: t.Optional[t.Callable[[IntegrationTarget], None]]
post_target=None, # type: t.Optional[t.Callable[[IntegrationTarget], None]]
@ -401,7 +400,7 @@ def command_integration_filtered(
all_targets_dict = dict((target.name, target) for target in all_targets)
setup_errors = []
setup_targets_executed = set()
setup_targets_executed = set() # type: t.Set[str]
for target in all_targets:
for setup_target in target.setup_once + target.setup_always:
@ -526,7 +525,7 @@ def command_integration_filtered(
failed.append(target)
if args.continue_on_error:
display.error(ex)
display.error(str(ex))
continue
display.notice('To resume at this test target, use the option: --start-at %s' % target.name)
@ -585,7 +584,7 @@ def command_integration_script(
module_defaults=env_config.module_defaults,
), indent=4, sort_keys=True), verbosity=3)
with integration_test_environment(args, target, inventory_path) as test_env:
with integration_test_environment(args, target, inventory_path) as test_env: # type: IntegrationEnvironment
cmd = ['./%s' % os.path.basename(target.script_path)]
if args.verbosity:
@ -602,7 +601,7 @@ def command_integration_script(
if env_config and env_config.env_vars:
env.update(env_config.env_vars)
with integration_test_config_file(args, env_config, test_env.integration_dir) as config_path:
with integration_test_config_file(args, env_config, test_env.integration_dir) as config_path: # type: t.Optional[str]
if config_path:
cmd += ['-e', '@%s' % config_path]
@ -661,7 +660,7 @@ def command_integration_role(
module_defaults=env_config.module_defaults,
), indent=4, sort_keys=True), verbosity=3)
with integration_test_environment(args, target, inventory_path) as test_env:
with integration_test_environment(args, target, inventory_path) as test_env: # type: IntegrationEnvironment
if os.path.exists(test_env.vars_file):
vars_files.append(os.path.relpath(test_env.vars_file, test_env.integration_dir))
@ -733,7 +732,7 @@ def run_setup_targets(
args, # type: IntegrationConfig
host_state, # type: HostState
test_dir, # type: str
target_names, # type: t.List[str]
target_names, # type: t.Sequence[str]
targets_dict, # type: t.Dict[str, IntegrationTarget]
targets_executed, # type: t.Set[str]
inventory_path, # type: str

@ -59,8 +59,8 @@ def get_cloud_plugins(): # type: () -> t.Tuple[t.Dict[str, t.Type[CloudProvider
"""Import cloud plugins and load them into the plugin dictionaries."""
import_plugins('commands/integration/cloud')
providers = {}
environments = {}
providers = {} # type: t.Dict[str, t.Type[CloudProvider]]
environments = {} # type: t.Dict[str, t.Type[CloudEnvironment]]
load_plugins(CloudProvider, providers)
load_plugins(CloudEnvironment, environments)
@ -134,7 +134,7 @@ def cloud_filter(args, targets): # type: (IntegrationConfig, t.Tuple[Integratio
if args.metadata.cloud_config is not None:
return [] # cloud filter already performed prior to delegation
exclude = []
exclude = [] # type: t.List[str]
for provider in get_cloud_providers(args, targets):
provider.filter(targets, exclude)
@ -206,7 +206,7 @@ class CloudBase(metaclass=abc.ABCMeta):
@property
def setup_executed(self): # type: () -> bool
"""True if setup has been executed, otherwise False."""
return self._get_cloud_config(self._SETUP_EXECUTED, False)
return t.cast(bool, self._get_cloud_config(self._SETUP_EXECUTED, False))
@setup_executed.setter
def setup_executed(self, value): # type: (bool) -> None
@ -216,7 +216,7 @@ class CloudBase(metaclass=abc.ABCMeta):
@property
def config_path(self): # type: () -> str
"""Path to the configuration file."""
return os.path.join(data_context().content.root, self._get_cloud_config(self._CONFIG_PATH))
return os.path.join(data_context().content.root, str(self._get_cloud_config(self._CONFIG_PATH)))
@config_path.setter
def config_path(self, value): # type: (str) -> None
@ -226,7 +226,7 @@ class CloudBase(metaclass=abc.ABCMeta):
@property
def resource_prefix(self): # type: () -> str
"""Resource prefix."""
return self._get_cloud_config(self._RESOURCE_PREFIX)
return str(self._get_cloud_config(self._RESOURCE_PREFIX))
@resource_prefix.setter
def resource_prefix(self, value): # type: (str) -> None
@ -236,7 +236,7 @@ class CloudBase(metaclass=abc.ABCMeta):
@property
def managed(self): # type: () -> bool
"""True if resources are managed by ansible-test, otherwise False."""
return self._get_cloud_config(self._MANAGED)
return t.cast(bool, self._get_cloud_config(self._MANAGED))
@managed.setter
def managed(self, value): # type: (bool) -> None

@ -104,9 +104,8 @@ class AwsCloudEnvironment(CloudEnvironment):
ansible_vars = dict(
resource_prefix=self.resource_prefix,
tiny_prefix=uuid.uuid4().hex[0:12]
)
) # type: t.Dict[str, t.Any]
# noinspection PyTypeChecker
ansible_vars.update(dict(parser.items('default')))
display.sensitive.add(ansible_vars.get('aws_secret_key'))

@ -33,7 +33,7 @@ class AzureCloudProvider(CloudProvider):
def __init__(self, args): # type: (IntegrationConfig) -> None
super().__init__(args)
self.aci = None
self.aci = None # type: t.Optional[AnsibleCoreCI]
self.uses_config = True

@ -85,8 +85,8 @@ class ForemanEnvironment(CloudEnvironment):
def get_environment_config(self): # type: () -> CloudEnvironmentConfig
"""Return environment configuration for use in the test environment after delegation."""
env_vars = dict(
FOREMAN_HOST=self._get_cloud_config('FOREMAN_HOST'),
FOREMAN_PORT=self._get_cloud_config('FOREMAN_PORT'),
FOREMAN_HOST=str(self._get_cloud_config('FOREMAN_HOST')),
FOREMAN_PORT=str(self._get_cloud_config('FOREMAN_PORT')),
)
return CloudEnvironmentConfig(

@ -145,8 +145,8 @@ class GalaxyEnvironment(CloudEnvironment):
"""Galaxy environment plugin. Updates integration test environment after delegation."""
def get_environment_config(self): # type: () -> CloudEnvironmentConfig
"""Return environment configuration for use in the test environment after delegation."""
pulp_user = self._get_cloud_config('PULP_USER')
pulp_password = self._get_cloud_config('PULP_PASSWORD')
pulp_user = str(self._get_cloud_config('PULP_USER'))
pulp_password = str(self._get_cloud_config('PULP_PASSWORD'))
pulp_host = self._get_cloud_config('PULP_HOST')
galaxy_port = self._get_cloud_config('GALAXY_PORT')
pulp_port = self._get_cloud_config('PULP_PORT')

@ -87,6 +87,6 @@ class HttptesterEnvironment(CloudEnvironment):
return CloudEnvironmentConfig(
env_vars=dict(
HTTPTESTER='1', # backwards compatibility for tests intended to work with or without HTTP Tester
KRB5_PASSWORD=self._get_cloud_config(KRB5_PASSWORD_ENV),
KRB5_PASSWORD=str(self._get_cloud_config(KRB5_PASSWORD_ENV)),
)
)

@ -107,14 +107,14 @@ class VcenterEnvironment(CloudEnvironment):
ansible_vars.update(dict(parser.items('DEFAULT', raw=True)))
except KeyError: # govcsim
env_vars = dict(
VCENTER_HOSTNAME=self._get_cloud_config('vcenter_hostname'),
VCENTER_HOSTNAME=str(self._get_cloud_config('vcenter_hostname')),
VCENTER_USERNAME='user',
VCENTER_PASSWORD='pass',
)
ansible_vars = dict(
vcsim=self._get_cloud_config('vcenter_hostname'),
vcenter_hostname=self._get_cloud_config('vcenter_hostname'),
vcsim=str(self._get_cloud_config('vcenter_hostname')),
vcenter_hostname=str(self._get_cloud_config('vcenter_hostname')),
vcenter_username='user',
vcenter_password='pass',
)

@ -271,7 +271,7 @@ class WindowsCoverageHandler(CoverageHandler[WindowsConfig]):
@property
def is_active(self): # type: () -> bool
"""True if the handler should be used, otherwise False."""
return self.profiles and not self.args.coverage_check
return bool(self.profiles) and not self.args.coverage_check
def setup(self): # type: () -> None
"""Perform setup for code coverage."""

@ -142,7 +142,7 @@ def command_sanity(args): # type: (SanityConfig) -> None
if not targets.include:
raise AllTargetsSkipped()
tests = sanity_get_tests()
tests = list(sanity_get_tests())
if args.test:
disabled = []
@ -170,6 +170,8 @@ def command_sanity(args): # type: (SanityConfig) -> None
total = 0
failed = []
result: t.Optional[TestResult]
for test in tests:
if args.list_tests:
display.info(test.name)
@ -201,14 +203,14 @@ def command_sanity(args): # type: (SanityConfig) -> None
else:
raise Exception('Unsupported test type: %s' % type(test))
all_targets = targets.targets
all_targets = list(targets.targets)
if test.all_targets:
usable_targets = targets.targets
usable_targets = list(targets.targets)
elif test.no_targets:
usable_targets = tuple()
usable_targets = []
else:
usable_targets = targets.include
usable_targets = list(targets.include)
all_targets = SanityTargets.filter_and_inject_targets(test, all_targets)
usable_targets = SanityTargets.filter_and_inject_targets(test, usable_targets)
@ -503,12 +505,15 @@ class SanityIgnoreParser:
def load(args): # type: (SanityConfig) -> SanityIgnoreParser
"""Return the current SanityIgnore instance, initializing it if needed."""
try:
return SanityIgnoreParser.instance
return SanityIgnoreParser.instance # type: ignore[attr-defined]
except AttributeError:
pass
SanityIgnoreParser.instance = SanityIgnoreParser(args)
return SanityIgnoreParser.instance
instance = SanityIgnoreParser(args)
SanityIgnoreParser.instance = instance # type: ignore[attr-defined]
return instance
class SanityIgnoreProcessor:
@ -571,7 +576,7 @@ class SanityIgnoreProcessor:
def get_errors(self, paths): # type: (t.List[str]) -> t.List[SanityMessage]
"""Return error messages related to issues with the file."""
messages = []
messages = [] # type: t.List[SanityMessage]
# unused errors
@ -621,7 +626,7 @@ class SanityFailure(TestFailure):
self,
test, # type: str
python_version=None, # type: t.Optional[str]
messages=None, # type: t.Optional[t.List[SanityMessage]]
messages=None, # type: t.Optional[t.Sequence[SanityMessage]]
summary=None, # type: t.Optional[str]
): # type: (...) -> None
super().__init__(COMMAND, test, python_version, messages, summary)
@ -633,7 +638,7 @@ class SanityMessage(TestMessage):
class SanityTargets:
"""Sanity test target information."""
def __init__(self, targets, include): # type: (t.Tuple[TestTarget], t.Tuple[TestTarget]) -> None
def __init__(self, targets, include): # type: (t.Tuple[TestTarget, ...], t.Tuple[TestTarget, ...]) -> None
self.targets = targets
self.include = include
@ -671,11 +676,13 @@ class SanityTargets:
def get_targets(): # type: () -> t.Tuple[TestTarget, ...]
"""Return a tuple of sanity test targets. Uses a cached version when available."""
try:
return SanityTargets.get_targets.targets
return SanityTargets.get_targets.targets # type: ignore[attr-defined]
except AttributeError:
SanityTargets.get_targets.targets = tuple(sorted(walk_sanity_targets()))
targets = tuple(sorted(walk_sanity_targets()))
return SanityTargets.get_targets.targets
SanityTargets.get_targets.targets = targets # type: ignore[attr-defined]
return targets
class SanityTest(metaclass=abc.ABCMeta):
@ -695,7 +702,7 @@ class SanityTest(metaclass=abc.ABCMeta):
# Because these errors can be unpredictable they behave differently than normal error codes:
# * They are not reported by default. The `--enable-optional-errors` option must be used to display these errors.
# * They cannot be ignored. This is done to maintain the integrity of the ignore system.
self.optional_error_codes = set()
self.optional_error_codes = set() # type: t.Set[str]
@property
def error_code(self): # type: () -> t.Optional[str]
@ -954,7 +961,7 @@ class SanityCodeSmellTest(SanitySingleVersion):
elif self.output == 'path-message':
pattern = '^(?P<path>[^:]*): (?P<message>.*)$'
else:
pattern = ApplicationError('Unsupported output type: %s' % self.output)
raise ApplicationError('Unsupported output type: %s' % self.output)
if not self.no_targets:
data = '\n'.join(paths)

@ -11,6 +11,7 @@ from . import (
SanityFailure,
SanitySuccess,
SanityTargets,
SanityMessage,
)
from ...test import (
@ -77,8 +78,8 @@ class AnsibleDocTest(SanitySingleVersion):
paths = [target.path for target in targets.include]
doc_targets = collections.defaultdict(list)
target_paths = collections.defaultdict(dict)
doc_targets = collections.defaultdict(list) # type: t.Dict[str, t.List[str]]
target_paths = collections.defaultdict(dict) # type: t.Dict[str, t.Dict[str, str]]
remap_types = dict(
modules='module',
@ -97,7 +98,7 @@ class AnsibleDocTest(SanitySingleVersion):
target_paths[plugin_type][data_context().content.prefix + plugin_name] = plugin_file_path
env = ansible_environment(args, color=False)
error_messages = []
error_messages = [] # type: t.List[SanityMessage]
for doc_type in sorted(doc_targets):
for format_option in [None, '--json']:

@ -2,6 +2,7 @@
from __future__ import annotations
import os
import typing as t
from . import (
SanityFailure,
@ -38,7 +39,7 @@ class IgnoresTest(SanityVersionNeutral):
def test(self, args, targets): # type: (SanityConfig, SanityTargets) -> TestResult
sanity_ignore = SanityIgnoreParser.load(args)
messages = []
messages = [] # type: t.List[SanityMessage]
# parse errors

@ -115,7 +115,7 @@ class ImportTest(SanityMultipleVersion):
try:
install_requirements(args, python, virtualenv=True, controller=False) # sanity (import)
except PipUnavailableError as ex:
display.warning(ex)
display.warning(str(ex))
temp_root = os.path.join(ResultType.TMP.path, 'sanity', 'import')

@ -1,6 +1,7 @@
"""Sanity test to check integration test aliases."""
from __future__ import annotations
import dataclasses
import json
import textwrap
import os
@ -127,7 +128,7 @@ class IntegrationAliasesTest(SanitySingleVersion):
def ci_test_groups(self): # type: () -> t.Dict[str, t.List[int]]
"""Return a dictionary of CI test names and their group(s)."""
if not self._ci_test_groups:
test_groups = {}
test_groups = {} # type: t.Dict[str, t.Set[int]]
for stage in self._ci_config['stages']:
for job in stage['jobs']:
@ -209,7 +210,7 @@ class IntegrationAliasesTest(SanitySingleVersion):
path=self.CI_YML,
)])
results = dict(
results = Results(
comments=[],
labels={},
)
@ -217,7 +218,7 @@ class IntegrationAliasesTest(SanitySingleVersion):
self.load_ci_config(python)
self.check_changes(args, results)
write_json_test_results(ResultType.BOT, 'data-sanity-ci.json', results)
write_json_test_results(ResultType.BOT, 'data-sanity-ci.json', results.__dict__)
messages = []
@ -324,8 +325,8 @@ class IntegrationAliasesTest(SanitySingleVersion):
return messages
def check_changes(self, args, results): # type: (SanityConfig, t.Dict[str, t.Any]) -> None
"""Check changes and store results in the provided results dictionary."""
def check_changes(self, args, results): # type: (SanityConfig, Results) -> None
"""Check changes and store results in the provided result dictionary."""
integration_targets = list(walk_integration_targets())
module_targets = list(walk_module_targets())
@ -369,8 +370,8 @@ class IntegrationAliasesTest(SanitySingleVersion):
unsupported_tests=bool(unsupported_targets),
)
results['comments'] += comments
results['labels'].update(labels)
results.comments += comments
results.labels.update(labels)
def format_comment(self, template, targets): # type: (str, t.List[str]) -> t.Optional[str]
"""Format and return a comment based on the given template and targets, or None if there are no targets."""
@ -387,3 +388,10 @@ class IntegrationAliasesTest(SanitySingleVersion):
message = textwrap.dedent(template).strip().format(**data)
return message
@dataclasses.dataclass
class Results:
    """Check results."""
    # Comment strings accumulated by check_changes; the whole object is serialized
    # (via __dict__) into the 'data-sanity-ci.json' bot results file.
    comments: t.List[str]
    # Flags keyed by label name, also populated by check_changes.
    # NOTE(review): presumably True means the label should be applied -- confirm against the bot consumer.
    labels: t.Dict[str, bool]

@ -92,7 +92,7 @@ class Pep8Test(SanitySingleVersion):
else:
results = []
results = [SanityMessage(
messages = [SanityMessage(
message=r['message'],
path=r['path'],
line=int(r['line']),
@ -101,7 +101,7 @@ class Pep8Test(SanitySingleVersion):
code=r['code'],
) for r in results]
errors = settings.process_errors(results, paths)
errors = settings.process_errors(messages, paths)
if errors:
return SanityFailure(self.name, messages=errors)

@ -18,6 +18,7 @@ from ...executor import (
)
from ...connections import (
Connection,
LocalConnection,
SshConnection,
)
@ -55,13 +56,13 @@ def command_shell(args): # type: (ShellConfig) -> None
if isinstance(target_profile, ControllerProfile):
# run the shell locally unless a target was requested
con = LocalConnection(args)
con = LocalConnection(args) # type: Connection
else:
# a target was requested, connect to it over SSH
con = target_profile.get_controller_target_connections()[0]
if isinstance(con, SshConnection) and args.raw:
cmd = []
cmd = [] # type: t.List[str]
elif isinstance(target_profile, PosixProfile):
cmd = []

@ -291,9 +291,9 @@ def get_units_ansible_python_path(args, test_context): # type: (UnitsConfig, st
return get_ansible_python_path(args)
try:
cache = get_units_ansible_python_path.cache
cache = get_units_ansible_python_path.cache # type: ignore[attr-defined]
except AttributeError:
cache = get_units_ansible_python_path.cache = {}
cache = get_units_ansible_python_path.cache = {} # type: ignore[attr-defined]
python_path = cache.get(test_context)

@ -1,14 +1,16 @@
"""Packaging compatibility."""
from __future__ import annotations
import typing as t
try:
from packaging import (
specifiers,
version,
)
SpecifierSet = specifiers.SpecifierSet
Version = version.Version
SpecifierSet = specifiers.SpecifierSet # type: t.Optional[t.Type[specifiers.SpecifierSet]]
Version = version.Version # type: t.Optional[t.Type[version.Version]]
PACKAGING_IMPORT_ERROR = None
except ImportError as ex:
SpecifierSet = None # pylint: disable=invalid-name

@ -1,6 +1,8 @@
"""PyYAML compatibility."""
from __future__ import annotations
import typing as t
from functools import (
partial,
)
@ -13,7 +15,7 @@ except ImportError as ex:
YAML_IMPORT_ERROR = ex
else:
try:
_SafeLoader = _yaml.CSafeLoader
_SafeLoader = _yaml.CSafeLoader # type: t.Union[t.Type[_yaml.CSafeLoader], t.Type[_yaml.SafeLoader]]
except AttributeError:
_SafeLoader = _yaml.SafeLoader

@ -211,9 +211,9 @@ def filter_completion(
controller_only=False, # type: bool
include_defaults=False, # type: bool
): # type: (...) -> t.Dict[str, TCompletionConfig]
"""Return a the given completion dictionary, filtering out configs which do not support the controller if controller_only is specified."""
"""Return the given completion dictionary, filtering out configs which do not support the controller if controller_only is specified."""
if controller_only:
completion = {name: config for name, config in completion.items() if config.controller_supported}
completion = {name: config for name, config in completion.items() if isinstance(config, PosixCompletionConfig) and config.controller_supported}
if not include_defaults:
completion = {name: config for name, config in completion.items() if not config.is_default}

@ -10,6 +10,7 @@ from .util import (
display,
verify_sys_executable,
version_to_str,
type_guard,
)
from .util_common import (
@ -96,7 +97,7 @@ class EnvironmentConfig(CommonConfig):
not isinstance(self.controller, OriginConfig)
or isinstance(self.controller.python, VirtualPythonConfig)
or self.controller.python.version != version_to_str(sys.version_info[:2])
or verify_sys_executable(self.controller.python.path)
or bool(verify_sys_executable(self.controller.python.path))
)
self.docker_network = args.docker_network # type: t.Optional[str]
@ -161,16 +162,14 @@ class EnvironmentConfig(CommonConfig):
def only_targets(self, target_type): # type: (t.Type[THostConfig]) -> t.List[THostConfig]
"""
Return a list of target host configurations.
Requires that there are one or more targets, all of the specified type.
Requires that there are one or more targets, all the specified type.
"""
if not self.targets:
raise Exception('There must be one or more targets.')
for target in self.targets:
if not isinstance(target, target_type):
raise Exception(f'Target is {type(target_type)} instead of {target_type}.')
assert type_guard(self.targets, target_type)
return self.targets
return t.cast(t.List[THostConfig], self.targets)
@property
def target_type(self): # type: () -> t.Type[HostConfig]
@ -218,7 +217,7 @@ class TestConfig(EnvironmentConfig):
self.failure_ok = getattr(args, 'failure_ok', False) # type: bool
self.metadata = Metadata.from_file(args.metadata) if args.metadata else Metadata()
self.metadata_path = None
self.metadata_path = None # type: t.Optional[str]
if self.coverage_check:
self.coverage = True

@ -223,7 +223,7 @@ def run_support_container(
def get_container_database(args): # type: (EnvironmentConfig) -> ContainerDatabase
"""Return the current container database, creating it as needed, or returning the one provided on the command line through delegation."""
try:
return get_container_database.database
return get_container_database.database # type: ignore[attr-defined]
except AttributeError:
pass
@ -236,9 +236,9 @@ def get_container_database(args): # type: (EnvironmentConfig) -> ContainerDatab
display.info('>>> Container Database\n%s' % json.dumps(database.to_dict(), indent=4, sort_keys=True), verbosity=3)
get_container_database.database = database
get_container_database.database = database # type: ignore[attr-defined]
return get_container_database.database
return database
class ContainerAccess:
@ -457,7 +457,7 @@ class SupportContainerContext:
def support_container_context(
args, # type: EnvironmentConfig
ssh, # type: t.Optional[SshConnectionDetail]
): # type: (...) -> t.Optional[ContainerDatabase]
): # type: (...) -> t.Iterator[t.Optional[ContainerDatabase]]
"""Create a context manager for integration tests that use support containers."""
if not isinstance(args, (IntegrationConfig, UnitsConfig, SanityConfig, ShellConfig)):
yield None # containers are only needed for commands that have targets (hosts or pythons)
@ -514,7 +514,7 @@ def create_support_container_context(
try:
port_forwards = process.collect_port_forwards()
contexts = {}
contexts = {} # type: t.Dict[str, t.Dict[str, ContainerAccess]]
for forward, forwarded_port in port_forwards.items():
access_host, access_port = forward
@ -702,8 +702,8 @@ def create_container_hooks(
else:
managed_type = 'posix'
control_state = {}
managed_state = {}
control_state = {} # type: t.Dict[str, t.Tuple[t.List[str], t.List[SshProcess]]]
managed_state = {} # type: t.Dict[str, t.Tuple[t.List[str], t.List[SshProcess]]]
def pre_target(target):
"""Configure hosts for SSH port forwarding required by the specified target."""
@ -722,7 +722,7 @@ def create_container_hooks(
def create_managed_contexts(control_contexts): # type: (t.Dict[str, t.Dict[str, ContainerAccess]]) -> t.Dict[str, t.Dict[str, ContainerAccess]]
"""Create managed contexts from the given control contexts."""
managed_contexts = {}
managed_contexts = {} # type: t.Dict[str, t.Dict[str, ContainerAccess]]
for context_name, control_context in control_contexts.items():
managed_context = managed_contexts[context_name] = {}
@ -789,7 +789,7 @@ def forward_ssh_ports(
hosts_entries = create_hosts_entries(test_context)
inventory = generate_ssh_inventory(ssh_connections)
with named_temporary_file(args, 'ssh-inventory-', '.json', None, inventory) as inventory_path:
with named_temporary_file(args, 'ssh-inventory-', '.json', None, inventory) as inventory_path: # type: str
run_playbook(args, inventory_path, playbook, dict(hosts_entries=hosts_entries))
ssh_processes = [] # type: t.List[SshProcess]
@ -822,7 +822,7 @@ def cleanup_ssh_ports(
inventory = generate_ssh_inventory(ssh_connections)
with named_temporary_file(args, 'ssh-inventory-', '.json', None, inventory) as inventory_path:
with named_temporary_file(args, 'ssh-inventory-', '.json', None, inventory) as inventory_path: # type: str
run_playbook(args, inventory_path, playbook, dict(hosts_entries=hosts_entries))
if ssh_processes:

@ -107,7 +107,7 @@ class AnsibleCoreCI:
self._clear()
if self.instance_id:
self.started = True
self.started = True # type: bool
else:
self.started = False
self.instance_id = str(uuid.uuid4())

@ -110,7 +110,7 @@ def get_coverage_environment(
def get_coverage_config(args): # type: (TestConfig) -> str
"""Return the path to the coverage config, creating the config if it does not already exist."""
try:
return get_coverage_config.path
return get_coverage_config.path # type: ignore[attr-defined]
except AttributeError:
pass
@ -122,11 +122,13 @@ def get_coverage_config(args): # type: (TestConfig) -> str
temp_dir = tempfile.mkdtemp()
atexit.register(lambda: remove_tree(temp_dir))
path = get_coverage_config.path = os.path.join(temp_dir, COVERAGE_CONFIG_NAME)
path = os.path.join(temp_dir, COVERAGE_CONFIG_NAME)
if not args.explain:
write_text_file(path, coverage_config)
get_coverage_config.path = path # type: ignore[attr-defined]
return path

@ -129,7 +129,7 @@ class DataContext:
# Doing so allows support for older git versions for which it is difficult to distinguish between a super project and a sub project.
# It also provides a better user experience, since the solution for the user would effectively be the same -- to remove the nested version control.
if isinstance(layout_provider, UnsupportedLayout):
source_provider = UnsupportedSource(layout_provider.root)
source_provider = UnsupportedSource(layout_provider.root) # type: SourceProvider
else:
source_provider = find_path_provider(SourceProvider, source_providers, layout_provider.root, walk)
except ProviderNotFoundForPath:

@ -12,6 +12,7 @@ from .io import (
)
from .config import (
CommonConfig,
EnvironmentConfig,
IntegrationConfig,
SanityConfig,
@ -36,6 +37,7 @@ from .util_common import (
from .containers import (
support_container_context,
ContainerDatabase,
)
from .data import (
@ -68,7 +70,7 @@ from .provisioning import (
@contextlib.contextmanager
def delegation_context(args, host_state): # type: (EnvironmentConfig, HostState) -> None
def delegation_context(args, host_state): # type: (EnvironmentConfig, HostState) -> t.Iterator[None]
"""Context manager for serialized host state during delegation."""
make_dirs(ResultType.TMP.path)
@ -88,8 +90,10 @@ def delegation_context(args, host_state): # type: (EnvironmentConfig, HostState
args.host_path = None
def delegate(args, host_state, exclude, require): # type: (EnvironmentConfig, HostState, t.List[str], t.List[str]) -> None
def delegate(args, host_state, exclude, require): # type: (CommonConfig, HostState, t.List[str], t.List[str]) -> None
"""Delegate execution of ansible-test to another environment."""
assert isinstance(args, EnvironmentConfig)
with delegation_context(args, host_state):
if isinstance(args, TestConfig):
args.metadata.ci_provider = get_ci_provider().code
@ -142,7 +146,7 @@ def delegate_command(args, host_state, exclude, require): # type: (EnvironmentC
if not args.allow_destructive:
options.append('--allow-destructive')
with support_container_context(args, ssh) as containers:
with support_container_context(args, ssh) as containers: # type: t.Optional[ContainerDatabase]
if containers:
options.extend(['--containers', json.dumps(containers.to_dict())])

@ -154,7 +154,7 @@ def get_docker_preferred_network_name(args): # type: (EnvironmentConfig) -> str
- the default docker network (returns None)
"""
try:
return get_docker_preferred_network_name.network
return get_docker_preferred_network_name.network # type: ignore[attr-defined]
except AttributeError:
pass
@ -171,14 +171,14 @@ def get_docker_preferred_network_name(args): # type: (EnvironmentConfig) -> str
container = docker_inspect(args, current_container_id, always=True)
network = container.get_network_name()
get_docker_preferred_network_name.network = network
get_docker_preferred_network_name.network = network # type: ignore[attr-defined]
return network
def is_docker_user_defined_network(network):  # type: (str) -> bool
    """Return True if the network being used is a user-defined network."""
    # Guard clause ensures a strict bool is returned: the bare expression
    # `network and network != 'bridge'` would leak '' (or None) to callers.
    if not network:
        return False
    # 'bridge' is Docker's built-in default network, not user-defined.
    return network != 'bridge'
def docker_pull(args, image): # type: (EnvironmentConfig, str) -> None
@ -247,7 +247,7 @@ def docker_run(
return stdout.strip()
except SubprocessError as ex:
display.error(ex)
display.error(ex.message)
display.warning('Failed to run docker image "%s". Waiting a few seconds before trying again.' % image)
time.sleep(3)
@ -265,7 +265,7 @@ def docker_start(args, container_id, options=None): # type: (EnvironmentConfig,
try:
return docker_command(args, ['start'] + options + [container_id], capture=True)
except SubprocessError as ex:
display.error(ex)
display.error(ex.message)
display.warning('Failed to start docker container "%s". Waiting a few seconds before trying again.' % container_id)
time.sleep(3)
@ -441,8 +441,8 @@ def docker_exec(
cmd, # type: t.List[str]
options=None, # type: t.Optional[t.List[str]]
capture=False, # type: bool
stdin=None, # type: t.Optional[t.BinaryIO]
stdout=None, # type: t.Optional[t.BinaryIO]
stdin=None, # type: t.Optional[t.IO[bytes]]
stdout=None, # type: t.Optional[t.IO[bytes]]
data=None, # type: t.Optional[str]
): # type: (...) -> t.Tuple[t.Optional[str], t.Optional[str]]
"""Execute the given command in the specified container."""
@ -471,8 +471,8 @@ def docker_command(
args, # type: CommonConfig
cmd, # type: t.List[str]
capture=False, # type: bool
stdin=None, # type: t.Optional[t.BinaryIO]
stdout=None, # type: t.Optional[t.BinaryIO]
stdin=None, # type: t.Optional[t.IO[bytes]]
stdout=None, # type: t.Optional[t.IO[bytes]]
always=False, # type: bool
data=None, # type: t.Optional[str]
): # type: (...) -> t.Tuple[t.Optional[str], t.Optional[str]]

@ -183,8 +183,10 @@ class PosixConfig(HostConfig, metaclass=abc.ABCMeta):
def get_defaults(self, context): # type: (HostContext) -> PosixCompletionConfig
"""Return the default settings."""
def apply_defaults(self, context, defaults): # type: (HostContext, PosixCompletionConfig) -> None
def apply_defaults(self, context, defaults): # type: (HostContext, CompletionConfig) -> None
"""Apply default settings."""
assert isinstance(defaults, PosixCompletionConfig)
super().apply_defaults(context, defaults)
self.python = self.python or NativePythonConfig()
@ -206,17 +208,19 @@ class RemoteConfig(HostConfig, metaclass=abc.ABCMeta):
provider: t.Optional[str] = None
@property
def platform(self):  # type: () -> str
    """The name of the platform."""
    # The name has the form 'platform/version'; the prefix is the platform.
    platform_name, _sep, _version = self.name.partition('/')
    return platform_name
@property
def version(self):  # type: () -> str
    """The version of the platform."""
    # The name has the form 'platform/version'; the suffix is the version
    # (empty string when no '/' is present).
    _platform_name, _sep, version_name = self.name.partition('/')
    return version_name
def apply_defaults(self, context, defaults): # type: (HostContext, RemoteCompletionConfig) -> None
def apply_defaults(self, context, defaults): # type: (HostContext, CompletionConfig) -> None
"""Apply default settings."""
assert isinstance(defaults, RemoteCompletionConfig)
super().apply_defaults(context, defaults)
if self.provider == 'default':
@ -262,8 +266,9 @@ class InventoryConfig(HostConfig):
"""Return the default settings."""
return InventoryCompletionConfig()
def apply_defaults(self, context, defaults): # type: (HostContext, InventoryCompletionConfig) -> None
def apply_defaults(self, context, defaults): # type: (HostContext, CompletionConfig) -> None
"""Apply default settings."""
assert isinstance(defaults, InventoryCompletionConfig)
@dataclasses.dataclass
@ -293,8 +298,10 @@ class DockerConfig(ControllerHostConfig, PosixConfig):
return [ControllerConfig(python=NativePythonConfig(version=version, path=path)) for version, path in pythons.items()]
def apply_defaults(self, context, defaults): # type: (HostContext, DockerCompletionConfig) -> None
def apply_defaults(self, context, defaults): # type: (HostContext, CompletionConfig) -> None
"""Apply default settings."""
assert isinstance(defaults, DockerCompletionConfig)
super().apply_defaults(context, defaults)
self.name = defaults.name
@ -383,8 +390,10 @@ class NetworkRemoteConfig(RemoteConfig, NetworkConfig):
name=self.name,
)
def apply_defaults(self, context, defaults): # type: (HostContext, NetworkRemoteCompletionConfig) -> None
def apply_defaults(self, context, defaults): # type: (HostContext, CompletionConfig) -> None
"""Apply default settings."""
assert isinstance(defaults, NetworkRemoteCompletionConfig)
super().apply_defaults(context, defaults)
self.collection = self.collection or defaults.collection
@ -422,8 +431,10 @@ class ControllerConfig(PosixConfig):
"""Return the default settings."""
return context.controller_config.get_defaults(context)
def apply_defaults(self, context, defaults): # type: (HostContext, PosixCompletionConfig) -> None
def apply_defaults(self, context, defaults): # type: (HostContext, CompletionConfig) -> None
"""Apply default settings."""
assert isinstance(defaults, PosixCompletionConfig)
self.controller = context.controller_config
if not self.python and not defaults.supported_pythons:
@ -447,7 +458,7 @@ class ControllerConfig(PosixConfig):
class FallbackReason(enum.Enum):
"""Reason fallback was peformed."""
"""Reason fallback was performed."""
ENVIRONMENT = enum.auto()
PYTHON = enum.auto()

@ -96,6 +96,7 @@ from .connections import (
)
from .become import (
Become,
Su,
Sudo,
)
@ -109,11 +110,11 @@ TRemoteConfig = t.TypeVar('TRemoteConfig', bound=RemoteConfig)
@dataclasses.dataclass(frozen=True)
class Inventory:
"""Simple representation of an Ansible inventory."""
host_groups: t.Dict[str, t.Dict[str, t.Dict[str, str]]]
host_groups: t.Dict[str, t.Dict[str, t.Dict[str, t.Union[str, int]]]]
extra_groups: t.Optional[t.Dict[str, t.List[str]]] = None
@staticmethod
def create_single_host(name, variables):  # type: (str, t.Dict[str, t.Union[str, int]]) -> Inventory
    """Return an inventory instance created from the given hostname and variables."""
    # A single host lives alone in the implicit 'all' group.
    host_groups = dict(all={name: variables})
    return Inventory(host_groups=host_groups)
@ -448,7 +449,7 @@ class NetworkRemoteProfile(RemoteProfile[NetworkRemoteConfig]):
"""Wait for the instance to be ready. Executed before delegation for the controller and after delegation for targets."""
self.wait_until_ready()
def get_inventory_variables(self):
def get_inventory_variables(self): # type: () -> t.Dict[str, t.Optional[t.Union[str, int]]]
"""Return inventory variables for accessing this host."""
core_ci = self.wait_for_instance()
connection = core_ci.connection
@ -461,7 +462,7 @@ class NetworkRemoteProfile(RemoteProfile[NetworkRemoteConfig]):
ansible_user=connection.username,
ansible_ssh_private_key_file=core_ci.ssh_key.key,
ansible_network_os=f'{self.config.collection}.{self.config.platform}' if self.config.collection else self.config.platform,
)
) # type: t.Dict[str, t.Optional[t.Union[str, int]]]
return variables
@ -562,7 +563,7 @@ class PosixRemoteProfile(ControllerHostProfile[PosixRemoteConfig], RemoteProfile
)
if settings.user == 'root':
become = None
become = None # type: t.Optional[Become]
elif self.config.platform == 'freebsd':
become = Su()
elif self.config.platform == 'macos':
@ -672,7 +673,7 @@ class WindowsRemoteProfile(RemoteProfile[WindowsRemoteConfig]):
"""Wait for the instance to be ready. Executed before delegation for the controller and after delegation for targets."""
self.wait_until_ready()
def get_inventory_variables(self):
def get_inventory_variables(self): # type: () -> t.Dict[str, t.Optional[t.Union[str, int]]]
"""Return inventory variables for accessing this host."""
core_ci = self.wait_for_instance()
connection = core_ci.connection
@ -686,7 +687,7 @@ class WindowsRemoteProfile(RemoteProfile[WindowsRemoteConfig]):
ansible_user=connection.username,
ansible_password=connection.password,
ansible_ssh_private_key_file=core_ci.ssh_key.key,
)
) # type: t.Dict[str, t.Optional[t.Union[str, int]]]
# HACK: force 2016 to use NTLM + HTTP message encryption
if self.config.version == '2016':

@ -1,7 +1,6 @@
"""Early initialization for ansible-test before most other imports have been performed."""
from __future__ import annotations
# noinspection PyCompatibility
import resource
from .constants import (

@ -94,7 +94,7 @@ def create_network_inventory(args, path, target_hosts): # type: (EnvironmentCon
return
target_hosts = t.cast(t.List[NetworkRemoteProfile], target_hosts)
host_groups = {target_host.config.platform: {} for target_host in target_hosts}
host_groups = {target_host.config.platform: {} for target_host in target_hosts} # type: t.Dict[str, t.Dict[str, t.Dict[str, t.Union[str, int]]]]
for target_host in target_hosts:
host_groups[target_host.config.platform][sanitize_host_name(target_host.config.name)] = target_host.get_inventory_variables()
@ -149,7 +149,7 @@ def create_posix_inventory(args, path, target_hosts, needs_ssh=False): # type:
ansible_port=ssh.settings.port,
ansible_user=ssh.settings.user,
ansible_ssh_private_key_file=ssh.settings.identity_file,
)
) # type: t.Dict[str, t.Optional[t.Union[str, int]]]
if ssh.become:
testhost.update(

@ -14,17 +14,17 @@ from .encoding import (
)
def read_json_file(path):  # type: (str) -> t.Any
    """Parse and return the json content from the specified path."""
    text = read_text_file(path)
    return json.loads(text)
def read_text_file(path):  # type: (str) -> t.Text
    """Return the contents of the specified path as text."""
    raw = read_binary_file(path)
    return to_text(raw)
def read_binary_file(path):  # type: (str) -> bytes
    """Return the contents of the specified path as bytes."""
    # The context manager guarantees the file handle is closed after reading.
    file_obj = open_binary_file(path)
    with file_obj:
        return file_obj.read()
@ -43,7 +43,7 @@ def write_json_file(path, # type: str
content, # type: t.Any
create_directories=False, # type: bool
formatted=True, # type: bool
encoder=None, # type: t.Optional[t.Callable[[t.Any], t.Any]]
encoder=None, # type: t.Optional[t.Type[json.JSONEncoder]]
): # type: (...) -> str
"""Write the given json content to the specified path, optionally creating missing directories."""
text_content = json.dumps(content,
@ -67,21 +67,19 @@ def write_text_file(path, content, create_directories=False): # type: (str, str
file_obj.write(to_bytes(content))
def open_text_file(path, mode='r'):  # type: (str, str) -> t.IO[str]
    """Open the given path for text access."""
    # Binary mode would bypass the text decoding this helper exists to provide.
    if 'b' in mode:
        raise Exception('mode cannot include "b" for text files: %s' % mode)
    byte_path = to_bytes(path)
    return io.open(byte_path, mode, encoding=ENCODING)  # pylint: disable=consider-using-with
def open_binary_file(path, mode='rb'):  # type: (str, str) -> t.IO[bytes]
    """Open the given path for binary access."""
    # Text mode would silently decode the bytes this helper is expected to return.
    if 'b' not in mode:
        raise Exception('mode must include "b" for binary files: %s' % mode)
    byte_path = to_bytes(path)
    return io.open(byte_path, mode)  # pylint: disable=consider-using-with

@ -21,8 +21,8 @@ class Metadata:
"""Metadata object for passing data to delegated tests."""
def __init__(self):
"""Initialize metadata."""
self.changes = {} # type: t.Dict[str, t.Tuple[t.Tuple[int, int]]]
self.cloud_config = None # type: t.Optional[t.Dict[str, str]]
self.changes = {} # type: t.Dict[str, t.Tuple[t.Tuple[int, int], ...]]
self.cloud_config = None # type: t.Optional[t.Dict[str, t.Dict[str, t.Union[int, str, bool]]]]
self.change_description = None # type: t.Optional[ChangeDescription]
self.ci_provider = None # type: t.Optional[str]

@ -34,8 +34,8 @@ from .util_common import (
)
# improve performance by disabling uid/gid lookups
tarfile.pwd = None
tarfile.grp = None
tarfile.pwd = None # type: ignore[attr-defined] # undocumented attribute
tarfile.grp = None # type: ignore[attr-defined] # undocumented attribute
def create_payload(args, dst_path): # type: (CommonConfig, str) -> None
@ -69,8 +69,8 @@ def create_payload(args, dst_path): # type: (CommonConfig, str) -> None
collection_layouts = data_context().create_collection_layouts()
content_files = []
extra_files = []
content_files = [] # type: t.List[t.Tuple[str, str]]
extra_files = [] # type: t.List[t.Tuple[str, str]]
for layout in collection_layouts:
if layout == data_context().content:

@ -206,7 +206,7 @@ class LayoutProvider(PathProvider):
def paths_to_tree(paths): # type: (t.List[str]) -> t.Tuple[t.Dict[str, t.Any], t.List[str]]
"""Return a filesystem tree from the given list of paths."""
tree = {}, []
tree = {}, [] # type: t.Tuple[t.Dict[str, t.Any], t.List[str]]
for path in paths:
parts = path.split(os.path.sep)

@ -22,6 +22,7 @@ from .util import (
open_binary_file,
verify_sys_executable,
version_to_str,
type_guard,
)
from .thread import (
@ -88,10 +89,9 @@ class HostState:
if not self.target_profiles:
raise Exception('No target profiles found.')
if not all(isinstance(target, profile_type) for target in self.target_profiles):
raise Exception(f'Target profile(s) are not of the required type: {profile_type}')
assert type_guard(self.target_profiles, profile_type)
return self.target_profiles
return t.cast(t.List[THostProfile], self.target_profiles)
def prepare_profiles(

@ -3,6 +3,7 @@ from __future__ import annotations
import atexit
import os
import typing as t
import urllib.parse
from .io import (
@ -54,7 +55,7 @@ def run_pypi_proxy(args, targets_use_pypi): # type: (EnvironmentConfig, bool) -
if args.pypi_endpoint:
return # user has overridden the proxy endpoint, there is nothing to provision
versions_needing_proxy = tuple() # preserved for future use, no versions currently require this
versions_needing_proxy = tuple() # type: t.Tuple[str, ...] # preserved for future use, no versions currently require this
posix_targets = [target for target in args.targets if isinstance(target, PosixConfig)]
need_proxy = targets_use_pypi and any(target.python.version in versions_needing_proxy for target in posix_targets)
use_proxy = args.pypi_proxy or need_proxy

@ -142,9 +142,9 @@ def install_requirements(
if ansible:
try:
ansible_cache = install_requirements.ansible_cache
ansible_cache = install_requirements.ansible_cache # type: ignore[attr-defined]
except AttributeError:
ansible_cache = install_requirements.ansible_cache = {}
ansible_cache = install_requirements.ansible_cache = {} # type: ignore[attr-defined]
ansible_installed = ansible_cache.get(python.path)
@ -486,7 +486,7 @@ def prepare_pip_script(commands): # type: (t.List[PipCommand]) -> str
def usable_pip_file(path):  # type: (t.Optional[str]) -> bool
    """Return True if the specified pip file is usable, otherwise False."""
    # A usable file must be specified, must exist and must be non-empty.
    # The bool() wrappers ensure a strict bool return value instead of
    # leaking None / '' / 0 truthy-values to callers that compare with `is`.
    return bool(path) and os.path.exists(path) and bool(os.path.getsize(path))
# Cryptography

@ -47,7 +47,7 @@ class SshProcess:
"""Wrapper around an SSH process."""
def __init__(self, process): # type: (t.Optional[subprocess.Popen]) -> None
self._process = process
self.pending_forwards = None # type: t.Optional[t.Set[t.Tuple[str, int]]]
self.pending_forwards = None # type: t.Optional[t.List[t.Tuple[str, int]]]
self.forwards = {} # type: t.Dict[t.Tuple[str, int], int]
@ -71,7 +71,7 @@ class SshProcess:
def collect_port_forwards(self): # type: (SshProcess) -> t.Dict[t.Tuple[str, int], int]
"""Collect port assignments for dynamic SSH port forwards."""
errors = []
errors = [] # type: t.List[str]
display.info('Collecting %d SSH port forward(s).' % len(self.pending_forwards), verbosity=2)
@ -107,7 +107,7 @@ class SshProcess:
dst = (dst_host, dst_port)
else:
# explain mode
dst = list(self.pending_forwards)[0]
dst = self.pending_forwards[0]
src_port = random.randint(40000, 50000)
self.pending_forwards.remove(dst)
@ -202,7 +202,7 @@ def create_ssh_port_forwards(
"""
options = dict(
LogLevel='INFO', # info level required to get messages on stderr indicating the ports assigned to each forward
)
) # type: t.Dict[str, t.Union[str, int]]
cli_args = []
@ -221,7 +221,7 @@ def create_ssh_port_redirects(
redirects, # type: t.List[t.Tuple[int, str, int]]
): # type: (...) -> SshProcess
"""Create SSH port redirections using the provided list of tuples (bind_port, target_host, target_port)."""
options = {}
options = {} # type: t.Dict[str, t.Union[str, int]]
cli_args = []
for bind_port, target_host, target_port in redirects:

@ -219,7 +219,7 @@ class TestFailure(TestResult):
command, # type: str
test, # type: str
python_version=None, # type: t.Optional[str]
messages=None, # type: t.Optional[t.List[TestMessage]]
messages=None, # type: t.Optional[t.Sequence[TestMessage]]
summary=None, # type: t.Optional[str]
):
super().__init__(command, test, python_version)

@ -8,14 +8,14 @@ import queue
import typing as t
TCallable = t.TypeVar('TCallable', bound=t.Callable)
TCallable = t.TypeVar('TCallable', bound=t.Callable[..., t.Any])
class WrappedThread(threading.Thread):
"""Wrapper around Thread which captures results and exceptions."""
def __init__(self, action):  # type: (t.Callable[[], t.Any]) -> None
    """Initialize the wrapper around the callable to execute in the thread."""
    super().__init__()
    self.action = action
    self.result = None
    # Queue used to hand the (result, exception) pair back to the calling thread.
    self._result = queue.Queue()  # type: queue.Queue[t.Any]
@ -25,8 +25,8 @@ class WrappedThread(threading.Thread):
Do not override. Do not call directly. Executed by the start() method.
"""
# We truly want to catch anything that the worker thread might do including call sys.exit.
# Therefore we catch *everything* (including old-style class exceptions)
# noinspection PyBroadException, PyPep8
# Therefore, we catch *everything* (including old-style class exceptions)
# noinspection PyBroadException
try:
self._result.put((self.action(), None))
# pylint: disable=locally-disabled, bare-except
@ -58,4 +58,4 @@ def mutex(func): # type: (TCallable) -> TCallable
with lock:
return func(*args, **kwargs)
return wrapper
return wrapper # type: ignore[return-value] # requires https://www.python.org/dev/peps/pep-0612/ support

@ -2,7 +2,6 @@
from __future__ import annotations
import errno
# noinspection PyCompatibility
import fcntl
import importlib.util
import inspect
@ -23,9 +22,13 @@ import shlex
import typing as t
from struct import unpack, pack
# noinspection PyCompatibility
from termios import TIOCGWINSZ
try:
from typing_extensions import TypeGuard # TypeGuard was added in Python 3.9
except ImportError:
TypeGuard = None
from .encoding import (
to_bytes,
to_optional_bytes,
@ -257,8 +260,8 @@ def raw_command(
data=None, # type: t.Optional[str]
cwd=None, # type: t.Optional[str]
explain=False, # type: bool
stdin=None, # type: t.Optional[t.BinaryIO]
stdout=None, # type: t.Optional[t.BinaryIO]
stdin=None, # type: t.Optional[t.Union[t.IO[bytes], int]]
stdout=None, # type: t.Optional[t.Union[t.IO[bytes], int]]
cmd_verbosity=1, # type: int
str_errors='strict', # type: str
error_callback=None, # type: t.Optional[t.Callable[[SubprocessError], None]]
@ -467,7 +470,6 @@ def is_binary_file(path): # type: (str) -> bool
return True
with open_binary_file(path) as path_fd:
# noinspection PyTypeChecker
return b'\0' in path_fd.read(4096)
@ -571,7 +573,7 @@ class Display:
self,
message, # type: str
color=None, # type: t.Optional[str]
fd=sys.stdout, # type: t.TextIO
fd=sys.stdout, # type: t.IO[str]
truncate=False, # type: bool
): # type: (...) -> None
"""Display a message."""
@ -772,7 +774,6 @@ def load_module(path, name): # type: (str, str) -> None
spec = importlib.util.spec_from_file_location(name, path)
module = importlib.util.module_from_spec(spec)
sys.modules[name] = module
# noinspection PyUnresolvedReferences
spec.loader.exec_module(module)
@ -826,4 +827,19 @@ def verify_sys_executable(path): # type: (str) -> t.Optional[str]
return expected_executable
def type_guard(sequence: t.Sequence[t.Any], guard_type: t.Type[C]) -> TypeGuard[t.Sequence[C]]:
    """
    Raises an exception if any item in the given sequence does not match the specified guard type.
    Use with assert so that type checkers are aware of the type guard.
    """
    # Collect the distinct offending types so the error message is concise.
    unexpected_types = {type(item) for item in sequence if not isinstance(item, guard_type)}
    if unexpected_types:
        unexpected_names = sorted(str(item) for item in unexpected_types)
        raise Exception(f'Sequence required to contain only {guard_type} includes: {", ".join(unexpected_names)}')
    return True
display = Display() # pylint: disable=locally-disabled, invalid-name

@ -57,7 +57,7 @@ from .host_configs import (
VirtualPythonConfig,
)
CHECK_YAML_VERSIONS = {}
CHECK_YAML_VERSIONS = {} # type: t.Dict[str, t.Any]
class ShellScriptTemplate:
@ -65,7 +65,7 @@ class ShellScriptTemplate:
def __init__(self, template): # type: (t.Text) -> None
self.template = template
def substitute(self, **kwargs): # type: (t.Dict[str, t.Union[str, t.List[str]]]) -> str
def substitute(self, **kwargs: t.Union[str, t.List[str]]) -> str:
"""Return a string templated with the given arguments."""
kvp = dict((k, self.quote(v)) for k, v in kwargs.items())
pattern = re.compile(r'#{(?P<name>[^}]+)}')
@ -139,7 +139,7 @@ class CommonConfig:
self.session_name = generate_name()
self.cache = {}
self.cache = {} # type: t.Dict[str, t.Any]
def get_ansible_config(self): # type: () -> str
"""Return the path to the Ansible config for the given config."""
@ -194,15 +194,8 @@ def process_scoped_temporary_directory(args, prefix='ansible-test-', suffix=None
@contextlib.contextmanager
def named_temporary_file(args, prefix, suffix, directory, content):
"""
:param args: CommonConfig
:param prefix: str
:param suffix: str
:param directory: str
:param content: str | bytes | unicode
:rtype: str
"""
def named_temporary_file(args, prefix, suffix, directory, content): # type: (CommonConfig, str, str, t.Optional[str], str) -> t.Iterator[str]
"""Context manager for a named temporary file."""
if args.explain:
yield os.path.join(directory or '/tmp', '%stemp%s' % (prefix, suffix))
else:
@ -217,7 +210,7 @@ def write_json_test_results(category, # type: ResultType
name, # type: str
content, # type: t.Union[t.List[t.Any], t.Dict[str, t.Any]]
formatted=True, # type: bool
encoder=None, # type: t.Optional[t.Callable[[t.Any], t.Any]]
encoder=None, # type: t.Optional[t.Type[json.JSONEncoder]]
): # type: (...) -> None
"""Write the given json content to the specified test results path, creating directories as needed."""
path = os.path.join(category.path, name)
@ -411,8 +404,8 @@ def run_command(
data=None, # type: t.Optional[str]
cwd=None, # type: t.Optional[str]
always=False, # type: bool
stdin=None, # type: t.Optional[t.BinaryIO]
stdout=None, # type: t.Optional[t.BinaryIO]
stdin=None, # type: t.Optional[t.IO[bytes]]
stdout=None, # type: t.Optional[t.IO[bytes]]
cmd_verbosity=1, # type: int
str_errors='strict', # type: str
error_callback=None, # type: t.Optional[t.Callable[[SubprocessError], None]]

@ -201,7 +201,7 @@ def run_venv(args, # type: EnvironmentConfig
remove_tree(path)
if args.verbosity > 1:
display.error(ex)
display.error(ex.message)
return False
@ -237,7 +237,7 @@ def run_virtualenv(args, # type: EnvironmentConfig
remove_tree(path)
if args.verbosity > 1:
display.error(ex)
display.error(ex.message)
return False
@ -245,11 +245,11 @@ def run_virtualenv(args, # type: EnvironmentConfig
def get_virtualenv_version(args, python): # type: (EnvironmentConfig, str) -> t.Optional[t.Tuple[int, ...]]
"""Get the virtualenv version for the given python intepreter, if available, otherwise return None."""
"""Get the virtualenv version for the given python interpreter, if available, otherwise return None."""
try:
cache = get_virtualenv_version.cache
cache = get_virtualenv_version.cache # type: ignore[attr-defined]
except AttributeError:
cache = get_virtualenv_version.cache = {}
cache = get_virtualenv_version.cache = {} # type: ignore[attr-defined]
if python not in cache:
try:
@ -258,7 +258,7 @@ def get_virtualenv_version(args, python): # type: (EnvironmentConfig, str) -> t
stdout = ''
if args.verbosity > 1:
display.error(ex)
display.error(ex.message)
version = None

@ -277,12 +277,11 @@ def make_dirs(path): # type: (str) -> None
raise
def open_binary_file(path, mode='rb'):  # type: (str, str) -> t.IO[bytes]
    """Open the given path for binary access."""
    # Text mode would silently decode the bytes this helper is expected to return.
    if 'b' not in mode:
        raise Exception('mode must include "b" for binary files: %s' % mode)
    encoded_path = to_bytes(path)
    return io.open(encoded_path, mode)  # pylint: disable=consider-using-with

Loading…
Cancel
Save