ansible-test - Clean up indentation and spaces (#79980)

pull/79985/head
Matt Clay 2 years ago committed by GitHub
parent 71f2e777ed
commit 58d84933fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -284,11 +284,11 @@ def get_collection_detail(python: PythonConfig) -> CollectionDetail:
def run_playbook( def run_playbook(
args: EnvironmentConfig, args: EnvironmentConfig,
inventory_path: str, inventory_path: str,
playbook: str, playbook: str,
capture: bool, capture: bool,
variables: t.Optional[dict[str, t.Any]] = None, variables: t.Optional[dict[str, t.Any]] = None,
) -> None: ) -> None:
"""Run the specified playbook using the given inventory file and playbook variables.""" """Run the specified playbook using the given inventory file and playbook variables."""
playbook_path = os.path.join(ANSIBLE_TEST_DATA_ROOT, 'playbooks', playbook) playbook_path = os.path.join(ANSIBLE_TEST_DATA_ROOT, 'playbooks', playbook)

@ -379,9 +379,9 @@ class PathMapper:
if is_subdir(path, data_context().content.integration_path): if is_subdir(path, data_context().content.integration_path):
if dirname == data_context().content.integration_path: if dirname == data_context().content.integration_path:
for command in ( for command in (
'integration', 'integration',
'windows-integration', 'windows-integration',
'network-integration', 'network-integration',
): ):
if name == command and ext == '.cfg': if name == command and ext == '.cfg':
return { return {
@ -641,19 +641,19 @@ class PathMapper:
if '/' not in path: if '/' not in path:
if path in ( if path in (
'.gitignore', '.gitignore',
'COPYING', 'COPYING',
'LICENSE', 'LICENSE',
'Makefile', 'Makefile',
): ):
return minimal return minimal
if ext in ( if ext in (
'.in', '.in',
'.md', '.md',
'.rst', '.rst',
'.toml', '.toml',
'.txt', '.txt',
): ):
return minimal return minimal
@ -757,17 +757,17 @@ class PathMapper:
if path.startswith('test/lib/ansible_test/_data/requirements/'): if path.startswith('test/lib/ansible_test/_data/requirements/'):
if name in ( if name in (
'integration', 'integration',
'network-integration', 'network-integration',
'windows-integration', 'windows-integration',
): ):
return { return {
name: self.integration_all_target, name: self.integration_all_target,
} }
if name in ( if name in (
'sanity', 'sanity',
'units', 'units',
): ):
return { return {
name: 'all', name: 'all',
@ -826,11 +826,11 @@ class PathMapper:
if '/' not in path: if '/' not in path:
if path in ( if path in (
'.gitattributes', '.gitattributes',
'.gitignore', '.gitignore',
'.mailmap', '.mailmap',
'COPYING', 'COPYING',
'Makefile', 'Makefile',
): ):
return minimal return minimal
@ -840,11 +840,11 @@ class PathMapper:
return all_tests(self.args) # broad impact, run all tests return all_tests(self.args) # broad impact, run all tests
if ext in ( if ext in (
'.in', '.in',
'.md', '.md',
'.rst', '.rst',
'.toml', '.toml',
'.txt', '.txt',
): ):
return minimal return minimal

@ -40,11 +40,11 @@ class RegisteredCompletionFinder(OptionCompletionFinder):
self.registered_completions: t.Optional[list[str]] = None self.registered_completions: t.Optional[list[str]] = None
def completer( def completer(
self, self,
prefix: str, prefix: str,
action: argparse.Action, action: argparse.Action,
parsed_args: argparse.Namespace, parsed_args: argparse.Namespace,
**kwargs, **kwargs,
) -> list[str]: ) -> list[str]:
""" """
Return a list of completions for the specified prefix and action. Return a list of completions for the specified prefix and action.
@ -63,10 +63,10 @@ class RegisteredCompletionFinder(OptionCompletionFinder):
@abc.abstractmethod @abc.abstractmethod
def get_completions( def get_completions(
self, self,
prefix: str, prefix: str,
action: argparse.Action, action: argparse.Action,
parsed_args: argparse.Namespace, parsed_args: argparse.Namespace,
) -> list[str]: ) -> list[str]:
""" """
Return a list of completions for the specified prefix and action. Return a list of completions for the specified prefix and action.
@ -89,9 +89,9 @@ class CompositeAction(argparse.Action, metaclass=abc.ABCMeta):
documentation_state: dict[t.Type[CompositeAction], DocumentationState] = {} documentation_state: dict[t.Type[CompositeAction], DocumentationState] = {}
def __init__( def __init__(
self, self,
*args, *args,
**kwargs, **kwargs,
): ):
self.definition = self.create_parser() self.definition = self.create_parser()
self.documentation_state[type(self)] = documentation_state = DocumentationState() self.documentation_state[type(self)] = documentation_state = DocumentationState()
@ -108,11 +108,11 @@ class CompositeAction(argparse.Action, metaclass=abc.ABCMeta):
"""Return a namespace parser to parse the argument associated with this action.""" """Return a namespace parser to parse the argument associated with this action."""
def __call__( def __call__(
self, self,
parser, parser,
namespace, namespace,
values, values,
option_string=None, option_string=None,
): ):
state = ParserState(mode=ParserMode.PARSE, namespaces=[namespace], remainder=values) state = ParserState(mode=ParserMode.PARSE, namespaces=[namespace], remainder=values)
@ -135,10 +135,10 @@ class CompositeAction(argparse.Action, metaclass=abc.ABCMeta):
class CompositeActionCompletionFinder(RegisteredCompletionFinder): class CompositeActionCompletionFinder(RegisteredCompletionFinder):
"""Completion finder with support for composite argument parsing.""" """Completion finder with support for composite argument parsing."""
def get_completions( def get_completions(
self, self,
prefix: str, prefix: str,
action: argparse.Action, action: argparse.Action,
parsed_args: argparse.Namespace, parsed_args: argparse.Namespace,
) -> list[str]: ) -> list[str]:
"""Return a list of completions appropriate for the given prefix and action, taking into account the arguments that have already been parsed.""" """Return a list of completions appropriate for the given prefix and action, taking into account the arguments that have already been parsed."""
assert isinstance(action, CompositeAction) assert isinstance(action, CompositeAction)
@ -232,8 +232,8 @@ def detect_false_file_completion(value: str, mode: ParserMode) -> bool:
def complete( def complete(
completer: Parser, completer: Parser,
state: ParserState, state: ParserState,
) -> Completion: ) -> Completion:
"""Perform argument completion using the given completer and return the completion result.""" """Perform argument completion using the given completer and return the completion result."""
value = state.remainder value = state.remainder

@ -44,8 +44,8 @@ from .units import (
def do_commands( def do_commands(
parent: argparse.ArgumentParser, parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder, completer: CompositeActionCompletionFinder,
) -> None: ) -> None:
"""Command line parsing for all commands.""" """Command line parsing for all commands."""
common = argparse.ArgumentParser(add_help=False) common = argparse.ArgumentParser(add_help=False)

@ -37,9 +37,9 @@ from .xml import (
def do_coverage( def do_coverage(
subparsers, subparsers,
parent: argparse.ArgumentParser, parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder, completer: CompositeActionCompletionFinder,
) -> None: ) -> None:
"""Command line parsing for all `coverage` commands.""" """Command line parsing for all `coverage` commands."""
coverage_common = argparse.ArgumentParser(add_help=False, parents=[parent]) coverage_common = argparse.ArgumentParser(add_help=False, parents=[parent])
@ -61,7 +61,7 @@ def do_coverage(
def add_coverage_common( def add_coverage_common(
parser: argparse.ArgumentParser, parser: argparse.ArgumentParser,
): ):
"""Add common coverage arguments.""" """Add common coverage arguments."""
parser.add_argument( parser.add_argument(

@ -13,9 +13,9 @@ from ....environments import (
def do_analyze( def do_analyze(
subparsers, subparsers,
parent: argparse.ArgumentParser, parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder, completer: CompositeActionCompletionFinder,
) -> None: ) -> None:
"""Command line parsing for all `coverage analyze` commands.""" """Command line parsing for all `coverage analyze` commands."""
parser: argparse.ArgumentParser = subparsers.add_parser( parser: argparse.ArgumentParser = subparsers.add_parser(

@ -29,9 +29,9 @@ from .missing import (
def do_targets( def do_targets(
subparsers, subparsers,
parent: argparse.ArgumentParser, parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder, completer: CompositeActionCompletionFinder,
) -> None: ) -> None:
"""Command line parsing for all `coverage analyze targets` commands.""" """Command line parsing for all `coverage analyze targets` commands."""
targets = subparsers.add_parser( targets = subparsers.add_parser(

@ -17,9 +17,9 @@ from .....environments import (
def do_combine( def do_combine(
subparsers, subparsers,
parent: argparse.ArgumentParser, parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder, completer: CompositeActionCompletionFinder,
): ):
"""Command line parsing for the `coverage analyze targets combine` command.""" """Command line parsing for the `coverage analyze targets combine` command."""
parser: argparse.ArgumentParser = subparsers.add_parser( parser: argparse.ArgumentParser = subparsers.add_parser(

@ -17,9 +17,9 @@ from .....environments import (
def do_expand( def do_expand(
subparsers, subparsers,
parent: argparse.ArgumentParser, parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder, completer: CompositeActionCompletionFinder,
): ):
"""Command line parsing for the `coverage analyze targets expand` command.""" """Command line parsing for the `coverage analyze targets expand` command."""
parser: argparse.ArgumentParser = subparsers.add_parser( parser: argparse.ArgumentParser = subparsers.add_parser(

@ -17,9 +17,9 @@ from .....environments import (
def do_filter( def do_filter(
subparsers, subparsers,
parent: argparse.ArgumentParser, parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder, completer: CompositeActionCompletionFinder,
): ):
"""Command line parsing for the `coverage analyze targets filter` command.""" """Command line parsing for the `coverage analyze targets filter` command."""
parser: argparse.ArgumentParser = subparsers.add_parser( parser: argparse.ArgumentParser = subparsers.add_parser(

@ -17,9 +17,9 @@ from .....environments import (
def do_generate( def do_generate(
subparsers, subparsers,
parent: argparse.ArgumentParser, parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder, completer: CompositeActionCompletionFinder,
): ):
"""Command line parsing for the `coverage analyze targets generate` command.""" """Command line parsing for the `coverage analyze targets generate` command."""
parser: argparse.ArgumentParser = subparsers.add_parser( parser: argparse.ArgumentParser = subparsers.add_parser(

@ -17,9 +17,9 @@ from .....environments import (
def do_missing( def do_missing(
subparsers, subparsers,
parent: argparse.ArgumentParser, parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder, completer: CompositeActionCompletionFinder,
): ):
"""Command line parsing for the `coverage analyze targets missing` command.""" """Command line parsing for the `coverage analyze targets missing` command."""
parser: argparse.ArgumentParser = subparsers.add_parser( parser: argparse.ArgumentParser = subparsers.add_parser(

@ -19,10 +19,10 @@ from ...environments import (
def do_combine( def do_combine(
subparsers, subparsers,
parent: argparse.ArgumentParser, parent: argparse.ArgumentParser,
add_coverage_common: c.Callable[[argparse.ArgumentParser], None], add_coverage_common: c.Callable[[argparse.ArgumentParser], None],
completer: CompositeActionCompletionFinder, completer: CompositeActionCompletionFinder,
) -> None: ) -> None:
"""Command line parsing for the `coverage combine` command.""" """Command line parsing for the `coverage combine` command."""
parser: argparse.ArgumentParser = subparsers.add_parser( parser: argparse.ArgumentParser = subparsers.add_parser(

@ -17,9 +17,9 @@ from ...environments import (
def do_erase( def do_erase(
subparsers, subparsers,
parent: argparse.ArgumentParser, parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder, completer: CompositeActionCompletionFinder,
) -> None: ) -> None:
"""Command line parsing for the `coverage erase` command.""" """Command line parsing for the `coverage erase` command."""
parser: argparse.ArgumentParser = subparsers.add_parser( parser: argparse.ArgumentParser = subparsers.add_parser(

@ -19,10 +19,10 @@ from ...environments import (
def do_html( def do_html(
subparsers, subparsers,
parent: argparse.ArgumentParser, parent: argparse.ArgumentParser,
add_coverage_common: c.Callable[[argparse.ArgumentParser], None], add_coverage_common: c.Callable[[argparse.ArgumentParser], None],
completer: CompositeActionCompletionFinder, completer: CompositeActionCompletionFinder,
) -> None: ) -> None:
"""Command line parsing for the `coverage html` command.""" """Command line parsing for the `coverage html` command."""
parser: argparse.ArgumentParser = subparsers.add_parser( parser: argparse.ArgumentParser = subparsers.add_parser(

@ -19,10 +19,10 @@ from ...environments import (
def do_report( def do_report(
subparsers, subparsers,
parent: argparse.ArgumentParser, parent: argparse.ArgumentParser,
add_coverage_common: c.Callable[[argparse.ArgumentParser], None], add_coverage_common: c.Callable[[argparse.ArgumentParser], None],
completer: CompositeActionCompletionFinder, completer: CompositeActionCompletionFinder,
) -> None: ) -> None:
"""Command line parsing for the `coverage report` command.""" """Command line parsing for the `coverage report` command."""
parser: argparse.ArgumentParser = subparsers.add_parser( parser: argparse.ArgumentParser = subparsers.add_parser(

@ -19,10 +19,10 @@ from ...environments import (
def do_xml( def do_xml(
subparsers, subparsers,
parent: argparse.ArgumentParser, parent: argparse.ArgumentParser,
add_coverage_common: c.Callable[[argparse.ArgumentParser], None], add_coverage_common: c.Callable[[argparse.ArgumentParser], None],
completer: CompositeActionCompletionFinder, completer: CompositeActionCompletionFinder,
) -> None: ) -> None:
"""Command line parsing for the `coverage xml` command.""" """Command line parsing for the `coverage xml` command."""
parser: argparse.ArgumentParser = subparsers.add_parser( parser: argparse.ArgumentParser = subparsers.add_parser(

@ -17,9 +17,9 @@ from ..environments import (
def do_env( def do_env(
subparsers, subparsers,
parent: argparse.ArgumentParser, parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder, completer: CompositeActionCompletionFinder,
): ):
"""Command line parsing for the `env` command.""" """Command line parsing for the `env` command."""
parser: argparse.ArgumentParser = subparsers.add_parser( parser: argparse.ArgumentParser = subparsers.add_parser(

@ -26,9 +26,9 @@ from .windows import (
def do_integration( def do_integration(
subparsers, subparsers,
parent: argparse.ArgumentParser, parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder, completer: CompositeActionCompletionFinder,
): ):
"""Command line parsing for all integration commands.""" """Command line parsing for all integration commands."""
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
@ -42,7 +42,7 @@ def do_integration(
def add_integration_common( def add_integration_common(
parser: argparse.ArgumentParser, parser: argparse.ArgumentParser,
): ):
"""Add common integration arguments.""" """Add common integration arguments."""
register_completer(parser.add_argument( register_completer(parser.add_argument(

@ -35,10 +35,10 @@ from ...completers import (
def do_network_integration( def do_network_integration(
subparsers, subparsers,
parent: argparse.ArgumentParser, parent: argparse.ArgumentParser,
add_integration_common: c.Callable[[argparse.ArgumentParser], None], add_integration_common: c.Callable[[argparse.ArgumentParser], None],
completer: CompositeActionCompletionFinder, completer: CompositeActionCompletionFinder,
): ):
"""Command line parsing for the `network-integration` command.""" """Command line parsing for the `network-integration` command."""
parser: argparse.ArgumentParser = subparsers.add_parser( parser: argparse.ArgumentParser = subparsers.add_parser(

@ -26,10 +26,10 @@ from ...environments import (
def do_posix_integration( def do_posix_integration(
subparsers, subparsers,
parent: argparse.ArgumentParser, parent: argparse.ArgumentParser,
add_integration_common: c.Callable[[argparse.ArgumentParser], None], add_integration_common: c.Callable[[argparse.ArgumentParser], None],
completer: CompositeActionCompletionFinder, completer: CompositeActionCompletionFinder,
): ):
"""Command line parsing for the `integration` command.""" """Command line parsing for the `integration` command."""
parser: argparse.ArgumentParser = subparsers.add_parser( parser: argparse.ArgumentParser = subparsers.add_parser(

@ -26,10 +26,10 @@ from ...environments import (
def do_windows_integration( def do_windows_integration(
subparsers, subparsers,
parent: argparse.ArgumentParser, parent: argparse.ArgumentParser,
add_integration_common: c.Callable[[argparse.ArgumentParser], None], add_integration_common: c.Callable[[argparse.ArgumentParser], None],
completer: CompositeActionCompletionFinder, completer: CompositeActionCompletionFinder,
): ):
"""Command line parsing for the `windows-integration` command.""" """Command line parsing for the `windows-integration` command."""
parser: argparse.ArgumentParser = subparsers.add_parser( parser: argparse.ArgumentParser = subparsers.add_parser(

@ -29,9 +29,9 @@ from ..environments import (
def do_sanity( def do_sanity(
subparsers, subparsers,
parent: argparse.ArgumentParser, parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder, completer: CompositeActionCompletionFinder,
): ):
"""Command line parsing for the `sanity` command.""" """Command line parsing for the `sanity` command."""
parser: argparse.ArgumentParser = subparsers.add_parser( parser: argparse.ArgumentParser = subparsers.add_parser(

@ -20,9 +20,9 @@ from ..environments import (
def do_shell( def do_shell(
subparsers, subparsers,
parent: argparse.ArgumentParser, parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder, completer: CompositeActionCompletionFinder,
): ):
"""Command line parsing for the `shell` command.""" """Command line parsing for the `shell` command."""
parser: argparse.ArgumentParser = subparsers.add_parser( parser: argparse.ArgumentParser = subparsers.add_parser(

@ -24,9 +24,9 @@ from ..environments import (
def do_units( def do_units(
subparsers, subparsers,
parent: argparse.ArgumentParser, parent: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder, completer: CompositeActionCompletionFinder,
): ):
"""Command line parsing for the `units` command.""" """Command line parsing for the `units` command."""
parser: argparse.ArgumentParser = subparsers.add_parser( parser: argparse.ArgumentParser = subparsers.add_parser(

@ -191,9 +191,9 @@ class TargetMode(enum.Enum):
def convert_legacy_args( def convert_legacy_args(
argv: list[str], argv: list[str],
args: t.Union[argparse.Namespace, types.SimpleNamespace], args: t.Union[argparse.Namespace, types.SimpleNamespace],
mode: TargetMode, mode: TargetMode,
) -> HostSettings: ) -> HostSettings:
"""Convert pre-split host arguments in the given namespace to their split counterparts.""" """Convert pre-split host arguments in the given namespace to their split counterparts."""
old_options = LegacyHostOptions.create(args) old_options = LegacyHostOptions.create(args)
@ -262,9 +262,9 @@ def convert_legacy_args(
def controller_targets( def controller_targets(
mode: TargetMode, mode: TargetMode,
options: LegacyHostOptions, options: LegacyHostOptions,
controller: ControllerHostConfig, controller: ControllerHostConfig,
) -> list[HostConfig]: ) -> list[HostConfig]:
"""Return the configuration for controller targets.""" """Return the configuration for controller targets."""
python = native_python(options) python = native_python(options)
@ -288,8 +288,8 @@ def native_python(options: LegacyHostOptions) -> t.Optional[NativePythonConfig]:
def get_legacy_host_config( def get_legacy_host_config(
mode: TargetMode, mode: TargetMode,
options: LegacyHostOptions, options: LegacyHostOptions,
) -> tuple[ControllerHostConfig, list[HostConfig], t.Optional[FallbackDetail]]: ) -> tuple[ControllerHostConfig, list[HostConfig], t.Optional[FallbackDetail]]:
""" """
Returns controller and target host configs derived from the provided legacy host options. Returns controller and target host configs derived from the provided legacy host options.

@ -81,10 +81,10 @@ class ControllerMode(enum.Enum):
def add_environments( def add_environments(
parser: argparse.ArgumentParser, parser: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder, completer: CompositeActionCompletionFinder,
controller_mode: ControllerMode, controller_mode: ControllerMode,
target_mode: TargetMode, target_mode: TargetMode,
) -> None: ) -> None:
"""Add arguments for the environments used to run ansible-test and commands it invokes.""" """Add arguments for the environments used to run ansible-test and commands it invokes."""
no_environment = controller_mode == ControllerMode.NO_DELEGATION and target_mode == TargetMode.NO_TARGETS no_environment = controller_mode == ControllerMode.NO_DELEGATION and target_mode == TargetMode.NO_TARGETS
@ -114,8 +114,8 @@ def add_environments(
def add_global_options( def add_global_options(
parser: argparse.ArgumentParser, parser: argparse.ArgumentParser,
controller_mode: ControllerMode, controller_mode: ControllerMode,
): ):
"""Add global options for controlling the test environment that work with both the legacy and composite options.""" """Add global options for controlling the test environment that work with both the legacy and composite options."""
global_parser = t.cast(argparse.ArgumentParser, parser.add_argument_group(title='global environment arguments')) global_parser = t.cast(argparse.ArgumentParser, parser.add_argument_group(title='global environment arguments'))
@ -156,10 +156,10 @@ def add_global_options(
def add_composite_environment_options( def add_composite_environment_options(
parser: argparse.ArgumentParser, parser: argparse.ArgumentParser,
completer: CompositeActionCompletionFinder, completer: CompositeActionCompletionFinder,
controller_mode: ControllerMode, controller_mode: ControllerMode,
target_mode: TargetMode, target_mode: TargetMode,
) -> list[t.Type[CompositeAction]]: ) -> list[t.Type[CompositeAction]]:
"""Add composite options for controlling the test environment.""" """Add composite options for controlling the test environment."""
composite_parser = t.cast(argparse.ArgumentParser, parser.add_argument_group( composite_parser = t.cast(argparse.ArgumentParser, parser.add_argument_group(
@ -246,9 +246,9 @@ def add_composite_environment_options(
def add_legacy_environment_options( def add_legacy_environment_options(
parser: argparse.ArgumentParser, parser: argparse.ArgumentParser,
controller_mode: ControllerMode, controller_mode: ControllerMode,
target_mode: TargetMode, target_mode: TargetMode,
): ):
"""Add legacy options for controlling the test environment.""" """Add legacy options for controlling the test environment."""
environment: argparse.ArgumentParser = parser.add_argument_group( # type: ignore[assignment] # real type private environment: argparse.ArgumentParser = parser.add_argument_group( # type: ignore[assignment] # real type private
@ -259,8 +259,8 @@ def add_legacy_environment_options(
def add_environments_python( def add_environments_python(
environments_parser: argparse.ArgumentParser, environments_parser: argparse.ArgumentParser,
target_mode: TargetMode, target_mode: TargetMode,
) -> None: ) -> None:
"""Add environment arguments to control the Python version(s) used.""" """Add environment arguments to control the Python version(s) used."""
python_versions: tuple[str, ...] python_versions: tuple[str, ...]
@ -285,9 +285,9 @@ def add_environments_python(
def add_environments_host( def add_environments_host(
environments_parser: argparse.ArgumentParser, environments_parser: argparse.ArgumentParser,
controller_mode: ControllerMode, controller_mode: ControllerMode,
target_mode: TargetMode, target_mode: TargetMode,
) -> None: ) -> None:
"""Add environment arguments for the given host and argument modes.""" """Add environment arguments for the given host and argument modes."""
environments_exclusive_group: argparse.ArgumentParser = environments_parser.add_mutually_exclusive_group() # type: ignore[assignment] # real type private environments_exclusive_group: argparse.ArgumentParser = environments_parser.add_mutually_exclusive_group() # type: ignore[assignment] # real type private
@ -341,7 +341,7 @@ def add_environment_network(
def add_environment_windows( def add_environment_windows(
environments_parser: argparse.ArgumentParser, environments_parser: argparse.ArgumentParser,
) -> None: ) -> None:
"""Add environment arguments for running on a windows host.""" """Add environment arguments for running on a windows host."""
register_completer(environments_parser.add_argument( register_completer(environments_parser.add_argument(
@ -359,7 +359,7 @@ def add_environment_windows(
def add_environment_local( def add_environment_local(
exclusive_parser: argparse.ArgumentParser, exclusive_parser: argparse.ArgumentParser,
) -> None: ) -> None:
"""Add environment arguments for running on the local (origin) host.""" """Add environment arguments for running on the local (origin) host."""
exclusive_parser.add_argument( exclusive_parser.add_argument(
@ -370,8 +370,8 @@ def add_environment_local(
def add_environment_venv( def add_environment_venv(
exclusive_parser: argparse.ArgumentParser, exclusive_parser: argparse.ArgumentParser,
environments_parser: argparse.ArgumentParser, environments_parser: argparse.ArgumentParser,
) -> None: ) -> None:
"""Add environment arguments for running in ansible-test managed virtual environments.""" """Add environment arguments for running in ansible-test managed virtual environments."""
exclusive_parser.add_argument( exclusive_parser.add_argument(
@ -387,8 +387,8 @@ def add_environment_venv(
def add_global_docker( def add_global_docker(
parser: argparse.ArgumentParser, parser: argparse.ArgumentParser,
controller_mode: ControllerMode, controller_mode: ControllerMode,
) -> None: ) -> None:
"""Add global options for Docker.""" """Add global options for Docker."""
if controller_mode != ControllerMode.DELEGATED: if controller_mode != ControllerMode.DELEGATED:
@ -450,9 +450,9 @@ def add_global_docker(
def add_environment_docker( def add_environment_docker(
exclusive_parser: argparse.ArgumentParser, exclusive_parser: argparse.ArgumentParser,
environments_parser: argparse.ArgumentParser, environments_parser: argparse.ArgumentParser,
target_mode: TargetMode, target_mode: TargetMode,
) -> None: ) -> None:
"""Add environment arguments for running in docker containers.""" """Add environment arguments for running in docker containers."""
if target_mode in (TargetMode.POSIX_INTEGRATION, TargetMode.SHELL): if target_mode in (TargetMode.POSIX_INTEGRATION, TargetMode.SHELL):
@ -490,8 +490,8 @@ def add_environment_docker(
def add_global_remote( def add_global_remote(
parser: argparse.ArgumentParser, parser: argparse.ArgumentParser,
controller_mode: ControllerMode, controller_mode: ControllerMode,
) -> None: ) -> None:
"""Add global options for remote instances.""" """Add global options for remote instances."""
if controller_mode != ControllerMode.DELEGATED: if controller_mode != ControllerMode.DELEGATED:
@ -529,9 +529,9 @@ def add_global_remote(
def add_environment_remote( def add_environment_remote(
exclusive_parser: argparse.ArgumentParser, exclusive_parser: argparse.ArgumentParser,
environments_parser: argparse.ArgumentParser, environments_parser: argparse.ArgumentParser,
target_mode: TargetMode, target_mode: TargetMode,
) -> None: ) -> None:
"""Add environment arguments for running in ansible-core-ci provisioned remote virtual machines.""" """Add environment arguments for running in ansible-core-ci provisioned remote virtual machines."""
if target_mode == TargetMode.POSIX_INTEGRATION: if target_mode == TargetMode.POSIX_INTEGRATION:

@ -36,7 +36,7 @@ from ...python_requirements import (
install_requirements, install_requirements,
) )
from ... target import ( from ...target import (
walk_module_targets, walk_module_targets,
) )
@ -158,11 +158,11 @@ def get_python_modules() -> dict[str, str]:
def enumerate_python_arcs( def enumerate_python_arcs(
path: str, path: str,
coverage: coverage_module, coverage: coverage_module,
modules: dict[str, str], modules: dict[str, str],
collection_search_re: t.Optional[t.Pattern], collection_search_re: t.Optional[t.Pattern],
collection_sub_re: t.Optional[t.Pattern], collection_sub_re: t.Optional[t.Pattern],
) -> c.Generator[tuple[str, set[tuple[int, int]]], None, None]: ) -> c.Generator[tuple[str, set[tuple[int, int]]], None, None]:
"""Enumerate Python code coverage arcs in the given file.""" """Enumerate Python code coverage arcs in the given file."""
if os.path.getsize(path) == 0: if os.path.getsize(path) == 0:
@ -235,9 +235,9 @@ def read_python_coverage_legacy(path: str) -> PythonArcs:
def enumerate_powershell_lines( def enumerate_powershell_lines(
path: str, path: str,
collection_search_re: t.Optional[t.Pattern], collection_search_re: t.Optional[t.Pattern],
collection_sub_re: t.Optional[t.Pattern], collection_sub_re: t.Optional[t.Pattern],
) -> c.Generator[tuple[str, dict[int, int]], None, None]: ) -> c.Generator[tuple[str, dict[int, int]], None, None]:
"""Enumerate PowerShell code coverage lines in the given file.""" """Enumerate PowerShell code coverage lines in the given file."""
if os.path.getsize(path) == 0: if os.path.getsize(path) == 0:
@ -274,10 +274,10 @@ def enumerate_powershell_lines(
def sanitize_filename( def sanitize_filename(
filename: str, filename: str,
modules: t.Optional[dict[str, str]] = None, modules: t.Optional[dict[str, str]] = None,
collection_search_re: t.Optional[t.Pattern] = None, collection_search_re: t.Optional[t.Pattern] = None,
collection_sub_re: t.Optional[t.Pattern] = None, collection_sub_re: t.Optional[t.Pattern] = None,
) -> t.Optional[str]: ) -> t.Optional[str]:
"""Convert the given code coverage path to a local absolute path and return its, or None if the path is not valid.""" """Convert the given code coverage path to a local absolute path and return its, or None if the path is not valid."""
ansible_path = os.path.abspath('lib/ansible/') + '/' ansible_path = os.path.abspath('lib/ansible/') + '/'

@ -119,9 +119,9 @@ def get_target_index(name: str, target_indexes: TargetIndexes) -> int:
def expand_indexes( def expand_indexes(
source_data: IndexedPoints, source_data: IndexedPoints,
source_index: list[str], source_index: list[str],
format_func: c.Callable[[TargetKey], TFlexKey], format_func: c.Callable[[TargetKey], TFlexKey],
) -> dict[str, dict[TFlexKey, set[str]]]: ) -> dict[str, dict[TFlexKey, set[str]]]:
"""Expand indexes from the source into target names for easier processing of the data (arcs or lines).""" """Expand indexes from the source into target names for easier processing of the data (arcs or lines)."""
combined_data: dict[str, dict[TFlexKey, set[str]]] = {} combined_data: dict[str, dict[TFlexKey, set[str]]] = {}

@ -58,10 +58,10 @@ def command_coverage_analyze_targets_combine(args: CoverageAnalyzeTargetsCombine
def merge_indexes( def merge_indexes(
source_data: IndexedPoints, source_data: IndexedPoints,
source_index: list[str], source_index: list[str],
combined_data: IndexedPoints, combined_data: IndexedPoints,
combined_index: TargetIndexes, combined_index: TargetIndexes,
) -> None: ) -> None:
"""Merge indexes from the source into the combined data set (arcs or lines).""" """Merge indexes from the source into the combined data set (arcs or lines)."""
for covered_path, covered_points in source_data.items(): for covered_path, covered_points in source_data.items():

@ -97,9 +97,9 @@ def command_coverage_analyze_targets_filter(args: CoverageAnalyzeTargetsFilterCo
def filter_data( def filter_data(
data: NamedPoints, data: NamedPoints,
path_filter_func: c.Callable[[str], bool], path_filter_func: c.Callable[[str], bool],
target_filter_func: c.Callable[[set[str]], set[str]], target_filter_func: c.Callable[[set[str]], set[str]],
) -> NamedPoints: ) -> NamedPoints:
"""Filter the data set using the specified filter function.""" """Filter the data set using the specified filter function."""
result: NamedPoints = {} result: NamedPoints = {}

@ -75,10 +75,10 @@ def command_coverage_analyze_targets_generate(args: CoverageAnalyzeTargetsGenera
def analyze_python_coverage( def analyze_python_coverage(
args: CoverageAnalyzeTargetsGenerateConfig, args: CoverageAnalyzeTargetsGenerateConfig,
host_state: HostState, host_state: HostState,
path: str, path: str,
target_indexes: TargetIndexes, target_indexes: TargetIndexes,
) -> Arcs: ) -> Arcs:
"""Analyze Python code coverage.""" """Analyze Python code coverage."""
results: Arcs = {} results: Arcs = {}
@ -107,9 +107,9 @@ def analyze_python_coverage(
def analyze_powershell_coverage( def analyze_powershell_coverage(
args: CoverageAnalyzeTargetsGenerateConfig, args: CoverageAnalyzeTargetsGenerateConfig,
path: str, path: str,
target_indexes: TargetIndexes, target_indexes: TargetIndexes,
) -> Lines: ) -> Lines:
"""Analyze PowerShell code coverage""" """Analyze PowerShell code coverage"""
results: Lines = {} results: Lines = {}
@ -136,9 +136,9 @@ def analyze_powershell_coverage(
def prune_invalid_filenames( def prune_invalid_filenames(
args: CoverageAnalyzeTargetsGenerateConfig, args: CoverageAnalyzeTargetsGenerateConfig,
results: dict[str, t.Any], results: dict[str, t.Any],
collection_search_re: t.Optional[t.Pattern] = None, collection_search_re: t.Optional[t.Pattern] = None,
) -> None: ) -> None:
"""Remove invalid filenames from the given result set.""" """Remove invalid filenames from the given result set."""
path_checker = PathChecker(args, collection_search_re) path_checker = PathChecker(args, collection_search_re)

@ -66,11 +66,11 @@ def command_coverage_analyze_targets_missing(args: CoverageAnalyzeTargetsMissing
def find_gaps( def find_gaps(
from_data: IndexedPoints, from_data: IndexedPoints,
from_index: list[str], from_index: list[str],
to_data: IndexedPoints, to_data: IndexedPoints,
target_indexes: TargetIndexes, target_indexes: TargetIndexes,
only_exists: bool, only_exists: bool,
) -> IndexedPoints: ) -> IndexedPoints:
"""Find gaps in coverage between the from and to data sets.""" """Find gaps in coverage between the from and to data sets."""
target_data: IndexedPoints = {} target_data: IndexedPoints = {}
@ -91,12 +91,12 @@ def find_gaps(
def find_missing( def find_missing(
from_data: IndexedPoints, from_data: IndexedPoints,
from_index: list[str], from_index: list[str],
to_data: IndexedPoints, to_data: IndexedPoints,
to_index: list[str], to_index: list[str],
target_indexes: TargetIndexes, target_indexes: TargetIndexes,
only_exists: bool, only_exists: bool,
) -> IndexedPoints: ) -> IndexedPoints:
"""Find coverage in from_data not present in to_data (arcs or lines).""" """Find coverage in from_data not present in to_data (arcs or lines)."""
target_data: IndexedPoints = {} target_data: IndexedPoints = {}

@ -285,9 +285,9 @@ def _get_coverage_targets(args: CoverageCombineConfig, walk_func: c.Callable) ->
def _build_stub_groups( def _build_stub_groups(
args: CoverageCombineConfig, args: CoverageCombineConfig,
sources: list[tuple[str, int]], sources: list[tuple[str, int]],
default_stub_value: c.Callable[[list[str]], dict[str, TValue]], default_stub_value: c.Callable[[list[str]], dict[str, TValue]],
) -> dict[str, dict[str, TValue]]: ) -> dict[str, dict[str, TValue]]:
""" """
Split the given list of sources with line counts into groups, maintaining a maximum line count for each group. Split the given list of sources with line counts into groups, maintaining a maximum line count for each group.

@ -243,9 +243,9 @@ def delegate_inventory(args: IntegrationConfig, inventory_path_src: str) -> None
@contextlib.contextmanager @contextlib.contextmanager
def integration_test_environment( def integration_test_environment(
args: IntegrationConfig, args: IntegrationConfig,
target: IntegrationTarget, target: IntegrationTarget,
inventory_path_src: str, inventory_path_src: str,
) -> c.Iterator[IntegrationEnvironment]: ) -> c.Iterator[IntegrationEnvironment]:
"""Context manager that prepares the integration test environment and cleans it up.""" """Context manager that prepares the integration test environment and cleans it up."""
ansible_config_src = args.get_ansible_config() ansible_config_src = args.get_ansible_config()
@ -346,9 +346,9 @@ def integration_test_environment(
@contextlib.contextmanager @contextlib.contextmanager
def integration_test_config_file( def integration_test_config_file(
args: IntegrationConfig, args: IntegrationConfig,
env_config: CloudEnvironmentConfig, env_config: CloudEnvironmentConfig,
integration_dir: str, integration_dir: str,
) -> c.Iterator[t.Optional[str]]: ) -> c.Iterator[t.Optional[str]]:
"""Context manager that provides a config file for integration tests, if needed.""" """Context manager that provides a config file for integration tests, if needed."""
if not env_config: if not env_config:
@ -375,10 +375,10 @@ def integration_test_config_file(
def create_inventory( def create_inventory(
args: IntegrationConfig, args: IntegrationConfig,
host_state: HostState, host_state: HostState,
inventory_path: str, inventory_path: str,
target: IntegrationTarget, target: IntegrationTarget,
) -> None: ) -> None:
"""Create inventory.""" """Create inventory."""
if isinstance(args, PosixIntegrationConfig): if isinstance(args, PosixIntegrationConfig):
@ -401,13 +401,13 @@ def create_inventory(
def command_integration_filtered( def command_integration_filtered(
args: IntegrationConfig, args: IntegrationConfig,
host_state: HostState, host_state: HostState,
targets: tuple[IntegrationTarget, ...], targets: tuple[IntegrationTarget, ...],
all_targets: tuple[IntegrationTarget, ...], all_targets: tuple[IntegrationTarget, ...],
inventory_path: str, inventory_path: str,
pre_target: t.Optional[c.Callable[[IntegrationTarget], None]] = None, pre_target: t.Optional[c.Callable[[IntegrationTarget], None]] = None,
post_target: t.Optional[c.Callable[[IntegrationTarget], None]] = None, post_target: t.Optional[c.Callable[[IntegrationTarget], None]] = None,
): ):
"""Run integration tests for the specified targets.""" """Run integration tests for the specified targets."""
found = False found = False
@ -580,12 +580,12 @@ def command_integration_filtered(
def command_integration_script( def command_integration_script(
args: IntegrationConfig, args: IntegrationConfig,
host_state: HostState, host_state: HostState,
target: IntegrationTarget, target: IntegrationTarget,
test_dir: str, test_dir: str,
inventory_path: str, inventory_path: str,
coverage_manager: CoverageManager, coverage_manager: CoverageManager,
): ):
"""Run an integration test script.""" """Run an integration test script."""
display.info('Running %s integration test script' % target.name) display.info('Running %s integration test script' % target.name)
@ -632,13 +632,13 @@ def command_integration_script(
def command_integration_role( def command_integration_role(
args: IntegrationConfig, args: IntegrationConfig,
host_state: HostState, host_state: HostState,
target: IntegrationTarget, target: IntegrationTarget,
start_at_task: t.Optional[str], start_at_task: t.Optional[str],
test_dir: str, test_dir: str,
inventory_path: str, inventory_path: str,
coverage_manager: CoverageManager, coverage_manager: CoverageManager,
): ):
"""Run an integration test role.""" """Run an integration test role."""
display.info('Running %s integration test role' % target.name) display.info('Running %s integration test role' % target.name)
@ -751,15 +751,15 @@ def command_integration_role(
def run_setup_targets( def run_setup_targets(
args: IntegrationConfig, args: IntegrationConfig,
host_state: HostState, host_state: HostState,
test_dir: str, test_dir: str,
target_names: c.Sequence[str], target_names: c.Sequence[str],
targets_dict: dict[str, IntegrationTarget], targets_dict: dict[str, IntegrationTarget],
targets_executed: set[str], targets_executed: set[str],
inventory_path: str, inventory_path: str,
coverage_manager: CoverageManager, coverage_manager: CoverageManager,
always: bool, always: bool,
): ):
"""Run setup targets.""" """Run setup targets."""
for target_name in target_names: for target_name in target_names:
@ -782,13 +782,13 @@ def run_setup_targets(
def integration_environment( def integration_environment(
args: IntegrationConfig, args: IntegrationConfig,
target: IntegrationTarget, target: IntegrationTarget,
test_dir: str, test_dir: str,
inventory_path: str, inventory_path: str,
ansible_config: t.Optional[str], ansible_config: t.Optional[str],
env_config: t.Optional[CloudEnvironmentConfig], env_config: t.Optional[CloudEnvironmentConfig],
test_env: IntegrationEnvironment, test_env: IntegrationEnvironment,
) -> dict[str, str]: ) -> dict[str, str]:
"""Return a dictionary of environment variables to use when running the given integration test target.""" """Return a dictionary of environment variables to use when running the given integration test target."""
env = ansible_environment(args, ansible_config=ansible_config) env = ansible_environment(args, ansible_config=ansible_config)

@ -136,7 +136,7 @@ class CsCloudProvider(CloudProvider):
# noinspection PyBroadException # noinspection PyBroadException
try: try:
json.loads(value) json.loads(value)
except Exception: # pylint: disable=broad-except except Exception: # pylint: disable=broad-except
return False # sometimes the file exists but is not yet valid JSON return False # sometimes the file exists but is not yet valid JSON
return True return True

@ -67,12 +67,12 @@ class TargetFilter(t.Generic[THostConfig], metaclass=abc.ABCMeta):
return self.configs[0] return self.configs[0]
def skip( def skip(
self, self,
skip: str, skip: str,
reason: str, reason: str,
targets: list[IntegrationTarget], targets: list[IntegrationTarget],
exclude: set[str], exclude: set[str],
override: t.Optional[list[str]] = None, override: t.Optional[list[str]] = None,
) -> None: ) -> None:
"""Apply the specified skip rule to the given targets by updating the provided exclude list.""" """Apply the specified skip rule to the given targets by updating the provided exclude list."""
if skip.startswith('skip/'): if skip.startswith('skip/'):

@ -635,11 +635,11 @@ class SanitySkipped(TestSkipped):
class SanityFailure(TestFailure): class SanityFailure(TestFailure):
"""Sanity test failure.""" """Sanity test failure."""
def __init__( def __init__(
self, self,
test: str, test: str,
python_version: t.Optional[str] = None, python_version: t.Optional[str] = None,
messages: t.Optional[c.Sequence[SanityMessage]] = None, messages: t.Optional[c.Sequence[SanityMessage]] = None,
summary: t.Optional[str] = None, summary: t.Optional[str] = None,
) -> None: ) -> None:
super().__init__(COMMAND, test, python_version, messages, summary) super().__init__(COMMAND, test, python_version, messages, summary)
@ -1101,11 +1101,11 @@ def sanity_get_tests() -> tuple[SanityTest, ...]:
def create_sanity_virtualenv( def create_sanity_virtualenv(
args: SanityConfig, args: SanityConfig,
python: PythonConfig, python: PythonConfig,
name: str, name: str,
coverage: bool = False, coverage: bool = False,
minimize: bool = False, minimize: bool = False,
) -> t.Optional[VirtualPythonConfig]: ) -> t.Optional[VirtualPythonConfig]:
"""Return an existing sanity virtual environment matching the requested parameters or create a new one.""" """Return an existing sanity virtual environment matching the requested parameters or create a new one."""
commands = collect_requirements( # create_sanity_virtualenv() commands = collect_requirements( # create_sanity_virtualenv()

@ -122,8 +122,8 @@ class ImportTest(SanityMultipleVersion):
messages = [] messages = []
for import_type, test in ( for import_type, test in (
('module', _get_module_test(True)), ('module', _get_module_test(True)),
('plugin', _get_module_test(False)), ('plugin', _get_module_test(False)),
): ):
if import_type == 'plugin' and python.version in REMOTE_ONLY_PYTHON_VERSIONS: if import_type == 'plugin' and python.version in REMOTE_ONLY_PYTHON_VERSIONS:
continue continue

@ -322,10 +322,10 @@ class IntegrationAliasesTest(SanitySingleVersion):
return messages return messages
def check_ci_group( def check_ci_group(
self, self,
targets: tuple[CompletionTarget, ...], targets: tuple[CompletionTarget, ...],
find: str, find: str,
find_incidental: t.Optional[list[str]] = None, find_incidental: t.Optional[list[str]] = None,
) -> list[SanityMessage]: ) -> list[SanityMessage]:
"""Check the CI groups set in the provided targets and return a list of messages with any issues found.""" """Check the CI groups set in the provided targets and return a list of messages with any issues found."""
all_paths = set(target.path for target in targets) all_paths = set(target.path for target in targets)

@ -176,11 +176,11 @@ class MypyTest(SanityMultipleVersion):
@staticmethod @staticmethod
def test_context( def test_context(
args: SanityConfig, args: SanityConfig,
virtualenv_python: VirtualPythonConfig, virtualenv_python: VirtualPythonConfig,
python: PythonConfig, python: PythonConfig,
context: MyPyContext, context: MyPyContext,
paths: list[str], paths: list[str],
) -> list[SanityMessage]: ) -> list[SanityMessage]:
"""Run mypy tests for the specified context.""" """Run mypy tests for the specified context."""
context_paths = [path for path in paths if any(is_subdir(path, match_path) for match_path in context.paths)] context_paths = [path for path in paths if any(is_subdir(path, match_path) for match_path in context.paths)]

@ -189,13 +189,13 @@ class PylintTest(SanitySingleVersion):
@staticmethod @staticmethod
def pylint( def pylint(
args: SanityConfig, args: SanityConfig,
context: str, context: str,
paths: list[str], paths: list[str],
plugin_dir: str, plugin_dir: str,
plugin_names: list[str], plugin_names: list[str],
python: PythonConfig, python: PythonConfig,
collection_detail: CollectionDetail, collection_detail: CollectionDetail,
) -> list[dict[str, str]]: ) -> list[dict[str, str]]:
"""Run pylint using the config specified by the context on the specified paths.""" """Run pylint using the config specified by the context on the specified paths."""
rcfile = os.path.join(SANITY_ROOT, 'pylint', 'config', context.split('/')[0] + '.cfg') rcfile = os.path.join(SANITY_ROOT, 'pylint', 'config', context.split('/')[0] + '.cfg')

@ -270,9 +270,9 @@ def parse_completion_entry(value: str) -> tuple[str, dict[str, str]]:
def filter_completion( def filter_completion(
completion: dict[str, TCompletionConfig], completion: dict[str, TCompletionConfig],
controller_only: bool = False, controller_only: bool = False,
include_defaults: bool = False, include_defaults: bool = False,
) -> dict[str, TCompletionConfig]: ) -> dict[str, TCompletionConfig]:
"""Return the given completion dictionary, filtering out configs which do not support the controller if controller_only is specified.""" """Return the given completion dictionary, filtering out configs which do not support the controller if controller_only is specified."""
if controller_only: if controller_only:

@ -108,19 +108,19 @@ class CleanupMode(enum.Enum):
def run_support_container( def run_support_container(
args: EnvironmentConfig, args: EnvironmentConfig,
context: str, context: str,
image: str, image: str,
name: str, name: str,
ports: list[int], ports: list[int],
aliases: t.Optional[list[str]] = None, aliases: t.Optional[list[str]] = None,
start: bool = True, start: bool = True,
allow_existing: bool = False, allow_existing: bool = False,
cleanup: t.Optional[CleanupMode] = None, cleanup: t.Optional[CleanupMode] = None,
cmd: t.Optional[list[str]] = None, cmd: t.Optional[list[str]] = None,
env: t.Optional[dict[str, str]] = None, env: t.Optional[dict[str, str]] = None,
options: t.Optional[list[str]] = None, options: t.Optional[list[str]] = None,
publish_ports: bool = True, publish_ports: bool = True,
) -> t.Optional[ContainerDescriptor]: ) -> t.Optional[ContainerDescriptor]:
""" """
Start a container used to support tests, but not run them. Start a container used to support tests, but not run them.
@ -236,12 +236,12 @@ def run_support_container(
def run_container( def run_container(
args: EnvironmentConfig, args: EnvironmentConfig,
image: str, image: str,
name: str, name: str,
options: t.Optional[list[str]], options: t.Optional[list[str]],
cmd: t.Optional[list[str]] = None, cmd: t.Optional[list[str]] = None,
create_only: bool = False, create_only: bool = False,
) -> str: ) -> str:
"""Run a container using the given docker image.""" """Run a container using the given docker image."""
options = list(options or []) options = list(options or [])
@ -594,8 +594,8 @@ class SupportContainerContext:
@contextlib.contextmanager @contextlib.contextmanager
def support_container_context( def support_container_context(
args: EnvironmentConfig, args: EnvironmentConfig,
ssh: t.Optional[SshConnectionDetail], ssh: t.Optional[SshConnectionDetail],
) -> c.Iterator[t.Optional[ContainerDatabase]]: ) -> c.Iterator[t.Optional[ContainerDatabase]]:
"""Create a context manager for integration tests that use support containers.""" """Create a context manager for integration tests that use support containers."""
if not isinstance(args, (IntegrationConfig, UnitsConfig, SanityConfig, ShellConfig)): if not isinstance(args, (IntegrationConfig, UnitsConfig, SanityConfig, ShellConfig)):
@ -617,9 +617,9 @@ def support_container_context(
def create_support_container_context( def create_support_container_context(
args: EnvironmentConfig, args: EnvironmentConfig,
ssh: t.Optional[SshConnectionDetail], ssh: t.Optional[SshConnectionDetail],
containers: ContainerDatabase, containers: ContainerDatabase,
) -> SupportContainerContext: ) -> SupportContainerContext:
"""Context manager that provides SSH port forwards. Returns updated container metadata.""" """Context manager that provides SSH port forwards. Returns updated container metadata."""
host_type = HostType.control host_type = HostType.control
@ -819,9 +819,9 @@ def create_hosts_entries(context: dict[str, ContainerAccess]) -> list[str]:
def create_container_hooks( def create_container_hooks(
args: IntegrationConfig, args: IntegrationConfig,
control_connections: list[SshConnectionDetail], control_connections: list[SshConnectionDetail],
managed_connections: t.Optional[list[SshConnectionDetail]], managed_connections: t.Optional[list[SshConnectionDetail]],
) -> tuple[t.Optional[c.Callable[[IntegrationTarget], None]], t.Optional[c.Callable[[IntegrationTarget], None]]]: ) -> tuple[t.Optional[c.Callable[[IntegrationTarget], None]], t.Optional[c.Callable[[IntegrationTarget], None]]]:
"""Return pre and post target callbacks for enabling and disabling container access for each test target.""" """Return pre and post target callbacks for enabling and disabling container access for each test target."""
containers = get_container_database(args) containers = get_container_database(args)
@ -873,13 +873,13 @@ def create_managed_contexts(control_contexts: dict[str, dict[str, ContainerAcces
def forward_ssh_ports( def forward_ssh_ports(
args: IntegrationConfig, args: IntegrationConfig,
ssh_connections: t.Optional[list[SshConnectionDetail]], ssh_connections: t.Optional[list[SshConnectionDetail]],
playbook: str, playbook: str,
target_state: dict[str, tuple[list[str], list[SshProcess]]], target_state: dict[str, tuple[list[str], list[SshProcess]]],
target: IntegrationTarget, target: IntegrationTarget,
host_type: str, host_type: str,
contexts: dict[str, dict[str, ContainerAccess]], contexts: dict[str, dict[str, ContainerAccess]],
) -> None: ) -> None:
"""Configure port forwarding using SSH and write hosts file entries.""" """Configure port forwarding using SSH and write hosts file entries."""
if ssh_connections is None: if ssh_connections is None:
@ -944,12 +944,12 @@ def forward_ssh_ports(
def cleanup_ssh_ports( def cleanup_ssh_ports(
args: IntegrationConfig, args: IntegrationConfig,
ssh_connections: list[SshConnectionDetail], ssh_connections: list[SshConnectionDetail],
playbook: str, playbook: str,
target_state: dict[str, tuple[list[str], list[SshProcess]]], target_state: dict[str, tuple[list[str], list[SshProcess]]],
target: IntegrationTarget, target: IntegrationTarget,
host_type: str, host_type: str,
) -> None: ) -> None:
"""Stop previously configured SSH port forwarding and remove previously written hosts file entries.""" """Stop previously configured SSH port forwarding and remove previously written hosts file entries."""
state = target_state.pop(target.name, None) state = target_state.pop(target.name, None)

@ -115,10 +115,10 @@ class AnsibleCoreCI:
DEFAULT_ENDPOINT = 'https://ansible-core-ci.testing.ansible.com' DEFAULT_ENDPOINT = 'https://ansible-core-ci.testing.ansible.com'
def __init__( def __init__(
self, self,
args: EnvironmentConfig, args: EnvironmentConfig,
resource: Resource, resource: Resource,
load: bool = True, load: bool = True,
) -> None: ) -> None:
self.args = args self.args = args
self.resource = resource self.resource = resource

@ -143,14 +143,14 @@ def get_sqlite_schema_version(path: str) -> int:
def cover_python( def cover_python(
args: TestConfig, args: TestConfig,
python: PythonConfig, python: PythonConfig,
cmd: list[str], cmd: list[str],
target_name: str, target_name: str,
env: dict[str, str], env: dict[str, str],
capture: bool, capture: bool,
data: t.Optional[str] = None, data: t.Optional[str] = None,
cwd: t.Optional[str] = None, cwd: t.Optional[str] = None,
) -> tuple[t.Optional[str], t.Optional[str]]: ) -> tuple[t.Optional[str], t.Optional[str]]:
"""Run a command while collecting Python code coverage.""" """Run a command while collecting Python code coverage."""
if args.coverage: if args.coverage:
@ -176,9 +176,9 @@ def get_coverage_platform(config: HostConfig) -> str:
def get_coverage_environment( def get_coverage_environment(
args: TestConfig, args: TestConfig,
target_name: str, target_name: str,
version: str, version: str,
) -> dict[str, str]: ) -> dict[str, str]:
"""Return environment variables needed to collect code coverage.""" """Return environment variables needed to collect code coverage."""
# unit tests, sanity tests and other special cases (localhost only) # unit tests, sanity tests and other special cases (localhost only)

@ -266,12 +266,12 @@ def download_results(args: EnvironmentConfig, con: Connection, content_root: str
def generate_command( def generate_command(
args: EnvironmentConfig, args: EnvironmentConfig,
python: PythonConfig, python: PythonConfig,
ansible_bin_path: str, ansible_bin_path: str,
content_root: str, content_root: str,
exclude: list[str], exclude: list[str],
require: list[str], require: list[str],
) -> list[str]: ) -> list[str]:
"""Generate the command necessary to delegate ansible-test.""" """Generate the command necessary to delegate ansible-test."""
cmd = [os.path.join(ansible_bin_path, 'ansible-test')] cmd = [os.path.join(ansible_bin_path, 'ansible-test')]
@ -318,10 +318,10 @@ def generate_command(
def filter_options( def filter_options(
args: EnvironmentConfig, args: EnvironmentConfig,
argv: list[str], argv: list[str],
exclude: list[str], exclude: list[str],
require: list[str], require: list[str],
) -> c.Iterable[str]: ) -> c.Iterable[str]:
"""Return an iterable that filters out unwanted CLI options and injects new ones as requested.""" """Return an iterable that filters out unwanted CLI options and injects new ones as requested."""
replace: list[tuple[str, int, t.Optional[t.Union[bool, str, list[str]]]]] = [ replace: list[tuple[str, int, t.Optional[t.Union[bool, str, list[str]]]]] = [

@ -401,11 +401,11 @@ def detect_host_properties(args: CommonConfig) -> ContainerHostProperties:
def run_utility_container( def run_utility_container(
args: CommonConfig, args: CommonConfig,
name: str, name: str,
cmd: list[str], cmd: list[str],
options: list[str], options: list[str],
data: t.Optional[str] = None, data: t.Optional[str] = None,
) -> tuple[t.Optional[str], t.Optional[str]]: ) -> tuple[t.Optional[str], t.Optional[str]]:
"""Run the specified command using the ansible-test utility container, returning stdout and stderr.""" """Run the specified command using the ansible-test utility container, returning stdout and stderr."""
options = options + [ options = options + [
@ -670,30 +670,30 @@ def docker_cp_to(args: CommonConfig, container_id: str, src: str, dst: str) -> N
def docker_create( def docker_create(
args: CommonConfig, args: CommonConfig,
image: str, image: str,
options: list[str], options: list[str],
cmd: list[str] = None, cmd: list[str] = None,
) -> tuple[t.Optional[str], t.Optional[str]]: ) -> tuple[t.Optional[str], t.Optional[str]]:
"""Create a container using the given docker image.""" """Create a container using the given docker image."""
return docker_command(args, ['create'] + options + [image] + cmd, capture=True) return docker_command(args, ['create'] + options + [image] + cmd, capture=True)
def docker_run( def docker_run(
args: CommonConfig, args: CommonConfig,
image: str, image: str,
options: list[str], options: list[str],
cmd: list[str] = None, cmd: list[str] = None,
data: t.Optional[str] = None, data: t.Optional[str] = None,
) -> tuple[t.Optional[str], t.Optional[str]]: ) -> tuple[t.Optional[str], t.Optional[str]]:
"""Run a container using the given docker image.""" """Run a container using the given docker image."""
return docker_command(args, ['run'] + options + [image] + cmd, data=data, capture=True) return docker_command(args, ['run'] + options + [image] + cmd, data=data, capture=True)
def docker_start( def docker_start(
args: CommonConfig, args: CommonConfig,
container_id: str, container_id: str,
options: list[str], options: list[str],
) -> tuple[t.Optional[str], t.Optional[str]]: ) -> tuple[t.Optional[str], t.Optional[str]]:
"""Start a container by name or ID.""" """Start a container by name or ID."""
return docker_command(args, ['start'] + options + [container_id], capture=True) return docker_command(args, ['start'] + options + [container_id], capture=True)
@ -943,16 +943,16 @@ def docker_logs(args: CommonConfig, container_id: str) -> None:
def docker_exec( def docker_exec(
args: CommonConfig, args: CommonConfig,
container_id: str, container_id: str,
cmd: list[str], cmd: list[str],
capture: bool, capture: bool,
options: t.Optional[list[str]] = None, options: t.Optional[list[str]] = None,
stdin: t.Optional[t.IO[bytes]] = None, stdin: t.Optional[t.IO[bytes]] = None,
stdout: t.Optional[t.IO[bytes]] = None, stdout: t.Optional[t.IO[bytes]] = None,
interactive: bool = False, interactive: bool = False,
output_stream: t.Optional[OutputStream] = None, output_stream: t.Optional[OutputStream] = None,
data: t.Optional[str] = None, data: t.Optional[str] = None,
) -> tuple[t.Optional[str], t.Optional[str]]: ) -> tuple[t.Optional[str], t.Optional[str]]:
"""Execute the given command in the specified container.""" """Execute the given command in the specified container."""
if not options: if not options:
@ -966,15 +966,15 @@ def docker_exec(
def docker_command( def docker_command(
args: CommonConfig, args: CommonConfig,
cmd: list[str], cmd: list[str],
capture: bool, capture: bool,
stdin: t.Optional[t.IO[bytes]] = None, stdin: t.Optional[t.IO[bytes]] = None,
stdout: t.Optional[t.IO[bytes]] = None, stdout: t.Optional[t.IO[bytes]] = None,
interactive: bool = False, interactive: bool = False,
output_stream: t.Optional[OutputStream] = None, output_stream: t.Optional[OutputStream] = None,
always: bool = False, always: bool = False,
data: t.Optional[str] = None, data: t.Optional[str] = None,
) -> tuple[t.Optional[str], t.Optional[str]]: ) -> tuple[t.Optional[str], t.Optional[str]]:
"""Run the specified docker command.""" """Run the specified docker command."""
env = docker_environment() env = docker_environment()

@ -1411,9 +1411,9 @@ def get_config_profile_type_map() -> dict[t.Type[HostConfig], t.Type[HostProfile
def create_host_profile( def create_host_profile(
args: EnvironmentConfig, args: EnvironmentConfig,
config: HostConfig, config: HostConfig,
controller: bool, controller: bool,
) -> HostProfile: ) -> HostProfile:
"""Create and return a host profile from the given host configuration.""" """Create and return a host profile from the given host configuration."""
profile_type = get_config_profile_type_map()[type(config)] profile_type = get_config_profile_type_map()[type(config)]

@ -97,10 +97,10 @@ class HostState:
def prepare_profiles( def prepare_profiles(
args: TEnvironmentConfig, args: TEnvironmentConfig,
targets_use_pypi: bool = False, targets_use_pypi: bool = False,
skip_setup: bool = False, skip_setup: bool = False,
requirements: t.Optional[c.Callable[[HostProfile], None]] = None, requirements: t.Optional[c.Callable[[HostProfile], None]] = None,
) -> HostState: ) -> HostState:
""" """
Create new profiles, or load existing ones, and return them. Create new profiles, or load existing ones, and return them.

@ -122,14 +122,14 @@ class PipBootstrap(PipCommand):
def install_requirements( def install_requirements(
args: EnvironmentConfig, args: EnvironmentConfig,
python: PythonConfig, python: PythonConfig,
ansible: bool = False, ansible: bool = False,
command: bool = False, command: bool = False,
coverage: bool = False, coverage: bool = False,
virtualenv: bool = False, virtualenv: bool = False,
controller: bool = True, controller: bool = True,
connection: t.Optional[Connection] = None, connection: t.Optional[Connection] = None,
) -> None: ) -> None:
"""Install requirements for the given Python using the specified arguments.""" """Install requirements for the given Python using the specified arguments."""
create_result_directories(args) create_result_directories(args)
@ -197,15 +197,15 @@ def collect_bootstrap(python: PythonConfig) -> list[PipCommand]:
def collect_requirements( def collect_requirements(
python: PythonConfig, python: PythonConfig,
controller: bool, controller: bool,
ansible: bool, ansible: bool,
cryptography: bool, cryptography: bool,
coverage: bool, coverage: bool,
virtualenv: bool, virtualenv: bool,
minimize: bool, minimize: bool,
command: t.Optional[str], command: t.Optional[str],
sanity: t.Optional[str], sanity: t.Optional[str],
) -> list[PipCommand]: ) -> list[PipCommand]:
"""Collect requirements for the given Python using the specified arguments.""" """Collect requirements for the given Python using the specified arguments."""
commands: list[PipCommand] = [] commands: list[PipCommand] = []
@ -252,10 +252,10 @@ def collect_requirements(
def run_pip( def run_pip(
args: EnvironmentConfig, args: EnvironmentConfig,
python: PythonConfig, python: PythonConfig,
commands: list[PipCommand], commands: list[PipCommand],
connection: t.Optional[Connection], connection: t.Optional[Connection],
) -> None: ) -> None:
"""Run the specified pip commands for the given Python, and optionally the specified host.""" """Run the specified pip commands for the given Python, and optionally the specified host."""
connection = connection or LocalConnection(args) connection = connection or LocalConnection(args)
@ -367,10 +367,10 @@ def collect_integration_install(command: str, controller: bool) -> list[PipInsta
def collect_install( def collect_install(
requirements_paths: list[tuple[str, str]], requirements_paths: list[tuple[str, str]],
constraints_paths: list[tuple[str, str]], constraints_paths: list[tuple[str, str]],
packages: t.Optional[list[str]] = None, packages: t.Optional[list[str]] = None,
constraints: bool = True, constraints: bool = True,
) -> list[PipInstall]: ) -> list[PipInstall]:
"""Build a pip install list from the given requirements, constraints and packages.""" """Build a pip install list from the given requirements, constraints and packages."""
# listing content constraints first gives them priority over constraints provided by ansible-test # listing content constraints first gives them priority over constraints provided by ansible-test

@ -151,10 +151,10 @@ class SshProcess:
def create_ssh_command( def create_ssh_command(
ssh: SshConnectionDetail, ssh: SshConnectionDetail,
options: t.Optional[dict[str, t.Union[str, int]]] = None, options: t.Optional[dict[str, t.Union[str, int]]] = None,
cli_args: list[str] = None, cli_args: list[str] = None,
command: t.Optional[str] = None, command: t.Optional[str] = None,
) -> list[str]: ) -> list[str]:
"""Create an SSH command using the specified options.""" """Create an SSH command using the specified options."""
cmd = [ cmd = [
@ -207,11 +207,11 @@ def ssh_options_to_str(options: t.Union[dict[str, t.Union[int, str]], dict[str,
def run_ssh_command( def run_ssh_command(
args: EnvironmentConfig, args: EnvironmentConfig,
ssh: SshConnectionDetail, ssh: SshConnectionDetail,
options: t.Optional[dict[str, t.Union[str, int]]] = None, options: t.Optional[dict[str, t.Union[str, int]]] = None,
cli_args: list[str] = None, cli_args: list[str] = None,
command: t.Optional[str] = None, command: t.Optional[str] = None,
) -> SshProcess: ) -> SshProcess:
"""Run the specified SSH command, returning the created SshProcess instance created.""" """Run the specified SSH command, returning the created SshProcess instance created."""
cmd = create_ssh_command(ssh, options, cli_args, command) cmd = create_ssh_command(ssh, options, cli_args, command)
@ -233,9 +233,9 @@ def run_ssh_command(
def create_ssh_port_forwards( def create_ssh_port_forwards(
args: EnvironmentConfig, args: EnvironmentConfig,
ssh: SshConnectionDetail, ssh: SshConnectionDetail,
forwards: list[tuple[str, int]], forwards: list[tuple[str, int]],
) -> SshProcess: ) -> SshProcess:
""" """
Create SSH port forwards using the provided list of tuples (target_host, target_port). Create SSH port forwards using the provided list of tuples (target_host, target_port).
@ -257,9 +257,9 @@ def create_ssh_port_forwards(
def create_ssh_port_redirects( def create_ssh_port_redirects(
args: EnvironmentConfig, args: EnvironmentConfig,
ssh: SshConnectionDetail, ssh: SshConnectionDetail,
redirects: list[tuple[int, str, int]], redirects: list[tuple[int, str, int]],
) -> SshProcess: ) -> SshProcess:
"""Create SSH port redirections using the provided list of tuples (bind_port, target_host, target_port).""" """Create SSH port redirections using the provided list of tuples (bind_port, target_host, target_port)."""
options: dict[str, t.Union[str, int]] = {} options: dict[str, t.Union[str, int]] = {}

@ -65,10 +65,10 @@ def walk_completion_targets(targets: c.Iterable[CompletionTarget], prefix: str,
def walk_internal_targets( def walk_internal_targets(
targets: c.Iterable[TCompletionTarget], targets: c.Iterable[TCompletionTarget],
includes: t.Optional[list[str]] = None, includes: t.Optional[list[str]] = None,
excludes: t.Optional[list[str]] = None, excludes: t.Optional[list[str]] = None,
requires: t.Optional[list[str]] = None, requires: t.Optional[list[str]] = None,
) -> tuple[TCompletionTarget, ...]: ) -> tuple[TCompletionTarget, ...]:
"""Return a tuple of matching completion targets.""" """Return a tuple of matching completion targets."""
targets = tuple(targets) targets = tuple(targets)
@ -256,13 +256,13 @@ def load_integration_prefixes() -> dict[str, str]:
def walk_test_targets( def walk_test_targets(
path: t.Optional[str] = None, path: t.Optional[str] = None,
module_path: t.Optional[str] = None, module_path: t.Optional[str] = None,
extensions: t.Optional[tuple[str, ...]] = None, extensions: t.Optional[tuple[str, ...]] = None,
prefix: t.Optional[str] = None, prefix: t.Optional[str] = None,
extra_dirs: t.Optional[tuple[str, ...]] = None, extra_dirs: t.Optional[tuple[str, ...]] = None,
include_symlinks: bool = False, include_symlinks: bool = False,
include_symlinked_directories: bool = False, include_symlinked_directories: bool = False,
) -> c.Iterable[TestTarget]: ) -> c.Iterable[TestTarget]:
"""Iterate over available test targets.""" """Iterate over available test targets."""
if path: if path:
@ -436,12 +436,12 @@ class CompletionTarget(metaclass=abc.ABCMeta):
class TestTarget(CompletionTarget): class TestTarget(CompletionTarget):
"""Generic test target.""" """Generic test target."""
def __init__( def __init__(
self, self,
path: str, path: str,
module_path: t.Optional[str], module_path: t.Optional[str],
module_prefix: t.Optional[str], module_prefix: t.Optional[str],
base_path: str, base_path: str,
symlink: t.Optional[bool] = None, symlink: t.Optional[bool] = None,
) -> None: ) -> None:
super().__init__() super().__init__()

@ -215,12 +215,12 @@ class TestSkipped(TestResult):
class TestFailure(TestResult): class TestFailure(TestResult):
"""Test failure.""" """Test failure."""
def __init__( def __init__(
self, self,
command: str, command: str,
test: str, test: str,
python_version: t.Optional[str] = None, python_version: t.Optional[str] = None,
messages: t.Optional[c.Sequence[TestMessage]] = None, messages: t.Optional[c.Sequence[TestMessage]] = None,
summary: t.Optional[str] = None, summary: t.Optional[str] = None,
): ):
super().__init__(command, test, python_version) super().__init__(command, test, python_version)
@ -379,14 +379,14 @@ class TestFailure(TestResult):
class TestMessage: class TestMessage:
"""Single test message for one file.""" """Single test message for one file."""
def __init__( def __init__(
self, self,
message: str, message: str,
path: str, path: str,
line: int = 0, line: int = 0,
column: int = 0, column: int = 0,
level: str = 'error', level: str = 'error',
code: t.Optional[str] = None, code: t.Optional[str] = None,
confidence: t.Optional[int] = None, confidence: t.Optional[int] = None,
): ):
self.__path = path self.__path = path
self.__line = line self.__line = line

@ -345,19 +345,19 @@ def get_available_python_versions() -> dict[str, str]:
def raw_command( def raw_command(
cmd: c.Iterable[str], cmd: c.Iterable[str],
capture: bool, capture: bool,
env: t.Optional[dict[str, str]] = None, env: t.Optional[dict[str, str]] = None,
data: t.Optional[str] = None, data: t.Optional[str] = None,
cwd: t.Optional[str] = None, cwd: t.Optional[str] = None,
explain: bool = False, explain: bool = False,
stdin: t.Optional[t.Union[t.IO[bytes], int]] = None, stdin: t.Optional[t.Union[t.IO[bytes], int]] = None,
stdout: t.Optional[t.Union[t.IO[bytes], int]] = None, stdout: t.Optional[t.Union[t.IO[bytes], int]] = None,
interactive: bool = False, interactive: bool = False,
output_stream: t.Optional[OutputStream] = None, output_stream: t.Optional[OutputStream] = None,
cmd_verbosity: int = 1, cmd_verbosity: int = 1,
str_errors: str = 'strict', str_errors: str = 'strict',
error_callback: t.Optional[c.Callable[[SubprocessError], None]] = None, error_callback: t.Optional[c.Callable[[SubprocessError], None]] = None,
) -> tuple[t.Optional[str], t.Optional[str]]: ) -> tuple[t.Optional[str], t.Optional[str]]:
"""Run the specified command and return stdout and stderr as a tuple.""" """Run the specified command and return stdout and stderr as a tuple."""
output_stream = output_stream or OutputStream.AUTO output_stream = output_stream or OutputStream.AUTO
@ -496,12 +496,12 @@ def raw_command(
def communicate_with_process( def communicate_with_process(
process: subprocess.Popen, process: subprocess.Popen,
stdin: t.Optional[bytes], stdin: t.Optional[bytes],
stdout: bool, stdout: bool,
stderr: bool, stderr: bool,
capture: bool, capture: bool,
output_stream: OutputStream, output_stream: OutputStream,
) -> tuple[bytes, bytes]: ) -> tuple[bytes, bytes]:
"""Communicate with the specified process, handling stdin/stdout/stderr as requested.""" """Communicate with the specified process, handling stdin/stdout/stderr as requested."""
threads: list[WrappedThread] = [] threads: list[WrappedThread] = []
@ -855,11 +855,11 @@ class Display:
self.print_message(message, color=color, truncate=truncate) self.print_message(message, color=color, truncate=truncate)
def print_message( # pylint: disable=locally-disabled, invalid-name def print_message( # pylint: disable=locally-disabled, invalid-name
self, self,
message: str, message: str,
color: t.Optional[str] = None, color: t.Optional[str] = None,
stderr: bool = False, stderr: bool = False,
truncate: bool = False, truncate: bool = False,
) -> None: ) -> None:
"""Display a message.""" """Display a message."""
if self.redact and self.sensitive: if self.redact and self.sensitive:
@ -901,13 +901,13 @@ class ApplicationWarning(Exception):
class SubprocessError(ApplicationError): class SubprocessError(ApplicationError):
"""Error resulting from failed subprocess execution.""" """Error resulting from failed subprocess execution."""
def __init__( def __init__(
self, self,
cmd: list[str], cmd: list[str],
status: int = 0, status: int = 0,
stdout: t.Optional[str] = None, stdout: t.Optional[str] = None,
stderr: t.Optional[str] = None, stderr: t.Optional[str] = None,
runtime: t.Optional[float] = None, runtime: t.Optional[float] = None,
error_callback: t.Optional[c.Callable[[SubprocessError], None]] = None, error_callback: t.Optional[c.Callable[[SubprocessError], None]] = None,
) -> None: ) -> None:
message = 'Command "%s" returned exit status %s.\n' % (shlex.join(cmd), status) message = 'Command "%s" returned exit status %s.\n' % (shlex.join(cmd), status)

@ -396,14 +396,14 @@ def cleanup_python_paths() -> None:
def intercept_python( def intercept_python(
args: CommonConfig, args: CommonConfig,
python: PythonConfig, python: PythonConfig,
cmd: list[str], cmd: list[str],
env: dict[str, str], env: dict[str, str],
capture: bool, capture: bool,
data: t.Optional[str] = None, data: t.Optional[str] = None,
cwd: t.Optional[str] = None, cwd: t.Optional[str] = None,
always: bool = False, always: bool = False,
) -> tuple[t.Optional[str], t.Optional[str]]: ) -> tuple[t.Optional[str], t.Optional[str]]:
""" """
Run a command while intercepting invocations of Python to control the version used. Run a command while intercepting invocations of Python to control the version used.
@ -428,20 +428,20 @@ def intercept_python(
def run_command( def run_command(
args: CommonConfig, args: CommonConfig,
cmd: c.Iterable[str], cmd: c.Iterable[str],
capture: bool, capture: bool,
env: t.Optional[dict[str, str]] = None, env: t.Optional[dict[str, str]] = None,
data: t.Optional[str] = None, data: t.Optional[str] = None,
cwd: t.Optional[str] = None, cwd: t.Optional[str] = None,
always: bool = False, always: bool = False,
stdin: t.Optional[t.IO[bytes]] = None, stdin: t.Optional[t.IO[bytes]] = None,
stdout: t.Optional[t.IO[bytes]] = None, stdout: t.Optional[t.IO[bytes]] = None,
interactive: bool = False, interactive: bool = False,
output_stream: t.Optional[OutputStream] = None, output_stream: t.Optional[OutputStream] = None,
cmd_verbosity: int = 1, cmd_verbosity: int = 1,
str_errors: str = 'strict', str_errors: str = 'strict',
error_callback: t.Optional[c.Callable[[SubprocessError], None]] = None, error_callback: t.Optional[c.Callable[[SubprocessError], None]] = None,
) -> tuple[t.Optional[str], t.Optional[str]]: ) -> tuple[t.Optional[str], t.Optional[str]]:
"""Run the specified command and return stdout and stderr as a tuple.""" """Run the specified command and return stdout and stderr as a tuple."""
explain = args.explain and not always explain = args.explain and not always

@ -41,8 +41,8 @@ from .python_requirements import (
def get_virtual_python( def get_virtual_python(
args: EnvironmentConfig, args: EnvironmentConfig,
python: VirtualPythonConfig, python: VirtualPythonConfig,
) -> VirtualPythonConfig: ) -> VirtualPythonConfig:
"""Create a virtual environment for the given Python and return the path to its root.""" """Create a virtual environment for the given Python and return the path to its root."""
if python.system_site_packages: if python.system_site_packages:

Loading…
Cancel
Save