ansible-test - Improve code formatting (#79983)

* ansible-test - Add blank lines after docstrings

* ansible-test - Preserve formatting of arg pairs

* ansible-test - Remove unused string

* ansible-test - Remove pointless dict() usage

* ansible-test - Clean up initial func arg indenting

* ansible-test - Clean up constructor arg indenting

* ansible-test - Clean up func arg wrapping

* ansible-test - Clean up comma and paren placement
pull/79991/head
Matt Clay 1 year ago committed by GitHub
parent 43487c6581
commit 715ab99462
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -10,6 +10,7 @@ import datetime
import decimal
from xml.dom import minidom
# noinspection PyPep8Naming
from xml.etree import ElementTree as ET
@ -17,6 +18,7 @@ from xml.etree import ElementTree as ET
@dataclasses.dataclass # type: ignore[misc] # https://github.com/python/mypy/issues/5374
class TestResult(metaclass=abc.ABCMeta):
"""Base class for the result of a test case."""
output: str | None = None
message: str | None = None
type: str | None = None
@ -48,6 +50,7 @@ class TestResult(metaclass=abc.ABCMeta):
@dataclasses.dataclass
class TestFailure(TestResult):
"""Failure info for a test case."""
@property
def tag(self) -> str:
"""Tag name for the XML element created by this result type."""
@ -57,6 +60,7 @@ class TestFailure(TestResult):
@dataclasses.dataclass
class TestError(TestResult):
"""Error info for a test case."""
@property
def tag(self) -> str:
"""Tag name for the XML element created by this result type."""
@ -66,6 +70,7 @@ class TestError(TestResult):
@dataclasses.dataclass
class TestCase:
"""An individual test case."""
name: str
assertions: int | None = None
classname: str | None = None
@ -127,6 +132,7 @@ class TestCase:
@dataclasses.dataclass
class TestSuite:
"""A collection of test cases."""
name: str
hostname: str | None = None
id: str | None = None
@ -205,6 +211,7 @@ class TestSuite:
@dataclasses.dataclass
class TestSuites:
"""A collection of test suites."""
name: str | None = None
suites: list[TestSuite] = dataclasses.field(default_factory=list)

@ -114,27 +114,27 @@ def ansible_environment(args: CommonConfig, color: bool = True, ansible_config:
# standard path injection is not effective for ansible-connection, instead the location must be configured
# ansible-connection only requires the injector for code coverage
# the correct python interpreter is already selected using the sys.executable used to invoke ansible
ansible.update(dict(
ansible.update(
ANSIBLE_CONNECTION_PATH=os.path.join(get_injector_path(), 'ansible-connection'),
))
)
if isinstance(args, PosixIntegrationConfig):
ansible.update(dict(
ansible.update(
ANSIBLE_PYTHON_INTERPRETER='/set/ansible_python_interpreter/in/inventory', # force tests to set ansible_python_interpreter in inventory
))
)
env.update(ansible)
if args.debug:
env.update(dict(
env.update(
ANSIBLE_DEBUG='true',
ANSIBLE_LOG_PATH=os.path.join(ResultType.LOGS.name, 'debug.log'),
))
)
if data_context().content.collection:
env.update(dict(
env.update(
ANSIBLE_COLLECTIONS_PATH=data_context().content.collection.root,
))
)
if data_context().content.is_ansible:
env.update(configure_plugin_paths(args))
@ -252,12 +252,14 @@ License: GPLv3+
class CollectionDetail:
"""Collection detail."""
def __init__(self) -> None:
self.version: t.Optional[str] = None
class CollectionDetailError(ApplicationError):
"""An error occurred retrieving collection detail."""
def __init__(self, reason: str) -> None:
super().__init__('Error collecting collection detail: %s' % reason)
self.reason = reason

@ -11,6 +11,7 @@ from .util import (
class Become(metaclass=abc.ABCMeta):
"""Base class for become implementations."""
@classmethod
def name(cls) -> str:
"""The name of this plugin."""
@ -28,6 +29,7 @@ class Become(metaclass=abc.ABCMeta):
class Doas(Become):
"""Become using 'doas'."""
@property
def method(self) -> str:
"""The name of the Ansible become plugin that is equivalent to this."""
@ -47,6 +49,7 @@ class Doas(Become):
class DoasSudo(Doas):
"""Become using 'doas' in ansible-test and then after bootstrapping use 'sudo' for other ansible commands."""
@classmethod
def name(cls) -> str:
"""The name of this plugin."""
@ -60,6 +63,7 @@ class DoasSudo(Doas):
class Su(Become):
"""Become using 'su'."""
@property
def method(self) -> str:
"""The name of the Ansible become plugin that is equivalent to this."""
@ -77,6 +81,7 @@ class Su(Become):
class SuSudo(Su):
"""Become using 'su' in ansible-test and then after bootstrapping use 'sudo' for other ansible commands."""
@classmethod
def name(cls) -> str:
"""The name of this plugin."""
@ -90,6 +95,7 @@ class SuSudo(Su):
class Sudo(Become):
"""Become using 'sudo'."""
@property
def method(self) -> str:
"""The name of the Ansible become plugin that is equivalent to this."""

@ -26,6 +26,7 @@ from .core_ci import (
@dataclasses.dataclass
class Bootstrap:
"""Base class for bootstrapping systems."""
controller: bool
python_versions: list[str]
ssh_key: SshKey
@ -65,6 +66,7 @@ class Bootstrap:
@dataclasses.dataclass
class BootstrapDocker(Bootstrap):
"""Bootstrap docker instances."""
def get_variables(self) -> dict[str, t.Union[str, list[str]]]:
"""The variables to template in the bootstrapping script."""
variables = super().get_variables()
@ -80,6 +82,7 @@ class BootstrapDocker(Bootstrap):
@dataclasses.dataclass
class BootstrapRemote(Bootstrap):
"""Bootstrap remote instances."""
platform: str
platform_version: str

@ -13,6 +13,7 @@ TValue = t.TypeVar('TValue')
class CommonCache:
"""Common cache."""
def __init__(self, args: CommonConfig) -> None:
self.args = args

@ -9,6 +9,7 @@ import re
class CGroupPath:
"""Linux cgroup path constants."""
ROOT = '/sys/fs/cgroup'
SYSTEMD = '/sys/fs/cgroup/systemd'
SYSTEMD_RELEASE_AGENT = '/sys/fs/cgroup/systemd/release_agent'
@ -16,6 +17,7 @@ class CGroupPath:
class MountType:
"""Linux filesystem mount type constants."""
TMPFS = 'tmpfs'
CGROUP_V1 = 'cgroup'
CGROUP_V2 = 'cgroup2'
@ -24,6 +26,7 @@ class MountType:
@dataclasses.dataclass(frozen=True)
class CGroupEntry:
"""A single cgroup entry parsed from '/proc/{pid}/cgroup' in the proc filesystem."""
id: int
subsystem: str
path: pathlib.PurePosixPath
@ -46,7 +49,7 @@ class CGroupEntry:
return cls(
id=int(cid),
subsystem=subsystem.removeprefix('name='),
path=pathlib.PurePosixPath(path)
path=pathlib.PurePosixPath(path),
)
@classmethod
@ -58,6 +61,7 @@ class CGroupEntry:
@dataclasses.dataclass(frozen=True)
class MountEntry:
"""A single mount info entry parsed from '/proc/{pid}/mountinfo' in the proc filesystem."""
mount_id: int
parent_id: int
device_major: int

@ -39,6 +39,7 @@ class ChangeDetectionNotSupported(ApplicationError):
class CIProvider(metaclass=abc.ABCMeta):
"""Base class for CI provider plugins."""
priority = 500
@staticmethod
@ -103,6 +104,7 @@ def get_ci_provider() -> CIProvider:
class AuthHelper(metaclass=abc.ABCMeta):
"""Public key based authentication helper for Ansible Core CI."""
def sign_request(self, request: dict[str, t.Any]) -> None:
"""Sign the given auth request and make the public key available."""
payload_bytes = to_bytes(json.dumps(request, sort_keys=True))
@ -141,6 +143,7 @@ class AuthHelper(metaclass=abc.ABCMeta):
class CryptographyAuthHelper(AuthHelper, metaclass=abc.ABCMeta):
"""Cryptography based public key based authentication helper for Ansible Core CI."""
def sign_bytes(self, payload_bytes: bytes) -> bytes:
"""Sign the given payload and return the signature, initializing a new key pair if required."""
# import cryptography here to avoid overhead and failures in environments which do not use/provide it
@ -186,6 +189,7 @@ class CryptographyAuthHelper(AuthHelper, metaclass=abc.ABCMeta):
class OpenSSLAuthHelper(AuthHelper, metaclass=abc.ABCMeta):
"""OpenSSL based public key based authentication helper for Ansible Core CI."""
def sign_bytes(self, payload_bytes: bytes) -> bytes:
"""Sign the given payload and return the signature, initializing a new key pair if required."""
private_key_pem = self.initialize_private_key()

@ -40,6 +40,7 @@ CODE = 'azp'
class AzurePipelines(CIProvider):
"""CI provider implementation for Azure Pipelines."""
def __init__(self) -> None:
self.auth = AzurePipelinesAuthHelper()
@ -143,6 +144,7 @@ class AzurePipelinesAuthHelper(CryptographyAuthHelper):
Authentication helper for Azure Pipelines.
Based on cryptography since it is provided by the default Azure Pipelines environment.
"""
def publish_public_key(self, public_key_pem: str) -> None:
"""Publish the given public key."""
try:
@ -162,6 +164,7 @@ class AzurePipelinesAuthHelper(CryptographyAuthHelper):
class AzurePipelinesChanges:
"""Change information for an Azure Pipelines build."""
def __init__(self, args: CommonConfig) -> None:
self.args = args
self.git = Git()

@ -36,6 +36,7 @@ CODE = '' # not really a CI provider, so use an empty string for the code
class Local(CIProvider):
"""CI provider implementation when not using CI."""
priority = 1000
@staticmethod
@ -149,6 +150,7 @@ class Local(CIProvider):
class InvalidBranch(ApplicationError):
"""Exception for invalid branch specification."""
def __init__(self, branch: str, reason: str) -> None:
message = 'Invalid branch: %s\n%s' % (branch, reason)
@ -159,6 +161,7 @@ class InvalidBranch(ApplicationError):
class LocalChanges:
"""Change information for local work."""
def __init__(self, args: TestConfig) -> None:
self.args = args
self.git = Git()

@ -176,6 +176,7 @@ def categorize_changes(args: TestConfig, paths: list[str], verbose_command: t.Op
class PathMapper:
"""Map file paths to test commands and targets."""
def __init__(self, args: TestConfig) -> None:
self.args = args
self.integration_all_target = get_integration_all_target(self.args)

@ -231,6 +231,7 @@ def relative_to_absolute(name: str, level: int, module: str, path: str, lineno:
class ModuleUtilFinder(ast.NodeVisitor):
"""AST visitor to find valid module_utils imports."""
def __init__(self, path: str, module_utils: set[str]) -> None:
self.path = path
self.module_utils = module_utils

@ -22,6 +22,7 @@ from .parsers import (
class OriginControllerAction(CompositeAction):
"""Composite action parser for the controller when the only option is `origin`."""
def create_parser(self) -> NamespaceParser:
"""Return a namespace parser to parse the argument associated with this action."""
return OriginControllerParser()
@ -29,6 +30,7 @@ class OriginControllerAction(CompositeAction):
class DelegatedControllerAction(CompositeAction):
"""Composite action parser for the controller when delegation is supported."""
def create_parser(self) -> NamespaceParser:
"""Return a namespace parser to parse the argument associated with this action."""
return DelegatedControllerParser()
@ -36,6 +38,7 @@ class DelegatedControllerAction(CompositeAction):
class PosixTargetAction(CompositeAction):
"""Composite action parser for a POSIX target."""
def create_parser(self) -> NamespaceParser:
"""Return a namespace parser to parse the argument associated with this action."""
return PosixTargetParser()
@ -43,6 +46,7 @@ class PosixTargetAction(CompositeAction):
class WindowsTargetAction(CompositeAction):
"""Composite action parser for a Windows target."""
def create_parser(self) -> NamespaceParser:
"""Return a namespace parser to parse the argument associated with this action."""
return WindowsTargetParser()
@ -50,6 +54,7 @@ class WindowsTargetAction(CompositeAction):
class NetworkTargetAction(CompositeAction):
"""Composite action parser for a network target."""
def create_parser(self) -> NamespaceParser:
"""Return a namespace parser to parse the argument associated with this action."""
return NetworkTargetParser()
@ -57,6 +62,7 @@ class NetworkTargetAction(CompositeAction):
class SanityPythonTargetAction(CompositeAction):
"""Composite action parser for a sanity target."""
def create_parser(self) -> NamespaceParser:
"""Return a namespace parser to parse the argument associated with this action."""
return SanityPythonTargetParser()
@ -64,6 +70,7 @@ class SanityPythonTargetAction(CompositeAction):
class UnitsPythonTargetAction(CompositeAction):
"""Composite action parser for a units target."""
def create_parser(self) -> NamespaceParser:
"""Return a namespace parser to parse the argument associated with this action."""
return UnitsPythonTargetParser()
@ -71,6 +78,7 @@ class UnitsPythonTargetAction(CompositeAction):
class PosixSshTargetAction(CompositeAction):
"""Composite action parser for a POSIX SSH target."""
def create_parser(self) -> NamespaceParser:
"""Return a namespace parser to parse the argument associated with this action."""
return PosixSshTargetParser()
@ -78,6 +86,7 @@ class PosixSshTargetAction(CompositeAction):
class WindowsSshTargetAction(CompositeAction):
"""Composite action parser for a Windows SSH target."""
def create_parser(self) -> NamespaceParser:
"""Return a namespace parser to parse the argument associated with this action."""
return WindowsSshTargetParser()
@ -85,6 +94,7 @@ class WindowsSshTargetAction(CompositeAction):
class NetworkSshTargetAction(CompositeAction):
"""Composite action parser for a network SSH target."""
def create_parser(self) -> NamespaceParser:
"""Return a namespace parser to parse the argument associated with this action."""
return NetworkSshTargetParser()

@ -34,6 +34,7 @@ class RegisteredCompletionFinder(OptionCompletionFinder):
These registered completions, if provided, are used to filter the final completion results.
This works around a known bug: https://github.com/kislyuk/argcomplete/issues/221
"""
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
@ -86,6 +87,7 @@ class RegisteredCompletionFinder(OptionCompletionFinder):
class CompositeAction(argparse.Action, metaclass=abc.ABCMeta):
"""Base class for actions that parse composite arguments."""
documentation_state: dict[t.Type[CompositeAction], DocumentationState] = {}
def __init__(
@ -134,6 +136,7 @@ class CompositeAction(argparse.Action, metaclass=abc.ABCMeta):
class CompositeActionCompletionFinder(RegisteredCompletionFinder):
"""Completion finder with support for composite argument parsing."""
def get_completions(
self,
prefix: str,
@ -255,7 +258,7 @@ def complete(
list_mode=True, # abuse list mode to enable preservation of the literal results
consumed='',
continuation='',
matches=['completion', 'invalid']
matches=['completion', 'invalid'],
)
else:
answer = ex

@ -8,6 +8,7 @@ import typing as t
class EnumAction(argparse.Action):
"""Parse an enum using the lowercase enum names."""
def __init__(self, **kwargs: t.Any) -> None:
self.enum_type: t.Type[enum.Enum] = kwargs.pop('type', None)
kwargs.setdefault('choices', tuple(e.name.lower() for e in self.enum_type))

@ -9,6 +9,7 @@ import typing as t
class Substitute:
"""Substitute for missing class which accepts all arguments."""
def __init__(self, *args, **kwargs) -> None:
pass
@ -35,6 +36,7 @@ class CompType(enum.Enum):
Bash COMP_TYPE argument completion types.
For documentation, see: https://www.gnu.org/software/bash/manual/html_node/Bash-Variables.html#index-COMP_005fTYPE
"""
COMPLETION = '\t'
"""
Standard completion, typically triggered by a single tab.
@ -85,6 +87,7 @@ class OptionCompletionFinder(CompletionFinder):
Custom completion finder for argcomplete.
It provides support for running completion in list mode, which argcomplete natively handles the same as standard completion.
"""
enabled = bool(argcomplete)
def __init__(self, *args, validator=None, **kwargs) -> None:

@ -32,18 +32,21 @@ class Completion(Exception):
@dataclasses.dataclass
class CompletionUnavailable(Completion):
"""Argument completion unavailable."""
message: str = 'No completions available.'
@dataclasses.dataclass
class CompletionError(Completion):
"""Argument completion error."""
message: t.Optional[str] = None
@dataclasses.dataclass
class CompletionSuccess(Completion):
"""Successful argument completion result."""
list_mode: bool
consumed: str
continuation: str
@ -72,6 +75,7 @@ class CompletionSuccess(Completion):
class ParserMode(enum.Enum):
"""Mode the parser is operating in."""
PARSE = enum.auto()
COMPLETE = enum.auto()
LIST = enum.auto()
@ -84,6 +88,7 @@ class ParserError(Exception):
@dataclasses.dataclass
class ParserBoundary:
"""Boundary details for parsing composite input."""
delimiters: str
required: bool
match: t.Optional[str] = None
@ -93,6 +98,7 @@ class ParserBoundary:
@dataclasses.dataclass
class ParserState:
"""State of the composite argument parser."""
mode: ParserMode
remainder: str = ''
consumed: str = ''
@ -194,11 +200,13 @@ class ParserState:
@dataclasses.dataclass
class DocumentationState:
"""State of the composite argument parser's generated documentation."""
sections: dict[str, str] = dataclasses.field(default_factory=dict)
class Parser(metaclass=abc.ABCMeta):
"""Base class for all composite argument parsers."""
@abc.abstractmethod
def parse(self, state: ParserState) -> t.Any:
"""Parse the input from the given state and return the result."""
@ -210,6 +218,7 @@ class Parser(metaclass=abc.ABCMeta):
class MatchConditions(enum.Flag):
"""Acceptable condition(s) for matching user input to available choices."""
CHOICE = enum.auto()
"""Match any choice."""
ANY = enum.auto()
@ -220,6 +229,7 @@ class MatchConditions(enum.Flag):
class DynamicChoicesParser(Parser, metaclass=abc.ABCMeta):
"""Base class for composite argument parsers which use a list of choices that can be generated during completion."""
def __init__(self, conditions: MatchConditions = MatchConditions.CHOICE) -> None:
self.conditions = conditions
@ -275,6 +285,7 @@ class DynamicChoicesParser(Parser, metaclass=abc.ABCMeta):
class ChoicesParser(DynamicChoicesParser):
"""Composite argument parser which relies on a static list of choices."""
def __init__(self, choices: list[str], conditions: MatchConditions = MatchConditions.CHOICE) -> None:
self.choices = choices
@ -291,6 +302,7 @@ class ChoicesParser(DynamicChoicesParser):
class EnumValueChoicesParser(ChoicesParser):
"""Composite argument parser which relies on a static list of choices derived from the values of an enum."""
def __init__(self, enum_type: t.Type[enum.Enum], conditions: MatchConditions = MatchConditions.CHOICE) -> None:
self.enum_type = enum_type
@ -304,6 +316,7 @@ class EnumValueChoicesParser(ChoicesParser):
class IntegerParser(DynamicChoicesParser):
"""Composite argument parser for integers."""
PATTERN = re.compile('^[1-9][0-9]*$')
def __init__(self, maximum: t.Optional[int] = None) -> None:
@ -341,6 +354,7 @@ class IntegerParser(DynamicChoicesParser):
class BooleanParser(ChoicesParser):
"""Composite argument parser for boolean (yes/no) values."""
def __init__(self) -> None:
super().__init__(['yes', 'no'])
@ -352,6 +366,7 @@ class BooleanParser(ChoicesParser):
class AnyParser(ChoicesParser):
"""Composite argument parser which accepts any input value."""
def __init__(self, nothing: bool = False, no_match_message: t.Optional[str] = None) -> None:
self.no_match_message = no_match_message
@ -379,6 +394,7 @@ class AnyParser(ChoicesParser):
class RelativePathNameParser(DynamicChoicesParser):
"""Composite argument parser for relative path names."""
RELATIVE_NAMES = ['.', '..']
def __init__(self, choices: list[str]) -> None:
@ -400,6 +416,7 @@ class RelativePathNameParser(DynamicChoicesParser):
class FileParser(Parser):
"""Composite argument parser for absolute or relative file paths."""
def parse(self, state: ParserState) -> str:
"""Parse the input from the given state and return the result."""
if state.mode == ParserMode.PARSE:
@ -432,6 +449,7 @@ class FileParser(Parser):
class AbsolutePathParser(Parser):
"""Composite argument parser for absolute file paths. Paths are only verified for proper syntax, not for existence."""
def parse(self, state: ParserState) -> t.Any:
"""Parse the input from the given state and return the result."""
path = ''
@ -443,13 +461,14 @@ class AbsolutePathParser(Parser):
else:
path += ChoicesParser([PATH_DELIMITER]).parse(state)
path += (boundary.match or '')
path += boundary.match or ''
return path
class NamespaceParser(Parser, metaclass=abc.ABCMeta):
"""Base class for composite argument parsers that store their results in a namespace."""
def parse(self, state: ParserState) -> t.Any:
"""Parse the input from the given state and return the result."""
namespace = state.current_namespace
@ -496,6 +515,7 @@ class NamespaceParser(Parser, metaclass=abc.ABCMeta):
class NamespaceWrappedParser(NamespaceParser):
"""Composite argument parser that wraps a non-namespace parser and stores the result in a namespace."""
def __init__(self, dest: str, parser: Parser) -> None:
self._dest = dest
self.parser = parser
@ -512,6 +532,7 @@ class NamespaceWrappedParser(NamespaceParser):
class KeyValueParser(Parser, metaclass=abc.ABCMeta):
"""Base class for key/value composite argument parsers."""
@abc.abstractmethod
def get_parsers(self, state: ParserState) -> dict[str, Parser]:
"""Return a dictionary of key names and value parsers."""
@ -538,6 +559,7 @@ class KeyValueParser(Parser, metaclass=abc.ABCMeta):
class PairParser(Parser, metaclass=abc.ABCMeta):
"""Base class for composite argument parsers consisting of a left and right argument parser, with input separated by a delimiter."""
def parse(self, state: ParserState) -> t.Any:
"""Parse the input from the given state and return the result."""
namespace = self.create_namespace()
@ -577,6 +599,7 @@ class PairParser(Parser, metaclass=abc.ABCMeta):
class TypeParser(Parser, metaclass=abc.ABCMeta):
"""Base class for composite argument parsers which parse a type name, a colon and then parse results based on the type given by the type name."""
def get_parsers(self, state: ParserState) -> dict[str, Parser]: # pylint: disable=unused-argument
"""Return a dictionary of type names and type parsers."""
return self.get_stateless_parsers()

@ -50,7 +50,8 @@ def do_network_integration(
parser.set_defaults(
func=command_network_integration,
targets_func=walk_network_integration_targets,
config=NetworkIntegrationConfig)
config=NetworkIntegrationConfig,
)
network_integration = t.cast(argparse.ArgumentParser, parser.add_argument_group(title='network integration test arguments'))

@ -43,7 +43,8 @@ def do_sanity(
parser.set_defaults(
func=command_sanity,
targets_func=walk_sanity_targets,
config=SanityConfig)
config=SanityConfig,
)
sanity = parser.add_argument_group(title='sanity test arguments')
@ -113,7 +114,7 @@ def do_sanity(
sanity.add_argument(
'--prime-venvs',
action='store_true',
help='prepare virtual environments without running tests'
help='prepare virtual environments without running tests',
)
add_environments(parser, completer, ControllerMode.DELEGATED, TargetMode.SANITY) # sanity

@ -84,24 +84,28 @@ def get_option_name(name: str) -> str:
class PythonVersionUnsupportedError(ApplicationError):
"""A Python version was requested for a context which does not support that version."""
def __init__(self, context: str, version: str, versions: c.Iterable[str]) -> None:
super().__init__(f'Python {version} is not supported by environment `{context}`. Supported Python version(s) are: {", ".join(versions)}')
class PythonVersionUnspecifiedError(ApplicationError):
"""A Python version was not specified for a context which is unknown, thus the Python version is unknown."""
def __init__(self, context: str) -> None:
super().__init__(f'A Python version was not specified for environment `{context}`. Use the `--python` option to specify a Python version.')
class ControllerNotSupportedError(ApplicationError):
"""Option(s) were specified which do not provide support for the controller and would be ignored because they are irrelevant for the target."""
def __init__(self, context: str) -> None:
super().__init__(f'Environment `{context}` does not provide a Python version supported by the controller.')
class OptionsConflictError(ApplicationError):
"""Option(s) were specified which conflict with other options."""
def __init__(self, first: c.Iterable[str], second: c.Iterable[str]) -> None:
super().__init__(f'Options `{" ".join(first)}` cannot be combined with options `{" ".join(second)}`.')
@ -109,6 +113,7 @@ class OptionsConflictError(ApplicationError):
@dataclasses.dataclass(frozen=True)
class LegacyHostOptions:
"""Legacy host options used prior to the availability of separate controller and target host configuration."""
python: t.Optional[str] = None
python_interpreter: t.Optional[str] = None
local: t.Optional[bool] = None
@ -161,6 +166,7 @@ class LegacyHostOptions:
class TargetMode(enum.Enum):
"""Type of provisioning to use for the targets."""
WINDOWS_INTEGRATION = enum.auto() # windows-integration
NETWORK_INTEGRATION = enum.auto() # network-integration
POSIX_INTEGRATION = enum.auto() # integration

@ -75,6 +75,7 @@ from ..ci import (
class ControllerMode(enum.Enum):
"""Type of provisioning to use for the controller."""
NO_DELEGATION = enum.auto()
ORIGIN = enum.auto()
DELEGATED = enum.auto()
@ -252,7 +253,8 @@ def add_legacy_environment_options(
):
"""Add legacy options for controlling the test environment."""
environment: argparse.ArgumentParser = parser.add_argument_group( # type: ignore[assignment] # real type private
title='environment arguments (mutually exclusive with "composite environment arguments" below)')
title='environment arguments (mutually exclusive with "composite environment arguments" below)',
)
add_environments_python(environment, target_mode)
add_environments_host(environment, controller_mode, target_mode)
@ -383,7 +385,8 @@ def add_environment_venv(
environments_parser.add_argument(
'--venv-system-site-packages',
action='store_true',
help='enable system site packages')
help='enable system site packages',
)
def add_global_docker(

@ -53,6 +53,7 @@ from .base_argument_parsers import (
class OriginControllerParser(ControllerNamespaceParser, TypeParser):
"""Composite argument parser for the controller when delegation is not supported."""
def get_stateless_parsers(self) -> dict[str, Parser]:
"""Return a dictionary of type names and type parsers."""
return dict(
@ -71,6 +72,7 @@ class OriginControllerParser(ControllerNamespaceParser, TypeParser):
class DelegatedControllerParser(ControllerNamespaceParser, TypeParser):
"""Composite argument parser for the controller when delegation is supported."""
def get_stateless_parsers(self) -> dict[str, Parser]:
"""Return a dictionary of type names and type parsers."""
parsers: dict[str, Parser] = dict(
@ -97,6 +99,7 @@ class DelegatedControllerParser(ControllerNamespaceParser, TypeParser):
class PosixTargetParser(TargetNamespaceParser, TypeParser):
"""Composite argument parser for a POSIX target."""
def get_stateless_parsers(self) -> dict[str, Parser]:
"""Return a dictionary of type names and type parsers."""
parsers: dict[str, Parser] = dict(
@ -127,6 +130,7 @@ class PosixTargetParser(TargetNamespaceParser, TypeParser):
class WindowsTargetParser(TargetsNamespaceParser, TypeParser):
"""Composite argument parser for a Windows target."""
@property
def allow_inventory(self) -> bool:
"""True if inventory is allowed, otherwise False."""
@ -169,6 +173,7 @@ class WindowsTargetParser(TargetsNamespaceParser, TypeParser):
class NetworkTargetParser(TargetsNamespaceParser, TypeParser):
"""Composite argument parser for a network target."""
@property
def allow_inventory(self) -> bool:
"""True if inventory is allowed, otherwise False."""
@ -211,6 +216,7 @@ class NetworkTargetParser(TargetsNamespaceParser, TypeParser):
class PythonTargetParser(TargetsNamespaceParser, Parser):
"""Composite argument parser for a Python target."""
def __init__(self, allow_venv: bool) -> None:
super().__init__()
@ -249,18 +255,21 @@ class PythonTargetParser(TargetsNamespaceParser, Parser):
class SanityPythonTargetParser(PythonTargetParser):
"""Composite argument parser for a sanity Python target."""
def __init__(self) -> None:
super().__init__(allow_venv=False)
class UnitsPythonTargetParser(PythonTargetParser):
"""Composite argument parser for a units Python target."""
def __init__(self) -> None:
super().__init__(allow_venv=True)
class PosixSshTargetParser(PosixTargetParser):
"""Composite argument parser for a POSIX SSH target."""
@property
def option_name(self) -> str:
"""The option name used for this parser."""
@ -269,6 +278,7 @@ class PosixSshTargetParser(PosixTargetParser):
class WindowsSshTargetParser(WindowsTargetParser):
"""Composite argument parser for a Windows SSH target."""
@property
def option_name(self) -> str:
"""The option name used for this parser."""
@ -287,6 +297,7 @@ class WindowsSshTargetParser(WindowsTargetParser):
class NetworkSshTargetParser(NetworkTargetParser):
"""Composite argument parser for a network SSH target."""
@property
def option_name(self) -> str:
"""The option name used for this parser."""

@ -13,6 +13,7 @@ from ..argparsing.parsers import (
class ControllerNamespaceParser(NamespaceParser, metaclass=abc.ABCMeta):
"""Base class for controller namespace parsers."""
@property
def dest(self) -> str:
"""The name of the attribute where the value should be stored."""
@ -28,6 +29,7 @@ class ControllerNamespaceParser(NamespaceParser, metaclass=abc.ABCMeta):
class TargetNamespaceParser(NamespaceParser, metaclass=abc.ABCMeta):
"""Base class for target namespace parsers involving a single target."""
@property
def option_name(self) -> str:
"""The option name used for this parser."""
@ -51,6 +53,7 @@ class TargetNamespaceParser(NamespaceParser, metaclass=abc.ABCMeta):
class TargetsNamespaceParser(NamespaceParser, metaclass=abc.ABCMeta):
"""Base class for controller namespace parsers involving multiple targets."""
@property
def option_name(self) -> str:
"""The option name used for this parser."""
@ -69,5 +72,6 @@ class TargetsNamespaceParser(NamespaceParser, metaclass=abc.ABCMeta):
class ControllerRequiredFirstError(CompletionError):
"""Exception raised when controller and target options are specified out-of-order."""
def __init__(self) -> None:
super().__init__('The `--controller` option must be specified before `--target` option(s).')

@ -63,6 +63,7 @@ from .helpers import (
class OriginParser(Parser):
"""Composite argument parser for the origin."""
def parse(self, state: ParserState) -> t.Any:
"""Parse the input from the given state and return the result."""
namespace = OriginConfig()
@ -81,6 +82,7 @@ class OriginParser(Parser):
class ControllerParser(Parser):
"""Composite argument parser for the controller."""
def parse(self, state: ParserState) -> t.Any:
"""Parse the input from the given state and return the result."""
namespace = ControllerConfig()
@ -99,6 +101,7 @@ class ControllerParser(Parser):
class DockerParser(PairParser):
"""Composite argument parser for a docker host."""
def __init__(self, controller: bool) -> None:
self.controller = controller
@ -142,6 +145,7 @@ class DockerParser(PairParser):
class PosixRemoteParser(PairParser):
"""Composite argument parser for a POSIX remote host."""
def __init__(self, controller: bool) -> None:
self.controller = controller
@ -184,6 +188,7 @@ class PosixRemoteParser(PairParser):
class WindowsRemoteParser(PairParser):
"""Composite argument parser for a Windows remote host."""
def create_namespace(self) -> t.Any:
"""Create and return a namespace."""
return WindowsRemoteConfig()
@ -217,6 +222,7 @@ class WindowsRemoteParser(PairParser):
class NetworkRemoteParser(PairParser):
"""Composite argument parser for a network remote host."""
def create_namespace(self) -> t.Any:
"""Create and return a namespace."""
return NetworkRemoteConfig()
@ -250,6 +256,7 @@ class NetworkRemoteParser(PairParser):
class WindowsInventoryParser(PairParser):
"""Composite argument parser for a Windows inventory."""
def create_namespace(self) -> t.Any:
"""Create and return a namespace."""
return WindowsInventoryConfig()
@ -269,6 +276,7 @@ class WindowsInventoryParser(PairParser):
class NetworkInventoryParser(PairParser):
"""Composite argument parser for a network inventory."""
def create_namespace(self) -> t.Any:
"""Create and return a namespace."""
return NetworkInventoryConfig()
@ -288,6 +296,7 @@ class NetworkInventoryParser(PairParser):
class PosixSshParser(PairParser):
"""Composite argument parser for a POSIX SSH host."""
def create_namespace(self) -> t.Any:
"""Create and return a namespace."""
return PosixSshConfig()

@ -52,6 +52,7 @@ from .helpers import (
class OriginKeyValueParser(KeyValueParser):
"""Composite argument parser for origin key/value pairs."""
def get_parsers(self, state: ParserState) -> dict[str, Parser]:
"""Return a dictionary of key names and value parsers."""
versions = CONTROLLER_PYTHON_VERSIONS
@ -75,6 +76,7 @@ class OriginKeyValueParser(KeyValueParser):
class ControllerKeyValueParser(KeyValueParser):
"""Composite argument parser for controller key/value pairs."""
def get_parsers(self, state: ParserState) -> dict[str, Parser]:
"""Return a dictionary of key names and value parsers."""
versions = get_controller_pythons(state.root_namespace.controller, False)
@ -99,6 +101,7 @@ class ControllerKeyValueParser(KeyValueParser):
class DockerKeyValueParser(KeyValueParser):
"""Composite argument parser for docker key/value pairs."""
def __init__(self, image: str, controller: bool) -> None:
self.controller = controller
self.versions = get_docker_pythons(image, controller, False)
@ -135,6 +138,7 @@ class DockerKeyValueParser(KeyValueParser):
class PosixRemoteKeyValueParser(KeyValueParser):
"""Composite argument parser for POSIX remote key/value pairs."""
def __init__(self, name: str, controller: bool) -> None:
self.controller = controller
self.versions = get_remote_pythons(name, controller, False)
@ -167,6 +171,7 @@ class PosixRemoteKeyValueParser(KeyValueParser):
class WindowsRemoteKeyValueParser(KeyValueParser):
"""Composite argument parser for Windows remote key/value pairs."""
def get_parsers(self, state: ParserState) -> dict[str, Parser]:
"""Return a dictionary of key names and value parsers."""
return dict(
@ -188,6 +193,7 @@ class WindowsRemoteKeyValueParser(KeyValueParser):
class NetworkRemoteKeyValueParser(KeyValueParser):
"""Composite argument parser for network remote key/value pairs."""
def get_parsers(self, state: ParserState) -> dict[str, Parser]:
"""Return a dictionary of key names and value parsers."""
return dict(
@ -213,6 +219,7 @@ class NetworkRemoteKeyValueParser(KeyValueParser):
class PosixSshKeyValueParser(KeyValueParser):
"""Composite argument parser for POSIX SSH host key/value pairs."""
def get_parsers(self, state: ParserState) -> dict[str, Parser]:
"""Return a dictionary of key names and value parsers."""
return dict(
@ -234,6 +241,7 @@ class PosixSshKeyValueParser(KeyValueParser):
class EmptyKeyValueParser(KeyValueParser):
"""Composite argument parser when a key/value parser is required but there are no keys available."""
def get_parsers(self, state: ParserState) -> dict[str, Parser]:
"""Return a dictionary of key names and value parsers."""
return {}

@ -60,12 +60,14 @@ class PythonParser(Parser):
Known docker/remote environments limit the available Python versions to configured values known to be valid.
The origin host and unknown environments assume all relevant Python versions are available.
"""
def __init__(self,
versions: c.Sequence[str],
*,
allow_default: bool,
allow_venv: bool,
):
def __init__(
self,
versions: c.Sequence[str],
*,
allow_default: bool,
allow_venv: bool,
):
version_choices = list(versions)
if allow_default:
@ -134,6 +136,7 @@ class PythonParser(Parser):
class PlatformParser(ChoicesParser):
"""Composite argument parser for "{platform}/{version}" formatted choices."""
def __init__(self, choices: list[str]) -> None:
super().__init__(choices, conditions=MatchConditions.CHOICE | MatchConditions.ANY)
@ -152,6 +155,7 @@ class SshConnectionParser(Parser):
Composite argument parser for connecting to a host using SSH.
Format: user@host[:port]
"""
EXPECTED_FORMAT = '{user}@{host}[:{port}]'
def parse(self, state: ParserState) -> t.Any:

@ -68,6 +68,7 @@ COVERAGE_OUTPUT_FILE_NAME = 'coverage'
class CoverageConfig(EnvironmentConfig):
"""Configuration for the coverage command."""
def __init__(self, args: t.Any) -> None:
super().__init__(args, 'coverage')
@ -96,7 +97,7 @@ def initialize_coverage(args: CoverageConfig, host_state: HostState) -> coverage
def run_coverage(args: CoverageConfig, host_state: HostState, output_file: str, command: str, cmd: list[str]) -> None:
"""Run the coverage cli tool with the specified options."""
env = common_environment()
env.update(dict(COVERAGE_FILE=output_file))
env.update(COVERAGE_FILE=output_file)
cmd = ['python', '-m', 'coverage.__main__', command, '--rcfile', COVERAGE_CONFIG_PATH] + cmd
@ -340,6 +341,7 @@ def sanitize_filename(
class PathChecker:
"""Checks code coverage paths to verify they are valid and reports on the findings."""
def __init__(self, args: CoverageConfig, collection_search_re: t.Optional[t.Pattern] = None) -> None:
self.args = args
self.collection_search_re = collection_search_re

@ -9,6 +9,7 @@ from .. import (
class CoverageAnalyzeConfig(CoverageConfig):
"""Configuration for the `coverage analyze` command."""
def __init__(self, args: t.Any) -> None:
super().__init__(args)

@ -28,6 +28,7 @@ from . import (
class CoverageAnalyzeTargetsCombineConfig(CoverageAnalyzeTargetsConfig):
"""Configuration for the `coverage analyze targets combine` command."""
def __init__(self, args: t.Any) -> None:
super().__init__(args)

@ -26,6 +26,7 @@ from . import (
class CoverageAnalyzeTargetsExpandConfig(CoverageAnalyzeTargetsConfig):
"""Configuration for the `coverage analyze targets expand` command."""
def __init__(self, args: t.Any) -> None:
super().__init__(args)

@ -31,6 +31,7 @@ from . import (
class CoverageAnalyzeTargetsFilterConfig(CoverageAnalyzeTargetsConfig):
"""Configuration for the `coverage analyze targets filter` command."""
def __init__(self, args: t.Any) -> None:
super().__init__(args)

@ -52,6 +52,7 @@ from . import (
class CoverageAnalyzeTargetsGenerateConfig(CoverageAnalyzeTargetsConfig):
"""Configuration for the `coverage analyze targets generate` command."""
def __init__(self, args: t.Any) -> None:
super().__init__(args)

@ -32,6 +32,7 @@ from . import (
class CoverageAnalyzeTargetsMissingConfig(CoverageAnalyzeTargetsConfig):
"""Configuration for the `coverage analyze targets missing` command."""
def __init__(self, args: t.Any) -> None:
super().__init__(args)

@ -103,11 +103,13 @@ def combine_coverage_files(args: CoverageCombineConfig, host_state: HostState) -
class ExportedCoverageDataNotFound(ApplicationError):
"""Exception when no combined coverage data is present yet is required."""
def __init__(self) -> None:
super().__init__(
'Coverage data must be exported before processing with the `--docker` or `--remote` option.\n'
'Export coverage with `ansible-test coverage combine` using the `--export` option.\n'
'The exported files must be in the directory: %s/' % ResultType.COVERAGE.relative_path)
'The exported files must be in the directory: %s/' % ResultType.COVERAGE.relative_path
)
def _command_coverage_combine_python(args: CoverageCombineConfig, host_state: HostState) -> list[str]:
@ -353,6 +355,7 @@ def get_coverage_group(args: CoverageCombineConfig, coverage_file: str) -> t.Opt
class CoverageCombineConfig(CoverageConfig):
"""Configuration for the coverage combine command."""
def __init__(self, args: t.Any) -> None:
super().__init__(args)

@ -144,6 +144,7 @@ def _generate_powershell_output_report(args: CoverageReportConfig, coverage_file
class CoverageReportConfig(CoverageCombineConfig):
"""Configuration for the coverage report command."""
def __init__(self, args: t.Any) -> None:
super().__init__(args)

@ -314,7 +314,7 @@ def integration_test_environment(
directory_copies = [
(
os.path.join(integration_targets_relative_path, target.relative_path),
os.path.join(temp_dir, integration_targets_relative_path, target.relative_path)
os.path.join(temp_dir, integration_targets_relative_path, target.relative_path),
)
for target in target_dependencies
]
@ -357,12 +357,12 @@ def integration_test_config_file(
config_vars = (env_config.ansible_vars or {}).copy()
config_vars.update(dict(
config_vars.update(
ansible_test=dict(
environment=env_config.env_vars,
module_defaults=env_config.module_defaults,
)
))
)
config_file = json.dumps(config_vars, indent=4, sort_keys=True)
@ -615,10 +615,10 @@ def command_integration_script(
env = integration_environment(args, target, test_dir, test_env.inventory_path, test_env.ansible_config, env_config, test_env)
cwd = os.path.join(test_env.targets_dir, target.relative_path)
env.update(dict(
env.update(
# support use of adhoc ansible commands in collections without specifying the fully qualified collection name
ANSIBLE_PLAYBOOK_DIR=cwd,
))
)
if env_config and env_config.env_vars:
env.update(env_config.env_vars)
@ -653,9 +653,9 @@ def command_integration_role(
if isinstance(args, WindowsIntegrationConfig):
hosts = 'windows'
gather_facts = False
variables.update(dict(
variables.update(
win_output_dir=r'C:\ansible_testing',
))
)
elif isinstance(args, NetworkIntegrationConfig):
hosts = target.network_platform
gather_facts = False
@ -700,10 +700,10 @@ def command_integration_role(
if env_config.ansible_vars:
variables.update(env_config.ansible_vars)
play.update(dict(
play.update(
environment=env_config.env_vars,
module_defaults=env_config.module_defaults,
))
)
playbook = json.dumps([play], indent=4, sort_keys=True)
@ -736,10 +736,10 @@ def command_integration_role(
env = integration_environment(args, target, test_dir, test_env.inventory_path, test_env.ansible_config, env_config, test_env)
cwd = test_env.integration_dir
env.update(dict(
env.update(
# support use of adhoc ansible commands in collections without specifying the fully qualified collection name
ANSIBLE_PLAYBOOK_DIR=cwd,
))
)
if env_config and env_config.env_vars:
env.update(env_config.env_vars)
@ -807,13 +807,13 @@ def integration_environment(
)
if args.debug_strategy:
env.update(dict(ANSIBLE_STRATEGY='debug'))
env.update(ANSIBLE_STRATEGY='debug')
if 'non_local/' in target.aliases:
if args.coverage:
display.warning('Skipping coverage reporting on Ansible modules for non-local test: %s' % target.name)
env.update(dict(ANSIBLE_TEST_REMOTE_INTERPRETER=''))
env.update(ANSIBLE_TEST_REMOTE_INTERPRETER='')
env.update(integration)
@ -822,6 +822,7 @@ def integration_environment(
class IntegrationEnvironment:
"""Details about the integration environment."""
def __init__(self, test_dir: str, integration_dir: str, targets_dir: str, inventory_path: str, ansible_config: str, vars_file: str) -> None:
self.test_dir = test_dir
self.integration_dir = integration_dir
@ -833,6 +834,7 @@ class IntegrationEnvironment:
class IntegrationCache(CommonCache):
"""Integration cache."""
@property
def integration_targets(self) -> list[IntegrationTarget]:
"""The list of integration test targets."""
@ -900,9 +902,10 @@ If necessary, context can be controlled by adding entries to the "aliases" file
return exclude
def command_integration_filter(args: TIntegrationConfig,
targets: c.Iterable[TIntegrationTarget],
) -> tuple[HostState, tuple[TIntegrationTarget, ...]]:
def command_integration_filter(
args: TIntegrationConfig,
targets: c.Iterable[TIntegrationTarget],
) -> tuple[HostState, tuple[TIntegrationTarget, ...]]:
"""Filter the given integration test targets."""
targets = tuple(target for target in targets if 'hidden/' not in target.aliases)
changes = get_changes_filter(args)
@ -940,6 +943,7 @@ def command_integration_filter(args: TIntegrationConfig,
vars_file_src = os.path.join(data_context().content.root, data_context().content.integration_vars_path)
if os.path.exists(vars_file_src):
def integration_config_callback(payload_config: PayloadConfig) -> None:
"""
Add the integration config vars file to the payload file list.

@ -181,6 +181,7 @@ def cloud_init(args: IntegrationConfig, targets: tuple[IntegrationTarget, ...])
class CloudBase(metaclass=abc.ABCMeta):
"""Base class for cloud plugins."""
_CONFIG_PATH = 'config_path'
_RESOURCE_PREFIX = 'resource_prefix'
_MANAGED = 'managed'
@ -259,6 +260,7 @@ class CloudBase(metaclass=abc.ABCMeta):
class CloudProvider(CloudBase):
"""Base class for cloud provider plugins. Sets up cloud resources before delegation."""
def __init__(self, args: IntegrationConfig, config_extension: str = '.ini') -> None:
super().__init__(args)
@ -358,6 +360,7 @@ class CloudProvider(CloudBase):
class CloudEnvironment(CloudBase):
"""Base class for cloud environment plugins. Updates integration test environment after delegation."""
def setup_once(self) -> None:
"""Run setup if it has not already been run."""
if self.setup_executed:
@ -379,12 +382,14 @@ class CloudEnvironment(CloudBase):
class CloudEnvironmentConfig:
"""Configuration for the environment."""
def __init__(self,
env_vars: t.Optional[dict[str, str]] = None,
ansible_vars: t.Optional[dict[str, t.Any]] = None,
module_defaults: t.Optional[dict[str, dict[str, t.Any]]] = None,
callback_plugins: t.Optional[list[str]] = None,
):
def __init__(
self,
env_vars: t.Optional[dict[str, str]] = None,
ansible_vars: t.Optional[dict[str, t.Any]] = None,
module_defaults: t.Optional[dict[str, dict[str, t.Any]]] = None,
callback_plugins: t.Optional[list[str]] = None,
):
self.env_vars = env_vars
self.ansible_vars = ansible_vars
self.module_defaults = module_defaults

@ -21,6 +21,7 @@ from . import (
class ACMEProvider(CloudProvider):
"""ACME plugin. Sets up cloud resources for tests."""
DOCKER_SIMULATOR_NAME = 'acme-simulator'
def __init__(self, args: IntegrationConfig) -> None:
@ -68,6 +69,7 @@ class ACMEProvider(CloudProvider):
class ACMEEnvironment(CloudEnvironment):
"""ACME environment plugin. Updates integration test environment after delegation."""
def get_environment_config(self) -> CloudEnvironmentConfig:
"""Return environment configuration for use in the test environment after delegation."""
ansible_vars = dict(

@ -37,6 +37,7 @@ from . import (
class AwsCloudProvider(CloudProvider):
"""AWS cloud provider plugin. Sets up cloud resources before delegation."""
def __init__(self, args: IntegrationConfig) -> None:
super().__init__(args)
@ -97,6 +98,7 @@ class AwsCloudProvider(CloudProvider):
class AwsCloudEnvironment(CloudEnvironment):
"""AWS cloud environment plugin. Updates integration test environment after delegation."""
def get_environment_config(self) -> CloudEnvironmentConfig:
"""Return environment configuration for use in the test environment after delegation."""
parser = configparser.ConfigParser()

@ -31,6 +31,7 @@ from . import (
class AzureCloudProvider(CloudProvider):
"""Azure cloud provider plugin. Sets up cloud resources before delegation."""
def __init__(self, args: IntegrationConfig) -> None:
super().__init__(args)
@ -103,6 +104,7 @@ class AzureCloudProvider(CloudProvider):
class AzureCloudEnvironment(CloudEnvironment):
"""Azure cloud environment plugin. Updates integration test environment after delegation."""
def get_environment_config(self) -> CloudEnvironmentConfig:
"""Return environment configuration for use in the test environment after delegation."""
env_vars = get_config(self.config_path)

@ -25,6 +25,7 @@ from . import (
class CloudscaleCloudProvider(CloudProvider):
"""Cloudscale cloud provider plugin. Sets up cloud resources before delegation."""
def __init__(self, args: IntegrationConfig) -> None:
super().__init__(args)
@ -39,6 +40,7 @@ class CloudscaleCloudProvider(CloudProvider):
class CloudscaleCloudEnvironment(CloudEnvironment):
"""Cloudscale cloud environment plugin. Updates integration test environment after delegation."""
def get_environment_config(self) -> CloudEnvironmentConfig:
"""Return environment configuration for use in the test environment after delegation."""
parser = configparser.ConfigParser()

@ -35,6 +35,7 @@ from . import (
class CsCloudProvider(CloudProvider):
"""CloudStack cloud provider plugin. Sets up cloud resources before delegation."""
DOCKER_SIMULATOR_NAME = 'cloudstack-sim'
def __init__(self, args: IntegrationConfig) -> None:
@ -131,6 +132,7 @@ class CsCloudProvider(CloudProvider):
def _get_credentials(self, container_name: str) -> dict[str, t.Any]:
"""Wait for the CloudStack simulator to return credentials."""
def check(value) -> bool:
"""Return True if the given configuration is valid JSON, otherwise return False."""
# noinspection PyBroadException
@ -148,6 +150,7 @@ class CsCloudProvider(CloudProvider):
class CsCloudEnvironment(CloudEnvironment):
"""CloudStack cloud environment plugin. Updates integration test environment after delegation."""
def get_environment_config(self) -> CloudEnvironmentConfig:
"""Return environment configuration for use in the test environment after delegation."""
parser = configparser.ConfigParser()

@ -20,6 +20,7 @@ from . import (
class DigitalOceanCloudProvider(CloudProvider):
"""Checks if a configuration file has been passed or fixtures are going to be used for testing"""
def __init__(self, args: IntegrationConfig) -> None:
super().__init__(args)
@ -34,6 +35,7 @@ class DigitalOceanCloudProvider(CloudProvider):
class DigitalOceanCloudEnvironment(CloudEnvironment):
"""Updates integration test environment after delegation. Will setup the config file as parameter."""
def get_environment_config(self) -> CloudEnvironmentConfig:
"""Return environment configuration for use in the test environment after delegation."""
parser = configparser.ConfigParser()

@ -25,6 +25,7 @@ from . import (
class ForemanProvider(CloudProvider):
"""Foreman plugin. Sets up Foreman stub server for tests."""
DOCKER_SIMULATOR_NAME = 'foreman-stub'
# Default image to run Foreman stub from.
@ -88,6 +89,7 @@ class ForemanProvider(CloudProvider):
class ForemanEnvironment(CloudEnvironment):
"""Foreman environment plugin. Updates integration test environment after delegation."""
def get_environment_config(self) -> CloudEnvironmentConfig:
"""Return environment configuration for use in the test environment after delegation."""
env_vars = dict(

@ -77,6 +77,7 @@ class GalaxyProvider(CloudProvider):
Galaxy plugin. Sets up pulp (ansible-galaxy) servers for tests.
The pulp source itself resides at: https://github.com/pulp/pulp-oci-images
"""
def __init__(self, args: IntegrationConfig) -> None:
super().__init__(args)
@ -143,6 +144,7 @@ class GalaxyProvider(CloudProvider):
class GalaxyEnvironment(CloudEnvironment):
"""Galaxy environment plugin. Updates integration test environment after delegation."""
def get_environment_config(self) -> CloudEnvironmentConfig:
"""Return environment configuration for use in the test environment after delegation."""
pulp_user = str(self._get_cloud_config('PULP_USER'))

@ -22,6 +22,7 @@ from . import (
class GcpCloudProvider(CloudProvider):
"""GCP cloud provider plugin. Sets up cloud resources before delegation."""
def __init__(self, args: IntegrationConfig) -> None:
super().__init__(args)
@ -39,6 +40,7 @@ class GcpCloudProvider(CloudProvider):
class GcpCloudEnvironment(CloudEnvironment):
"""GCP cloud environment plugin. Updates integration test environment after delegation."""
def get_environment_config(self) -> CloudEnvironmentConfig:
"""Return environment configuration for use in the test environment after delegation."""
parser = configparser.ConfigParser()

@ -29,6 +29,7 @@ from . import (
class HcloudCloudProvider(CloudProvider):
"""Hetzner Cloud provider plugin. Sets up cloud resources before delegation."""
def __init__(self, args: IntegrationConfig) -> None:
super().__init__(args)
@ -83,6 +84,7 @@ class HcloudCloudProvider(CloudProvider):
class HcloudCloudEnvironment(CloudEnvironment):
"""Hetzner Cloud cloud environment plugin. Updates integration test environment after delegation."""
def get_environment_config(self) -> CloudEnvironmentConfig:
"""Return environment configuration for use in the test environment after delegation."""
parser = configparser.ConfigParser()

@ -28,6 +28,7 @@ KRB5_PASSWORD_ENV = 'KRB5_PASSWORD'
class HttptesterProvider(CloudProvider):
"""HTTP Tester provider plugin. Sets up resources before delegation."""
def __init__(self, args: IntegrationConfig) -> None:
super().__init__(args)
@ -82,6 +83,7 @@ class HttptesterProvider(CloudProvider):
class HttptesterEnvironment(CloudEnvironment):
"""HTTP Tester environment plugin. Updates integration test environment after delegation."""
def get_environment_config(self) -> CloudEnvironmentConfig:
"""Return environment configuration for use in the test environment after delegation."""
return CloudEnvironmentConfig(

@ -21,6 +21,7 @@ from . import (
class NiosProvider(CloudProvider):
"""Nios plugin. Sets up NIOS mock server for tests."""
DOCKER_SIMULATOR_NAME = 'nios-simulator'
# Default image to run the nios simulator.
@ -82,6 +83,7 @@ class NiosProvider(CloudProvider):
class NiosEnvironment(CloudEnvironment):
"""NIOS environment plugin. Updates integration test environment after delegation."""
def get_environment_config(self) -> CloudEnvironmentConfig:
"""Return environment configuration for use in the test environment after delegation."""
ansible_vars = dict(

@ -16,6 +16,7 @@ from . import (
class OpenNebulaCloudProvider(CloudProvider):
"""Checks if a configuration file has been passed or fixtures are going to be used for testing"""
def setup(self) -> None:
"""Setup the cloud resource before delegation and register a cleanup callback."""
super().setup()
@ -42,6 +43,7 @@ class OpenNebulaCloudProvider(CloudProvider):
class OpenNebulaCloudEnvironment(CloudEnvironment):
"""Updates integration test environment after delegation. Will setup the config file as parameter."""
def get_environment_config(self) -> CloudEnvironmentConfig:
"""Return environment configuration for use in the test environment after delegation."""
parser = configparser.ConfigParser()

@ -30,6 +30,7 @@ from . import (
class OpenShiftCloudProvider(CloudProvider):
"""OpenShift cloud provider plugin. Sets up cloud resources before delegation."""
DOCKER_CONTAINER_NAME = 'openshift-origin'
def __init__(self, args: IntegrationConfig) -> None:
@ -103,6 +104,7 @@ class OpenShiftCloudProvider(CloudProvider):
class OpenShiftCloudEnvironment(CloudEnvironment):
"""OpenShift cloud environment plugin. Updates integration test environment after delegation."""
def get_environment_config(self) -> CloudEnvironmentConfig:
"""Return environment configuration for use in the test environment after delegation."""
env_vars = dict(

@ -20,6 +20,7 @@ from . import (
class ScalewayCloudProvider(CloudProvider):
"""Checks if a configuration file has been passed or fixtures are going to be used for testing"""
def __init__(self, args: IntegrationConfig) -> None:
super().__init__(args)
@ -34,6 +35,7 @@ class ScalewayCloudProvider(CloudProvider):
class ScalewayCloudEnvironment(CloudEnvironment):
"""Updates integration test environment after delegation. Will setup the config file as parameter."""
def get_environment_config(self) -> CloudEnvironmentConfig:
"""Return environment configuration for use in the test environment after delegation."""
parser = configparser.ConfigParser()
@ -41,7 +43,7 @@ class ScalewayCloudEnvironment(CloudEnvironment):
env_vars = dict(
SCW_API_KEY=parser.get('default', 'key'),
SCW_ORG=parser.get('default', 'org')
SCW_ORG=parser.get('default', 'org'),
)
display.sensitive.add(env_vars['SCW_API_KEY'])

@ -27,6 +27,7 @@ from . import (
class VcenterProvider(CloudProvider):
"""VMware vcenter/esx plugin. Sets up cloud resources for tests."""
DOCKER_SIMULATOR_NAME = 'vcenter-simulator'
def __init__(self, args: IntegrationConfig) -> None:
@ -94,6 +95,7 @@ class VcenterProvider(CloudProvider):
class VcenterEnvironment(CloudEnvironment):
"""VMware vcenter/esx environment plugin. Updates integration test environment after delegation."""
def get_environment_config(self) -> CloudEnvironmentConfig:
"""Return environment configuration for use in the test environment after delegation."""
try:

@ -20,6 +20,7 @@ from . import (
class VultrCloudProvider(CloudProvider):
"""Checks if a configuration file has been passed or fixtures are going to be used for testing"""
def __init__(self, args: IntegrationConfig) -> None:
super().__init__(args)
@ -34,6 +35,7 @@ class VultrCloudProvider(CloudProvider):
class VultrCloudEnvironment(CloudEnvironment):
"""Updates integration test environment after delegation. Will setup the config file as parameter."""
def get_environment_config(self) -> CloudEnvironmentConfig:
"""Return environment configuration for use in the test environment after delegation."""
parser = configparser.ConfigParser()

@ -82,6 +82,7 @@ THostConfig = t.TypeVar('THostConfig', bound=HostConfig)
class CoverageHandler(t.Generic[THostConfig], metaclass=abc.ABCMeta):
"""Base class for configuring hosts for integration test code coverage."""
def __init__(self, args: IntegrationConfig, host_state: HostState, inventory_path: str) -> None:
self.args = args
self.host_state = host_state
@ -124,6 +125,7 @@ class CoverageHandler(t.Generic[THostConfig], metaclass=abc.ABCMeta):
class PosixCoverageHandler(CoverageHandler[PosixConfig]):
"""Configure integration test code coverage for POSIX hosts."""
def __init__(self, args: IntegrationConfig, host_state: HostState, inventory_path: str) -> None:
super().__init__(args, host_state, inventory_path)
@ -263,6 +265,7 @@ class PosixCoverageHandler(CoverageHandler[PosixConfig]):
class WindowsCoverageHandler(CoverageHandler[WindowsConfig]):
"""Configure integration test code coverage for Windows hosts."""
def __init__(self, args: IntegrationConfig, host_state: HostState, inventory_path: str) -> None:
super().__init__(args, host_state, inventory_path)
@ -334,6 +337,7 @@ class WindowsCoverageHandler(CoverageHandler[WindowsConfig]):
class CoverageManager:
"""Manager for code coverage configuration and state."""
def __init__(self, args: IntegrationConfig, host_state: HostState, inventory_path: str) -> None:
self.args = args
self.host_state = host_state

@ -47,6 +47,7 @@ THostProfile = t.TypeVar('THostProfile', bound=HostProfile)
class TargetFilter(t.Generic[THostConfig], metaclass=abc.ABCMeta):
"""Base class for target filters."""
def __init__(self, args: IntegrationConfig, configs: list[THostConfig], controller: bool) -> None:
self.args = args
self.configs = configs
@ -138,6 +139,7 @@ class TargetFilter(t.Generic[THostConfig], metaclass=abc.ABCMeta):
class PosixTargetFilter(TargetFilter[TPosixConfig]):
"""Target filter for POSIX hosts."""
def filter_targets(self, targets: list[IntegrationTarget], exclude: set[str]) -> None:
"""Filter the list of targets, adding any which this host profile cannot support to the provided exclude list."""
super().filter_targets(targets, exclude)
@ -151,6 +153,7 @@ class PosixTargetFilter(TargetFilter[TPosixConfig]):
class DockerTargetFilter(PosixTargetFilter[DockerConfig]):
"""Target filter for docker hosts."""
def filter_targets(self, targets: list[IntegrationTarget], exclude: set[str]) -> None:
"""Filter the list of targets, adding any which this host profile cannot support to the provided exclude list."""
super().filter_targets(targets, exclude)
@ -167,6 +170,7 @@ class PosixSshTargetFilter(PosixTargetFilter[PosixSshConfig]):
class RemoteTargetFilter(TargetFilter[TRemoteConfig]):
"""Target filter for remote Ansible Core CI managed hosts."""
def filter_profiles(self, profiles: list[THostProfile], target: IntegrationTarget) -> list[THostProfile]:
"""Filter the list of profiles, returning only those which are not skipped for the given target."""
profiles = super().filter_profiles(profiles, target)
@ -224,6 +228,7 @@ class NetworkInventoryTargetFilter(TargetFilter[NetworkInventoryConfig]):
class OriginTargetFilter(PosixTargetFilter[OriginConfig]):
"""Target filter for localhost."""
def filter_targets(self, targets: list[IntegrationTarget], exclude: set[str]) -> None:
"""Filter the list of targets, adding any which this host profile cannot support to the provided exclude list."""
super().filter_targets(targets, exclude)

@ -329,6 +329,7 @@ def collect_code_smell_tests() -> tuple[SanityTest, ...]:
class SanityIgnoreParser:
"""Parser for the consolidated sanity test ignore file."""
NO_CODE = '_'
def __init__(self, args: SanityConfig) -> None:
@ -530,11 +531,13 @@ class SanityIgnoreParser:
class SanityIgnoreProcessor:
"""Processor for sanity test ignores for a single run of one sanity test."""
def __init__(self,
args: SanityConfig,
test: SanityTest,
python_version: t.Optional[str],
) -> None:
def __init__(
self,
args: SanityConfig,
test: SanityTest,
python_version: t.Optional[str],
) -> None:
name = test.name
code = test.error_code
@ -622,18 +625,21 @@ class SanityIgnoreProcessor:
class SanitySuccess(TestSuccess):
"""Sanity test success."""
def __init__(self, test: str, python_version: t.Optional[str] = None) -> None:
super().__init__(COMMAND, test, python_version)
class SanitySkipped(TestSkipped):
"""Sanity test skipped."""
def __init__(self, test: str, python_version: t.Optional[str] = None) -> None:
super().__init__(COMMAND, test, python_version)
class SanityFailure(TestFailure):
"""Sanity test failure."""
def __init__(
self,
test: str,
@ -650,6 +656,7 @@ class SanityMessage(TestMessage):
class SanityTargets:
"""Sanity test target information."""
def __init__(self, targets: tuple[TestTarget, ...], include: tuple[TestTarget, ...]) -> None:
self.targets = targets
self.include = include
@ -699,6 +706,7 @@ class SanityTargets:
class SanityTest(metaclass=abc.ABCMeta):
"""Sanity test base class."""
ansible_only = False
def __init__(self, name: t.Optional[str] = None) -> None:
@ -815,6 +823,7 @@ class SanityTest(metaclass=abc.ABCMeta):
class SanitySingleVersion(SanityTest, metaclass=abc.ABCMeta):
"""Base class for sanity test plugins which should run on a single python version."""
@property
def require_libyaml(self) -> bool:
"""True if the test requires PyYAML to have libyaml support."""
@ -831,6 +840,7 @@ class SanitySingleVersion(SanityTest, metaclass=abc.ABCMeta):
class SanityCodeSmellTest(SanitySingleVersion):
"""Sanity test script."""
def __init__(self, path) -> None:
name = os.path.splitext(os.path.basename(path))[0]
config_path = os.path.splitext(path)[0] + '.json'
@ -1034,6 +1044,7 @@ class SanityCodeSmellTest(SanitySingleVersion):
class SanityVersionNeutral(SanityTest, metaclass=abc.ABCMeta):
"""Base class for sanity test plugins which are idependent of the python version being used."""
@abc.abstractmethod
def test(self, args: SanityConfig, targets: SanityTargets) -> TestResult:
"""Run the sanity test and return the result."""
@ -1050,6 +1061,7 @@ class SanityVersionNeutral(SanityTest, metaclass=abc.ABCMeta):
class SanityMultipleVersion(SanityTest, metaclass=abc.ABCMeta):
"""Base class for sanity test plugins which should run on multiple python versions."""
@abc.abstractmethod
def test(self, args: SanityConfig, targets: SanityTargets, python: PythonConfig) -> TestResult:
"""Run the sanity test and return the result."""

@ -50,6 +50,7 @@ from ...host_configs import (
class AnsibleDocTest(SanitySingleVersion):
"""Sanity test for ansible-doc."""
def filter_targets(self, targets: list[TestTarget]) -> list[TestTarget]:
"""Return the given list of test targets, filtered to include only those relevant for the test."""
plugin_paths = [plugin_path for plugin_type, plugin_path in data_context().content.plugin_paths.items() if plugin_type in DOCUMENTABLE_PLUGINS]

@ -38,6 +38,7 @@ from ...util import (
class BinSymlinksTest(SanityVersionNeutral):
"""Sanity test for symlinks in the bin directory."""
ansible_only = True
@property

@ -43,6 +43,7 @@ from ...host_configs import (
class CompileTest(SanityMultipleVersion):
"""Sanity test for proper python syntax."""
def filter_targets(self, targets: list[TestTarget]) -> list[TestTarget]:
"""Return the given list of test targets, filtered to include only those relevant for the test."""
return [target for target in targets if os.path.splitext(target.path)[1] == '.py' or is_subdir(target.path, 'bin')]

@ -25,6 +25,7 @@ from ...config import (
class IgnoresTest(SanityVersionNeutral):
"""Sanity test for sanity test ignore entries."""
@property
def can_ignore(self) -> bool:
"""True if the test supports ignore entries."""

@ -84,6 +84,7 @@ def _get_module_test(module_restrictions: bool) -> c.Callable[[str], bool]:
class ImportTest(SanityMultipleVersion):
"""Sanity test for proper import exception handling."""
def filter_targets(self, targets: list[TestTarget]) -> list[TestTarget]:
"""Return the given list of test targets, filtered to include only those relevant for the test."""
if data_context().content.is_ansible:

@ -61,6 +61,7 @@ from ...host_configs import (
class IntegrationAliasesTest(SanitySingleVersion):
"""Sanity test to evaluate integration test aliases."""
CI_YML = '.azure-pipelines/azure-pipelines.yml'
TEST_ALIAS_PREFIX = 'shippable' # this will be changed at some point in the future
@ -424,5 +425,6 @@ class IntegrationAliasesTest(SanitySingleVersion):
@dataclasses.dataclass
class Results:
"""Check results."""
comments: list[str]
labels: dict[str, bool]

@ -60,6 +60,7 @@ from ...host_configs import (
class MypyTest(SanityMultipleVersion):
"""Sanity test which executes mypy."""
ansible_only = True
vendored_paths = (
@ -232,7 +233,7 @@ class MypyTest(SanityMultipleVersion):
# Below are context specific arguments.
# They are primarily useful for listing individual 'ignore_missing_imports' entries instead of using a global ignore.
'--config-file', config_path,
]
] # fmt: skip
cmd.extend(context_paths)
@ -265,6 +266,7 @@ class MypyTest(SanityMultipleVersion):
@dataclasses.dataclass(frozen=True)
class MyPyContext:
"""Context details for a single run of mypy."""
name: str
paths: list[str]
python_versions: tuple[str, ...]

@ -43,6 +43,7 @@ from ...host_configs import (
class Pep8Test(SanitySingleVersion):
"""Sanity test for PEP 8 style guidelines using pycodestyle."""
@property
def error_code(self) -> t.Optional[str]:
"""Error code for ansible-test matching the format used by the underlying test program, or None if the program does not use error codes."""
@ -66,7 +67,7 @@ class Pep8Test(SanitySingleVersion):
'--max-line-length', '160',
'--config', '/dev/null',
'--ignore', ','.join(sorted(current_ignore)),
] + paths
] + paths # fmt: skip
if paths:
try:

@ -45,6 +45,7 @@ from ...data import (
class PslintTest(SanityVersionNeutral):
"""Sanity test using PSScriptAnalyzer."""
@property
def error_code(self) -> t.Optional[str]:
"""Error code for ansible-test matching the format used by the underlying test program, or None if the program does not use error codes."""

@ -58,6 +58,7 @@ from ...host_configs import (
class PylintTest(SanitySingleVersion):
"""Sanity test using pylint."""
def __init__(self) -> None:
super().__init__()
self.optional_error_codes.update([
@ -106,6 +107,7 @@ class PylintTest(SanitySingleVersion):
def filter_path(path_filter: str = None) -> c.Callable[[str], bool]:
"""Return a function that filters out paths which are not a subdirectory of the given path."""
def context_filter(path_to_filter: str) -> bool:
"""Return true if the given path matches, otherwise return False."""
return is_subdir(path_to_filter, path_filter)
@ -227,7 +229,7 @@ class PylintTest(SanitySingleVersion):
'--rcfile', rcfile,
'--output-format', 'json',
'--load-plugins', ','.join(sorted(load_plugins)),
] + paths
] + paths # fmt: skip
if data_context().content.collection:
cmd.extend(['--collection-name', data_context().content.collection.full_name])

@ -27,6 +27,7 @@ from ...data import (
class SanityDocsTest(SanityVersionNeutral):
"""Sanity test for documentation of sanity tests."""
ansible_only = True
@property

@ -44,6 +44,7 @@ from ...config import (
class ShellcheckTest(SanityVersionNeutral):
"""Sanity test using shellcheck."""
@property
def error_code(self) -> t.Optional[str]:
"""Error code for ansible-test matching the format used by the underlying test program, or None if the program does not use error codes."""
@ -68,7 +69,7 @@ class ShellcheckTest(SanityVersionNeutral):
'shellcheck',
'-e', ','.join(sorted(exclude)),
'--format', 'checkstyle',
] + paths
] + paths # fmt: skip
try:
stdout, stderr = run_command(args, cmd, capture=True)

@ -120,7 +120,7 @@ class ValidateModulesTest(SanitySingleVersion):
os.path.join(SANITY_ROOT, 'validate-modules', 'validate.py'),
'--format', 'json',
'--arg-spec',
]
] # fmt: skip
if data_context().content.collection:
cmd.extend(['--collection', data_context().content.collection.directory])

@ -47,6 +47,7 @@ from ...host_configs import (
class YamllintTest(SanitySingleVersion):
"""Sanity test using yamllint."""
@property
def error_code(self) -> t.Optional[str]:
"""Error code for ansible-test matching the format used by the underlying test program, or None if the program does not use error codes."""

@ -124,9 +124,11 @@ def command_shell(args: ShellConfig) -> None:
# 255 indicates SSH itself failed, rather than a command run on the remote host.
# In this case, report a host connection error so additional troubleshooting output is provided.
if not args.delegate and not args.host_path:
def callback() -> None:
"""Callback to run during error display."""
target_profile.on_target_failure() # when the controller is not delegated, report failures immediately
else:
callback = None

@ -88,6 +88,7 @@ from ...host_profiles import (
class TestContext:
"""Contexts that unit tests run in based on the type of content."""
controller = 'controller'
modules = 'modules'
module_utils = 'module_utils'
@ -255,14 +256,13 @@ def command_units(args: UnitsConfig) -> None:
'--forked',
'-r', 'a',
'-n', str(args.num_workers) if args.num_workers else 'auto',
'--color',
'yes' if args.color else 'no',
'--color', 'yes' if args.color else 'no',
'-p', 'no:cacheprovider',
'-c', os.path.join(ANSIBLE_TEST_DATA_ROOT, 'pytest', 'config', config_name),
'--junit-xml', os.path.join(ResultType.JUNIT.path, 'python%s-%s-units.xml' % (python.version, test_context)),
'--strict-markers', # added in pytest 4.5.0
'--rootdir', data_context().content.root,
]
] # fmt:skip
if not data_context().content.collection:
cmd.append('--durations=25')

@ -9,6 +9,7 @@ from functools import (
try:
import yaml as _yaml
YAML_IMPORT_ERROR = None
except ImportError as ex:
yaml_load = None # pylint: disable=invalid-name

@ -29,6 +29,7 @@ from .become import (
class CGroupVersion(enum.Enum):
"""The control group version(s) required by a container."""
NONE = 'none'
V1_ONLY = 'v1-only'
V2_ONLY = 'v2-only'
@ -40,6 +41,7 @@ class CGroupVersion(enum.Enum):
class AuditMode(enum.Enum):
"""The audit requirements of a container."""
NONE = 'none'
REQUIRED = 'required'
@ -50,6 +52,7 @@ class AuditMode(enum.Enum):
@dataclasses.dataclass(frozen=True)
class CompletionConfig(metaclass=abc.ABCMeta):
"""Base class for completion configuration."""
name: str
@property
@ -61,6 +64,7 @@ class CompletionConfig(metaclass=abc.ABCMeta):
@dataclasses.dataclass(frozen=True)
class PosixCompletionConfig(CompletionConfig, metaclass=abc.ABCMeta):
"""Base class for completion configuration of POSIX environments."""
@property
@abc.abstractmethod
def supported_pythons(self) -> list[str]:
@ -85,6 +89,7 @@ class PosixCompletionConfig(CompletionConfig, metaclass=abc.ABCMeta):
@dataclasses.dataclass(frozen=True)
class PythonCompletionConfig(PosixCompletionConfig, metaclass=abc.ABCMeta):
"""Base class for completion configuration of Python environments."""
python: str = ''
python_dir: str = '/usr/bin'
@ -103,6 +108,7 @@ class PythonCompletionConfig(PosixCompletionConfig, metaclass=abc.ABCMeta):
@dataclasses.dataclass(frozen=True)
class RemoteCompletionConfig(CompletionConfig):
"""Base class for completion configuration of remote environments provisioned through Ansible Core CI."""
provider: t.Optional[str] = None
arch: t.Optional[str] = None
@ -132,6 +138,7 @@ class RemoteCompletionConfig(CompletionConfig):
@dataclasses.dataclass(frozen=True)
class InventoryCompletionConfig(CompletionConfig):
"""Configuration for inventory files."""
def __init__(self) -> None:
super().__init__(name='inventory')
@ -144,6 +151,7 @@ class InventoryCompletionConfig(CompletionConfig):
@dataclasses.dataclass(frozen=True)
class PosixSshCompletionConfig(PythonCompletionConfig):
"""Configuration for a POSIX host reachable over SSH."""
def __init__(self, user: str, host: str) -> None:
super().__init__(
name=f'{user}@{host}',
@ -159,6 +167,7 @@ class PosixSshCompletionConfig(PythonCompletionConfig):
@dataclasses.dataclass(frozen=True)
class DockerCompletionConfig(PythonCompletionConfig):
"""Configuration for Docker containers."""
image: str = ''
seccomp: str = 'default'
cgroup: str = CGroupVersion.V1_V2.value
@ -201,6 +210,7 @@ class DockerCompletionConfig(PythonCompletionConfig):
@dataclasses.dataclass(frozen=True)
class NetworkRemoteCompletionConfig(RemoteCompletionConfig):
"""Configuration for remote network platforms."""
collection: str = ''
connection: str = ''
placeholder: bool = False
@ -213,6 +223,7 @@ class NetworkRemoteCompletionConfig(RemoteCompletionConfig):
@dataclasses.dataclass(frozen=True)
class PosixRemoteCompletionConfig(RemoteCompletionConfig, PythonCompletionConfig):
"""Configuration for remote POSIX platforms."""
become: t.Optional[str] = None
placeholder: bool = False

@ -42,6 +42,7 @@ THostConfig = t.TypeVar('THostConfig', bound=HostConfig)
class TerminateMode(enum.Enum):
"""When to terminate instances."""
ALWAYS = enum.auto()
NEVER = enum.auto()
SUCCESS = enum.auto()
@ -53,6 +54,7 @@ class TerminateMode(enum.Enum):
@dataclasses.dataclass(frozen=True)
class ModulesConfig:
"""Configuration for modules."""
python_requires: str
python_versions: tuple[str, ...]
controller_only: bool
@ -61,6 +63,7 @@ class ModulesConfig:
@dataclasses.dataclass(frozen=True)
class ContentConfig:
"""Configuration for all content."""
modules: ModulesConfig
python_versions: tuple[str, ...]
py2_support: bool
@ -68,6 +71,7 @@ class ContentConfig:
class EnvironmentConfig(CommonConfig):
"""Configuration common to all commands which execute in an environment."""
def __init__(self, args: t.Any, command: str) -> None:
super().__init__(args, command)
@ -199,6 +203,7 @@ class EnvironmentConfig(CommonConfig):
class TestConfig(EnvironmentConfig):
"""Configuration common to all test commands."""
def __init__(self, args: t.Any, command: str) -> None:
super().__init__(args, command)
@ -241,6 +246,7 @@ class TestConfig(EnvironmentConfig):
class ShellConfig(EnvironmentConfig):
"""Configuration for the shell command."""
def __init__(self, args: t.Any) -> None:
super().__init__(args, 'shell')
@ -254,6 +260,7 @@ class ShellConfig(EnvironmentConfig):
class SanityConfig(TestConfig):
"""Configuration for the sanity command."""
def __init__(self, args: t.Any) -> None:
super().__init__(args, 'sanity')
@ -268,6 +275,7 @@ class SanityConfig(TestConfig):
self.display_stderr = self.lint or self.list_tests
if self.keep_git:
def git_callback(payload_config: PayloadConfig) -> None:
"""Add files from the content root .git directory to the payload file list."""
files = payload_config.files
@ -281,6 +289,7 @@ class SanityConfig(TestConfig):
class IntegrationConfig(TestConfig):
"""Configuration for the integration command."""
def __init__(self, args: t.Any, command: str) -> None:
super().__init__(args, command)
@ -325,18 +334,21 @@ TIntegrationConfig = t.TypeVar('TIntegrationConfig', bound=IntegrationConfig)
class PosixIntegrationConfig(IntegrationConfig):
"""Configuration for the posix integration command."""
def __init__(self, args: t.Any) -> None:
super().__init__(args, 'integration')
class WindowsIntegrationConfig(IntegrationConfig):
"""Configuration for the windows integration command."""
def __init__(self, args: t.Any) -> None:
super().__init__(args, 'windows-integration')
class NetworkIntegrationConfig(IntegrationConfig):
"""Configuration for the network integration command."""
def __init__(self, args: t.Any) -> None:
super().__init__(args, 'network-integration')
@ -345,6 +357,7 @@ class NetworkIntegrationConfig(IntegrationConfig):
class UnitsConfig(TestConfig):
"""Configuration for the units command."""
def __init__(self, args: t.Any) -> None:
super().__init__(args, 'units')

@ -44,33 +44,37 @@ from .become import (
class Connection(metaclass=abc.ABCMeta):
"""Base class for connecting to a host."""
@abc.abstractmethod
def run(self,
command: list[str],
capture: bool,
interactive: bool = False,
data: t.Optional[str] = None,
stdin: t.Optional[t.IO[bytes]] = None,
stdout: t.Optional[t.IO[bytes]] = None,
output_stream: t.Optional[OutputStream] = None,
) -> tuple[t.Optional[str], t.Optional[str]]:
def run(
self,
command: list[str],
capture: bool,
interactive: bool = False,
data: t.Optional[str] = None,
stdin: t.Optional[t.IO[bytes]] = None,
stdout: t.Optional[t.IO[bytes]] = None,
output_stream: t.Optional[OutputStream] = None,
) -> tuple[t.Optional[str], t.Optional[str]]:
"""Run the specified command and return the result."""
def extract_archive(self,
chdir: str,
src: t.IO[bytes],
):
def extract_archive(
self,
chdir: str,
src: t.IO[bytes],
):
"""Extract the given archive file stream in the specified directory."""
tar_cmd = ['tar', 'oxzf', '-', '-C', chdir]
retry(lambda: self.run(tar_cmd, stdin=src, capture=True))
def create_archive(self,
chdir: str,
name: str,
dst: t.IO[bytes],
exclude: t.Optional[str] = None,
):
def create_archive(
self,
chdir: str,
name: str,
dst: t.IO[bytes],
exclude: t.Optional[str] = None,
):
"""Create the specified archive file stream from the specified directory, including the given name and optionally excluding the given name."""
tar_cmd = ['tar', 'cf', '-', '-C', chdir]
gzip_cmd = ['gzip']
@ -90,18 +94,20 @@ class Connection(metaclass=abc.ABCMeta):
class LocalConnection(Connection):
"""Connect to localhost."""
def __init__(self, args: EnvironmentConfig) -> None:
self.args = args
def run(self,
command: list[str],
capture: bool,
interactive: bool = False,
data: t.Optional[str] = None,
stdin: t.Optional[t.IO[bytes]] = None,
stdout: t.Optional[t.IO[bytes]] = None,
output_stream: t.Optional[OutputStream] = None,
) -> tuple[t.Optional[str], t.Optional[str]]:
def run(
self,
command: list[str],
capture: bool,
interactive: bool = False,
data: t.Optional[str] = None,
stdin: t.Optional[t.IO[bytes]] = None,
stdout: t.Optional[t.IO[bytes]] = None,
output_stream: t.Optional[OutputStream] = None,
) -> tuple[t.Optional[str], t.Optional[str]]:
"""Run the specified command and return the result."""
return run_command(
args=self.args,
@ -117,6 +123,7 @@ class LocalConnection(Connection):
class SshConnection(Connection):
"""Connect to a host using SSH."""
def __init__(self, args: EnvironmentConfig, settings: SshConnectionDetail, become: t.Optional[Become] = None) -> None:
self.args = args
self.settings = settings
@ -136,15 +143,16 @@ class SshConnection(Connection):
self.options.extend(ssh_options_to_list(ssh_options))
def run(self,
command: list[str],
capture: bool,
interactive: bool = False,
data: t.Optional[str] = None,
stdin: t.Optional[t.IO[bytes]] = None,
stdout: t.Optional[t.IO[bytes]] = None,
output_stream: t.Optional[OutputStream] = None,
) -> tuple[t.Optional[str], t.Optional[str]]:
def run(
self,
command: list[str],
capture: bool,
interactive: bool = False,
data: t.Optional[str] = None,
stdin: t.Optional[t.IO[bytes]] = None,
stdout: t.Optional[t.IO[bytes]] = None,
output_stream: t.Optional[OutputStream] = None,
) -> tuple[t.Optional[str], t.Optional[str]]:
"""Run the specified command and return the result."""
options = list(self.options)
@ -213,20 +221,22 @@ class SshConnection(Connection):
class DockerConnection(Connection):
"""Connect to a host using Docker."""
def __init__(self, args: EnvironmentConfig, container_id: str, user: t.Optional[str] = None) -> None:
self.args = args
self.container_id = container_id
self.user: t.Optional[str] = user
def run(self,
command: list[str],
capture: bool,
interactive: bool = False,
data: t.Optional[str] = None,
stdin: t.Optional[t.IO[bytes]] = None,
stdout: t.Optional[t.IO[bytes]] = None,
output_stream: t.Optional[OutputStream] = None,
) -> tuple[t.Optional[str], t.Optional[str]]:
def run(
self,
command: list[str],
capture: bool,
interactive: bool = False,
data: t.Optional[str] = None,
stdin: t.Optional[t.IO[bytes]] = None,
stdout: t.Optional[t.IO[bytes]] = None,
output_stream: t.Optional[OutputStream] = None,
) -> tuple[t.Optional[str], t.Optional[str]]:
"""Run the specified command and return the result."""
options = []

@ -95,6 +95,7 @@ support_containers_mutex = threading.Lock()
class HostType:
"""Enum representing the types of hosts involved in running tests."""
origin = 'origin'
control = 'control'
managed = 'managed'
@ -102,6 +103,7 @@ class HostType:
class CleanupMode(enum.Enum):
"""How container cleanup should be handled."""
YES = enum.auto()
NO = enum.auto()
INFO = enum.auto()
@ -146,7 +148,7 @@ def run_support_container(
if current_container_id:
publish_ports = False # publishing ports is pointless if already running in a docker container
options = (options or [])
options = options or []
if start:
options.append('-dt') # the -t option is required to cause systemd in the container to log output to the console
@ -378,6 +380,7 @@ def get_container_database(args: EnvironmentConfig) -> ContainerDatabase:
class ContainerAccess:
"""Information needed for one test host to access a single container supporting tests."""
def __init__(self, host_ip: str, names: list[str], ports: t.Optional[list[int]], forwards: t.Optional[dict[int, int]]) -> None:
# if forwards is set
# this is where forwards are sent (it is the host that provides an indirect connection to the containers on alternate ports)
@ -437,6 +440,7 @@ class ContainerAccess:
class ContainerDatabase:
"""Database of running containers used to support tests."""
def __init__(self, data: dict[str, dict[str, dict[str, ContainerAccess]]]) -> None:
self.data = data
@ -576,6 +580,7 @@ def create_container_database(args: EnvironmentConfig) -> ContainerDatabase:
class SupportContainerContext:
"""Context object for tracking information relating to access of support containers."""
def __init__(self, containers: ContainerDatabase, process: t.Optional[SshProcess]) -> None:
self.containers = containers
self.process = process
@ -678,19 +683,21 @@ def create_support_container_context(
class ContainerDescriptor:
"""Information about a support container."""
def __init__(self,
image: str,
context: str,
name: str,
container_id: str,
ports: list[int],
aliases: list[str],
publish_ports: bool,
running: bool,
existing: bool,
cleanup: CleanupMode,
env: t.Optional[dict[str, str]],
) -> None:
def __init__(
self,
image: str,
context: str,
name: str,
container_id: str,
ports: list[int],
aliases: list[str],
publish_ports: bool,
running: bool,
existing: bool,
cleanup: CleanupMode,
env: t.Optional[dict[str, str]],
) -> None:
self.image = image
self.context = context
self.name = name
@ -757,23 +764,26 @@ class ContainerDescriptor:
class SupportContainer:
"""Information about a running support container available for use by tests."""
def __init__(self,
container: DockerInspect,
container_ip: str,
published_ports: dict[int, int],
) -> None:
def __init__(
self,
container: DockerInspect,
container_ip: str,
published_ports: dict[int, int],
) -> None:
self.container = container
self.container_ip = container_ip
self.published_ports = published_ports
def wait_for_file(args: EnvironmentConfig,
container_name: str,
path: str,
sleep: int,
tries: int,
check: t.Optional[c.Callable[[str], bool]] = None,
) -> str:
def wait_for_file(
args: EnvironmentConfig,
container_name: str,
path: str,
sleep: int,
tries: int,
check: t.Optional[c.Callable[[str], bool]] = None,
) -> str:
"""Wait for the specified file to become available in the requested container and return its contents."""
display.info('Waiting for container "%s" to provide file: %s' % (container_name, path))
@ -853,6 +863,7 @@ def create_container_hooks(
"""Clean up previously configured SSH port forwarding which was required by the specified target."""
cleanup_ssh_ports(args, control_connections, '%s_hosts_restore.yml' % control_type, control_state, target, HostType.control)
cleanup_ssh_ports(args, managed_connections, '%s_hosts_restore.yml' % managed_type, managed_state, target, HostType.managed)
else:
pre_target, post_target = None, None

@ -54,6 +54,7 @@ from .data import (
@dataclasses.dataclass(frozen=True)
class Resource(metaclass=abc.ABCMeta):
"""Base class for Ansible Core CI resources."""
@abc.abstractmethod
def as_tuple(self) -> tuple[str, str, str, str]:
"""Return the resource as a tuple of platform, version, architecture and provider."""
@ -71,6 +72,7 @@ class Resource(metaclass=abc.ABCMeta):
@dataclasses.dataclass(frozen=True)
class VmResource(Resource):
"""Details needed to request a VM from Ansible Core CI."""
platform: str
version: str
architecture: str
@ -94,6 +96,7 @@ class VmResource(Resource):
@dataclasses.dataclass(frozen=True)
class CloudResource(Resource):
"""Details needed to request cloud credentials from Ansible Core CI."""
platform: str
def as_tuple(self) -> tuple[str, str, str, str]:
@ -112,6 +115,7 @@ class CloudResource(Resource):
class AnsibleCoreCI:
"""Client for Ansible Core CI services."""
DEFAULT_ENDPOINT = 'https://ansible-core-ci.testing.ansible.com'
def __init__(
@ -303,7 +307,7 @@ class AnsibleCoreCI:
)
)
data.update(dict(auth=auth))
data.update(auth=auth)
headers = {
'Content-Type': 'application/json',
@ -420,6 +424,7 @@ class AnsibleCoreCI:
class CoreHttpError(HttpError):
"""HTTP response as an error."""
def __init__(self, status: int, remote_message: str, remote_stack_trace: str) -> None:
super().__init__(status, f'{remote_message}{remote_stack_trace}')
@ -429,6 +434,7 @@ class CoreHttpError(HttpError):
class SshKey:
"""Container for SSH key used to connect to remote instances."""
KEY_TYPE = 'rsa' # RSA is used to maintain compatibility with paramiko and EC2
KEY_NAME = f'id_{KEY_TYPE}'
PUB_NAME = f'{KEY_NAME}.pub'
@ -532,14 +538,16 @@ class SshKey:
class InstanceConnection:
"""Container for remote instance status and connection details."""
def __init__(self,
running: bool,
hostname: t.Optional[str] = None,
port: t.Optional[int] = None,
username: t.Optional[str] = None,
password: t.Optional[str] = None,
response_json: t.Optional[dict[str, t.Any]] = None,
) -> None:
def __init__(
self,
running: bool,
hostname: t.Optional[str] = None,
port: t.Optional[int] = None,
username: t.Optional[str] = None,
password: t.Optional[str] = None,
response_json: t.Optional[dict[str, t.Any]] = None,
) -> None:
self.running = running
self.hostname = hostname
self.port = port

@ -60,6 +60,7 @@ from .thread import (
@dataclasses.dataclass(frozen=True)
class CoverageVersion:
"""Details about a coverage version and its supported Python versions."""
coverage_version: str
schema_version: int
min_python: tuple[int, int]
@ -81,6 +82,7 @@ CONTROLLER_COVERAGE_VERSION = COVERAGE_VERSIONS[0]
class CoverageError(ApplicationError):
"""Exception caused while attempting to read a coverage file."""
def __init__(self, path: str, message: str) -> None:
self.path = path
self.message = message

@ -53,12 +53,14 @@ from .provider.layout.unsupported import (
@dataclasses.dataclass(frozen=True)
class PayloadConfig:
"""Configuration required to build a source tree payload for delegation."""
files: list[tuple[str, str]]
permissions: dict[str, int]
class DataContext:
"""Data context providing details about the current execution environment for ansible-test."""
def __init__(self) -> None:
content_path = os.environ.get('ANSIBLE_TEST_CONTENT_ROOT')
current_path = os.getcwd()
@ -120,11 +122,12 @@ class DataContext:
return collections
@staticmethod
def __create_content_layout(layout_providers: list[t.Type[LayoutProvider]],
source_providers: list[t.Type[SourceProvider]],
root: str,
walk: bool,
) -> ContentLayout:
def __create_content_layout(
layout_providers: list[t.Type[LayoutProvider]],
source_providers: list[t.Type[SourceProvider]],
root: str,
walk: bool,
) -> ContentLayout:
"""Create a content layout using the given providers and root path."""
try:
layout_provider = find_path_provider(LayoutProvider, layout_providers, root, walk)
@ -246,6 +249,7 @@ def data_context() -> DataContext:
@dataclasses.dataclass(frozen=True)
class PluginInfo:
"""Information about an Ansible plugin."""
plugin_type: str
name: str
paths: list[str]

@ -45,6 +45,7 @@ from ..cgroup import (
class CGroupState(enum.Enum):
"""The expected state of a cgroup related mount point."""
HOST = enum.auto()
PRIVATE = enum.auto()
SHADOWED = enum.auto()
@ -53,6 +54,7 @@ class CGroupState(enum.Enum):
@dataclasses.dataclass(frozen=True)
class CGroupMount:
"""Details on a cgroup mount point that is expected to be present in the container."""
path: str
type: t.Optional[str]
writable: t.Optional[bool]

@ -18,6 +18,7 @@ def parse_diff(lines: list[str]) -> list[FileDiff]:
class FileDiff:
"""Parsed diff for a single file."""
def __init__(self, old_path: str, new_path: str) -> None:
self.old = DiffSide(old_path, new=False)
self.new = DiffSide(new_path, new=True)
@ -36,6 +37,7 @@ class FileDiff:
class DiffSide:
"""Parsed diff for a single 'side' of a single file."""
def __init__(self, path: str, new: bool) -> None:
self.path = path
self.new = new
@ -109,6 +111,7 @@ class DiffSide:
class DiffParser:
"""Parse diff lines."""
def __init__(self, lines: list[str]) -> None:
self.lines = lines
self.files: list[FileDiff] = []

@ -243,6 +243,7 @@ def get_docker_info(args: CommonConfig) -> DockerInfo:
class SystemdControlGroupV1Status(enum.Enum):
"""The state of the cgroup v1 systemd hierarchy on the container host."""
SUBSYSTEM_MISSING = 'The systemd cgroup subsystem was not found.'
FILESYSTEM_NOT_MOUNTED = 'The "/sys/fs/cgroup/systemd" filesystem is not mounted.'
MOUNT_TYPE_NOT_CORRECT = 'The "/sys/fs/cgroup/systemd" mount type is not correct.'
@ -252,6 +253,7 @@ class SystemdControlGroupV1Status(enum.Enum):
@dataclasses.dataclass(frozen=True)
class ContainerHostProperties:
"""Container host properties detected at run time."""
audit_code: str
max_open_files: int
loginuid: t.Optional[int]
@ -411,7 +413,7 @@ def run_utility_container(
options = options + [
'--name', name,
'--rm',
]
] # fmt: skip
if data:
options.append('-i')
@ -423,6 +425,7 @@ def run_utility_container(
class DockerCommand:
"""Details about the available docker command."""
def __init__(self, command: str, executable: str, version: str) -> None:
self.command = command
self.executable = executable
@ -720,6 +723,7 @@ class DockerError(Exception):
class ContainerNotFoundError(DockerError):
"""The container identified by `identifier` was not found."""
def __init__(self, identifier: str) -> None:
super().__init__('The container "%s" was not found.' % identifier)
@ -728,6 +732,7 @@ class ContainerNotFoundError(DockerError):
class DockerInspect:
"""The results of `docker inspect` for a single container."""
def __init__(self, args: CommonConfig, inspection: dict[str, t.Any]) -> None:
self.args = args
self.inspection = inspection
@ -847,6 +852,7 @@ def docker_network_disconnect(args: CommonConfig, container_id: str, network: st
class DockerImageInspect:
"""The results of `docker image inspect` for a single image."""
def __init__(self, args: CommonConfig, inspection: dict[str, t.Any]) -> None:
self.args = args
self.inspection = inspection
@ -909,6 +915,7 @@ def docker_image_inspect(args: CommonConfig, image: str, always: bool = False) -
class DockerNetworkInspect:
"""The results of `docker network inspect` for a single network."""
def __init__(self, args: CommonConfig, inspection: dict[str, t.Any]) -> None:
self.args = args
self.inspection = inspection
@ -961,8 +968,16 @@ def docker_exec(
if data or stdin or stdout:
options.append('-i')
return docker_command(args, ['exec'] + options + [container_id] + cmd, capture=capture, stdin=stdin, stdout=stdout, interactive=interactive,
output_stream=output_stream, data=data)
return docker_command(
args,
['exec'] + options + [container_id] + cmd,
capture=capture,
stdin=stdin,
stdout=stdout,
interactive=interactive,
output_stream=output_stream,
data=data,
)
def docker_command(
@ -983,8 +998,18 @@ def docker_command(
if command[0] == 'podman' and get_podman_remote():
command.append('--remote')
return run_command(args, command + cmd, env=env, capture=capture, stdin=stdin, stdout=stdout, interactive=interactive, always=always,
output_stream=output_stream, data=data)
return run_command(
args,
command + cmd,
env=env,
capture=capture,
stdin=stdin,
stdout=stdout,
interactive=interactive,
always=always,
output_stream=output_stream,
data=data,
)
def docker_environment() -> dict[str, str]:

@ -81,18 +81,21 @@ def detect_changes(args: TestConfig) -> t.Optional[list[str]]:
class NoChangesDetected(ApplicationWarning):
"""Exception when change detection was performed, but no changes were found."""
def __init__(self) -> None:
super().__init__('No changes detected.')
class NoTestsForChanges(ApplicationWarning):
"""Exception when changes detected, but no tests trigger as a result."""
def __init__(self) -> None:
super().__init__('No tests found for detected changes.')
class Delegate(Exception):
"""Trigger command delegation."""
def __init__(self, host_state: HostState, exclude: list[str] = None, require: list[str] = None) -> None:
super().__init__()
@ -103,6 +106,7 @@ class Delegate(Exception):
class ListTargets(Exception):
"""List integration test targets instead of executing them."""
def __init__(self, target_names: list[str]) -> None:
super().__init__()
@ -111,5 +115,6 @@ class ListTargets(Exception):
class AllTargetsSkipped(ApplicationWarning):
"""All targets skipped."""
def __init__(self) -> None:
super().__init__('All targets skipped.')

@ -12,6 +12,7 @@ from .util import (
class Git:
"""Wrapper around git command-line tools."""
def __init__(self, root: t.Optional[str] = None) -> None:
self.git = 'git'
self.root = root

@ -48,6 +48,7 @@ from .util import (
@dataclasses.dataclass(frozen=True)
class OriginCompletionConfig(PosixCompletionConfig):
"""Pseudo completion config for the origin."""
def __init__(self) -> None:
super().__init__(name='origin')
@ -73,6 +74,7 @@ class OriginCompletionConfig(PosixCompletionConfig):
@dataclasses.dataclass(frozen=True)
class HostContext:
"""Context used when getting and applying defaults for host configurations."""
controller_config: t.Optional['PosixConfig']
@property
@ -84,6 +86,7 @@ class HostContext:
@dataclasses.dataclass
class HostConfig(metaclass=abc.ABCMeta):
"""Base class for host configuration."""
@abc.abstractmethod
def get_defaults(self, context: HostContext) -> CompletionConfig:
"""Return the default settings."""
@ -104,6 +107,7 @@ class HostConfig(metaclass=abc.ABCMeta):
@dataclasses.dataclass
class PythonConfig(metaclass=abc.ABCMeta):
"""Configuration for Python."""
version: t.Optional[str] = None
path: t.Optional[str] = None
@ -142,6 +146,7 @@ class PythonConfig(metaclass=abc.ABCMeta):
@dataclasses.dataclass
class NativePythonConfig(PythonConfig):
"""Configuration for native Python."""
@property
def is_managed(self) -> bool:
"""
@ -154,6 +159,7 @@ class NativePythonConfig(PythonConfig):
@dataclasses.dataclass
class VirtualPythonConfig(PythonConfig):
"""Configuration for Python in a virtual environment."""
system_site_packages: t.Optional[bool] = None
def apply_defaults(self, context: HostContext, defaults: PosixCompletionConfig) -> None:
@ -175,6 +181,7 @@ class VirtualPythonConfig(PythonConfig):
@dataclasses.dataclass
class PosixConfig(HostConfig, metaclass=abc.ABCMeta):
"""Base class for POSIX host configuration."""
python: t.Optional[PythonConfig] = None
@property
@ -199,6 +206,7 @@ class PosixConfig(HostConfig, metaclass=abc.ABCMeta):
@dataclasses.dataclass
class ControllerHostConfig(PosixConfig, metaclass=abc.ABCMeta):
"""Base class for host configurations which support the controller."""
@abc.abstractmethod
def get_default_targets(self, context: HostContext) -> list[ControllerConfig]:
"""Return the default targets for this host config."""
@ -207,6 +215,7 @@ class ControllerHostConfig(PosixConfig, metaclass=abc.ABCMeta):
@dataclasses.dataclass
class RemoteConfig(HostConfig, metaclass=abc.ABCMeta):
"""Base class for remote host configuration."""
name: t.Optional[str] = None
provider: t.Optional[str] = None
arch: t.Optional[str] = None
@ -245,6 +254,7 @@ class RemoteConfig(HostConfig, metaclass=abc.ABCMeta):
@dataclasses.dataclass
class PosixSshConfig(PosixConfig):
"""Configuration for a POSIX SSH host."""
user: t.Optional[str] = None
host: t.Optional[str] = None
port: t.Optional[int] = None
@ -265,6 +275,7 @@ class PosixSshConfig(PosixConfig):
@dataclasses.dataclass
class InventoryConfig(HostConfig):
"""Configuration using inventory."""
path: t.Optional[str] = None
def get_defaults(self, context: HostContext) -> InventoryCompletionConfig:
@ -279,6 +290,7 @@ class InventoryConfig(HostConfig):
@dataclasses.dataclass
class DockerConfig(ControllerHostConfig, PosixConfig):
"""Configuration for a docker host."""
name: t.Optional[str] = None
image: t.Optional[str] = None
memory: t.Optional[int] = None
@ -343,6 +355,7 @@ class DockerConfig(ControllerHostConfig, PosixConfig):
@dataclasses.dataclass
class PosixRemoteConfig(RemoteConfig, ControllerHostConfig, PosixConfig):
"""Configuration for a POSIX remote host."""
become: t.Optional[str] = None
def get_defaults(self, context: HostContext) -> PosixRemoteCompletionConfig:
@ -385,6 +398,7 @@ class WindowsConfig(HostConfig, metaclass=abc.ABCMeta):
@dataclasses.dataclass
class WindowsRemoteConfig(RemoteConfig, WindowsConfig):
"""Configuration for a remote Windows host."""
def get_defaults(self, context: HostContext) -> WindowsRemoteCompletionConfig:
"""Return the default settings."""
return filter_completion(windows_completion()).get(self.name) or windows_completion().get(self.platform)
@ -403,6 +417,7 @@ class NetworkConfig(HostConfig, metaclass=abc.ABCMeta):
@dataclasses.dataclass
class NetworkRemoteConfig(RemoteConfig, NetworkConfig):
"""Configuration for a remote network host."""
collection: t.Optional[str] = None
connection: t.Optional[str] = None
@ -431,6 +446,7 @@ class NetworkInventoryConfig(InventoryConfig, NetworkConfig):
@dataclasses.dataclass
class OriginConfig(ControllerHostConfig, PosixConfig):
"""Configuration for the origin host."""
def get_defaults(self, context: HostContext) -> OriginCompletionConfig:
"""Return the default settings."""
return OriginCompletionConfig()
@ -448,6 +464,7 @@ class OriginConfig(ControllerHostConfig, PosixConfig):
@dataclasses.dataclass
class ControllerConfig(PosixConfig):
"""Configuration for the controller host."""
controller: t.Optional[PosixConfig] = None
def get_defaults(self, context: HostContext) -> PosixCompletionConfig:
@ -482,6 +499,7 @@ class ControllerConfig(PosixConfig):
class FallbackReason(enum.Enum):
"""Reason fallback was performed."""
ENVIRONMENT = enum.auto()
PYTHON = enum.auto()
@ -489,6 +507,7 @@ class FallbackReason(enum.Enum):
@dataclasses.dataclass(frozen=True)
class FallbackDetail:
"""Details about controller fallback behavior."""
reason: FallbackReason
message: str
@ -496,6 +515,7 @@ class FallbackDetail:
@dataclasses.dataclass(frozen=True)
class HostSettings:
"""Host settings for the controller and targets."""
controller: ControllerHostConfig
targets: list[HostConfig]
skipped_python_versions: list[str]

@ -139,6 +139,7 @@ TRemoteConfig = t.TypeVar('TRemoteConfig', bound=RemoteConfig)
class ControlGroupError(ApplicationError):
"""Raised when the container host does not have the necessary cgroup support to run a container."""
def __init__(self, args: CommonConfig, reason: str) -> None:
engine = require_docker().command
dd_wsl2 = get_docker_info(args).docker_desktop_wsl2
@ -181,6 +182,7 @@ NOTE: These changes must be applied each time the container host is rebooted.
@dataclasses.dataclass(frozen=True)
class Inventory:
"""Simple representation of an Ansible inventory."""
host_groups: dict[str, dict[str, dict[str, t.Union[str, int]]]]
extra_groups: t.Optional[dict[str, list[str]]] = None
@ -226,12 +228,14 @@ class Inventory:
class HostProfile(t.Generic[THostConfig], metaclass=abc.ABCMeta):
"""Base class for host profiles."""
def __init__(self,
*,
args: EnvironmentConfig,
config: THostConfig,
targets: t.Optional[list[HostConfig]],
) -> None:
def __init__(
self,
*,
args: EnvironmentConfig,
config: THostConfig,
targets: t.Optional[list[HostConfig]],
) -> None:
self.args = args
self.config = config
self.controller = bool(targets)
@ -272,6 +276,7 @@ class HostProfile(t.Generic[THostConfig], metaclass=abc.ABCMeta):
class PosixProfile(HostProfile[TPosixConfig], metaclass=abc.ABCMeta):
"""Base class for POSIX host profiles."""
@property
def python(self) -> PythonConfig:
"""
@ -293,6 +298,7 @@ class PosixProfile(HostProfile[TPosixConfig], metaclass=abc.ABCMeta):
class ControllerHostProfile(PosixProfile[TControllerHostConfig], metaclass=abc.ABCMeta):
"""Base class for profiles usable as a controller."""
@abc.abstractmethod
def get_origin_controller_connection(self) -> Connection:
"""Return a connection for accessing the host as a controller from the origin."""
@ -304,6 +310,7 @@ class ControllerHostProfile(PosixProfile[TControllerHostConfig], metaclass=abc.A
class SshTargetHostProfile(HostProfile[THostConfig], metaclass=abc.ABCMeta):
"""Base class for profiles offering SSH connectivity."""
@abc.abstractmethod
def get_controller_target_connections(self) -> list[SshConnection]:
"""Return SSH connection(s) for accessing the host as a target from the controller."""
@ -311,6 +318,7 @@ class SshTargetHostProfile(HostProfile[THostConfig], metaclass=abc.ABCMeta):
class RemoteProfile(SshTargetHostProfile[TRemoteConfig], metaclass=abc.ABCMeta):
"""Base class for remote instance profiles."""
@property
def core_ci_state(self) -> t.Optional[dict[str, str]]:
"""The saved Ansible Core CI state."""
@ -387,6 +395,7 @@ class RemoteProfile(SshTargetHostProfile[TRemoteConfig], metaclass=abc.ABCMeta):
class ControllerProfile(SshTargetHostProfile[ControllerConfig], PosixProfile[ControllerConfig]):
"""Host profile for the controller as a target."""
def get_controller_target_connections(self) -> list[SshConnection]:
"""Return SSH connection(s) for accessing the host as a target from the controller."""
settings = SshConnectionDetail(
@ -409,6 +418,7 @@ class DockerProfile(ControllerHostProfile[DockerConfig], SshTargetHostProfile[Do
@dataclasses.dataclass(frozen=True)
class InitConfig:
"""Configuration details required to run the container init."""
options: list[str]
command: str
command_privileged: bool
@ -996,9 +1006,11 @@ class DockerProfile(ControllerHostProfile[DockerConfig], SshTargetHostProfile[Do
display.info(last_error)
if not self.args.delegate and not self.args.host_path:
def callback() -> None:
"""Callback to run during error display."""
self.on_target_failure() # when the controller is not delegated, report failures immediately
else:
callback = None
@ -1098,6 +1110,7 @@ class NetworkInventoryProfile(HostProfile[NetworkInventoryConfig]):
class NetworkRemoteProfile(RemoteProfile[NetworkRemoteConfig]):
"""Host profile for a network remote instance."""
def wait(self) -> None:
"""Wait for the instance to be ready. Executed before delegation for the controller and after delegation for targets."""
self.wait_until_ready()
@ -1174,6 +1187,7 @@ class NetworkRemoteProfile(RemoteProfile[NetworkRemoteConfig]):
class OriginProfile(ControllerHostProfile[OriginConfig]):
"""Host profile for origin."""
def get_origin_controller_connection(self) -> LocalConnection:
"""Return a connection for accessing the host as a controller from the origin."""
return LocalConnection(self.args)
@ -1185,6 +1199,7 @@ class OriginProfile(ControllerHostProfile[OriginConfig]):
class PosixRemoteProfile(ControllerHostProfile[PosixRemoteConfig], RemoteProfile[PosixRemoteConfig]):
"""Host profile for a POSIX remote instance."""
def wait(self) -> None:
"""Wait for the instance to be ready. Executed before delegation for the controller and after delegation for targets."""
self.wait_until_ready()
@ -1291,6 +1306,7 @@ class PosixRemoteProfile(ControllerHostProfile[PosixRemoteConfig], RemoteProfile
class PosixSshProfile(SshTargetHostProfile[PosixSshConfig], PosixProfile[PosixSshConfig]):
"""Host profile for a POSIX SSH instance."""
def get_controller_target_connections(self) -> list[SshConnection]:
"""Return SSH connection(s) for accessing the host as a target from the controller."""
settings = SshConnectionDetail(
@ -1307,6 +1323,7 @@ class PosixSshProfile(SshTargetHostProfile[PosixSshConfig], PosixProfile[PosixSs
class WindowsInventoryProfile(SshTargetHostProfile[WindowsInventoryConfig]):
"""Host profile for a Windows inventory."""
def get_controller_target_connections(self) -> list[SshConnection]:
"""Return SSH connection(s) for accessing the host as a target from the controller."""
inventory = parse_inventory(self.args, self.config.path)
@ -1331,6 +1348,7 @@ class WindowsInventoryProfile(SshTargetHostProfile[WindowsInventoryConfig]):
class WindowsRemoteProfile(RemoteProfile[WindowsRemoteConfig]):
"""Host profile for a Windows remote instance."""
def wait(self) -> None:
"""Wait for the instance to be ready. Executed before delegation for the controller and after delegation for targets."""
self.wait_until_ready()

@ -22,6 +22,7 @@ from .util_common import (
class HttpClient:
"""Make HTTP requests via curl."""
def __init__(self, args: CommonConfig, always: bool = False, insecure: bool = False, proxy: t.Optional[str] = None) -> None:
self.args = args
self.always = always
@ -113,6 +114,7 @@ class HttpClient:
class HttpResponse:
"""HTTP response from curl."""
def __init__(self, method: str, url: str, status_code: int, response: str) -> None:
self.method = method
self.url = url
@ -129,6 +131,7 @@ class HttpResponse:
class HttpError(ApplicationError):
"""HTTP response as an error."""
def __init__(self, status: int, message: str) -> None:
super().__init__('%s: %s' % (status, message))
self.status = status

@ -34,12 +34,13 @@ def make_dirs(path: str) -> None:
os.makedirs(to_bytes(path), exist_ok=True)
def write_json_file(path: str,
content: t.Any,
create_directories: bool = False,
formatted: bool = True,
encoder: t.Optional[t.Type[json.JSONEncoder]] = None,
) -> str:
def write_json_file(
path: str,
content: t.Any,
create_directories: bool = False,
formatted: bool = True,
encoder: t.Optional[t.Type[json.JSONEncoder]] = None,
) -> str:
"""Write the given json content to the specified path, optionally creating missing directories."""
text_content = json.dumps(content,
sort_keys=formatted,
@ -80,6 +81,7 @@ def open_binary_file(path: str, mode: str = 'rb') -> t.IO[bytes]:
class SortedSetEncoder(json.JSONEncoder):
"""Encode sets as sorted lists."""
def default(self, o: t.Any) -> t.Any:
"""Return a serialized version of the `o` object."""
if isinstance(o, set):

@ -21,6 +21,7 @@ It was not needed in previous ansible-core releases since they do not verify the
class LocaleError(SystemExit):
"""Exception to raise when locale related errors occur."""
def __init__(self, message: str) -> None:
super().__init__(f'ERROR: {message}')

@ -19,6 +19,7 @@ from .diff import (
class Metadata:
"""Metadata object for passing data to delegated tests."""
def __init__(self) -> None:
"""Initialize metadata."""
self.changes: dict[str, tuple[tuple[int, int], ...]] = {}
@ -82,6 +83,7 @@ class Metadata:
class ChangeDescription:
"""Description of changes."""
def __init__(self) -> None:
self.command: str = ''
self.changed_paths: list[str] = []

@ -69,7 +69,7 @@ def create_payload(args: CommonConfig, dst_path: str) -> None:
stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH |
stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH |
stat.S_IWUSR
)
) # fmt: skip
def make_non_executable(tar_info: tarfile.TarInfo) -> t.Optional[tarfile.TarInfo]:
"""
@ -81,7 +81,7 @@ def create_payload(args: CommonConfig, dst_path: str) -> None:
tar_info,
stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH |
stat.S_IWUSR
)
) # fmt: skip
def detect_permissions(tar_info: tarfile.TarInfo) -> t.Optional[tarfile.TarInfo]:
"""

@ -16,11 +16,12 @@ def get_path_provider_classes(provider_type: t.Type[TPathProvider]) -> list[t.Ty
return sorted(get_subclasses(provider_type), key=lambda subclass: (subclass.priority, subclass.__name__))
def find_path_provider(provider_type: t.Type[TPathProvider],
provider_classes: list[t.Type[TPathProvider]],
path: str,
walk: bool,
) -> TPathProvider:
def find_path_provider(
provider_type: t.Type[TPathProvider],
provider_classes: list[t.Type[TPathProvider]],
path: str,
walk: bool,
) -> TPathProvider:
"""Return the first found path provider of the given type for the given path."""
sequences = sorted(set(pc.sequence for pc in provider_classes if pc.sequence > 0))
@ -48,6 +49,7 @@ def find_path_provider(provider_type: t.Type[TPathProvider],
class ProviderNotFoundForPath(ApplicationError):
"""Exception generated when a path based provider cannot be found for a given path."""
def __init__(self, provider_type: t.Type, path: str) -> None:
super().__init__('No %s found for path: %s' % (provider_type.__name__, path))
@ -57,6 +59,7 @@ class ProviderNotFoundForPath(ApplicationError):
class PathProvider(metaclass=abc.ABCMeta):
"""Base class for provider plugins that are path based."""
sequence = 500
priority = 500

@ -17,10 +17,12 @@ from .. import (
class Layout:
"""Description of content locations and helper methods to access content."""
def __init__(self,
root: str,
paths: list[str],
) -> None:
def __init__(
self,
root: str,
paths: list[str],
) -> None:
self.root = root
self.__paths = paths # contains both file paths and symlinked directory paths (ending with os.path.sep)
@ -74,25 +76,27 @@ class Layout:
class ContentLayout(Layout):
"""Information about the current Ansible content being tested."""
def __init__(self,
root: str,
paths: list[str],
plugin_paths: dict[str, str],
collection: t.Optional[CollectionDetail],
test_path: str,
results_path: str,
sanity_path: str,
sanity_messages: t.Optional[LayoutMessages],
integration_path: str,
integration_targets_path: str,
integration_vars_path: str,
integration_messages: t.Optional[LayoutMessages],
unit_path: str,
unit_module_path: str,
unit_module_utils_path: str,
unit_messages: t.Optional[LayoutMessages],
unsupported: bool = False,
) -> None:
def __init__(
self,
root: str,
paths: list[str],
plugin_paths: dict[str, str],
collection: t.Optional[CollectionDetail],
test_path: str,
results_path: str,
sanity_path: str,
sanity_messages: t.Optional[LayoutMessages],
integration_path: str,
integration_targets_path: str,
integration_vars_path: str,
integration_messages: t.Optional[LayoutMessages],
unit_path: str,
unit_module_path: str,
unit_module_utils_path: str,
unit_messages: t.Optional[LayoutMessages],
unsupported: bool = False,
) -> None:
super().__init__(root, paths)
self.plugin_paths = plugin_paths
@ -150,6 +154,7 @@ class ContentLayout(Layout):
class LayoutMessages:
"""Messages generated during layout creation that should be deferred for later display."""
def __init__(self) -> None:
self.info: list[str] = []
self.warning: list[str] = []
@ -158,11 +163,13 @@ class LayoutMessages:
class CollectionDetail:
"""Details about the layout of the current collection."""
def __init__(self,
name: str,
namespace: str,
root: str,
) -> None:
def __init__(
self,
name: str,
namespace: str,
root: str,
) -> None:
self.name = name
self.namespace = namespace
self.root = root
@ -173,6 +180,7 @@ class CollectionDetail:
class LayoutProvider(PathProvider):
"""Base class for layout providers."""
PLUGIN_TYPES = (
'action',
'become',

@ -11,6 +11,7 @@ from . import (
class AnsibleLayout(LayoutProvider):
"""Layout provider for Ansible source."""
@staticmethod
def is_content_root(path: str) -> bool:
"""Return True if the given path is a content root for this provider."""
@ -20,25 +21,26 @@ class AnsibleLayout(LayoutProvider):
"""Create a Layout using the given root and paths."""
plugin_paths = dict((p, os.path.join('lib/ansible/plugins', p)) for p in self.PLUGIN_TYPES)
plugin_paths.update(dict(
plugin_paths.update(
modules='lib/ansible/modules',
module_utils='lib/ansible/module_utils',
))
return ContentLayout(root,
paths,
plugin_paths=plugin_paths,
collection=None,
test_path='test',
results_path='test/results',
sanity_path='test/sanity',
sanity_messages=None,
integration_path='test/integration',
integration_targets_path='test/integration/targets',
integration_vars_path='test/integration/integration_config.yml',
integration_messages=None,
unit_path='test/units',
unit_module_path='test/units/modules',
unit_module_utils_path='test/units/module_utils',
unit_messages=None,
)
)
return ContentLayout(
root,
paths,
plugin_paths=plugin_paths,
collection=None,
test_path='test',
results_path='test/results',
sanity_path='test/sanity',
sanity_messages=None,
integration_path='test/integration',
integration_targets_path='test/integration/targets',
integration_vars_path='test/integration/integration_config.yml',
integration_messages=None,
unit_path='test/units',
unit_module_path='test/units/modules',
unit_module_utils_path='test/units/module_utils',
unit_messages=None,
)

@ -17,6 +17,7 @@ from ...util import (
class CollectionLayout(LayoutProvider):
"""Layout provider for Ansible collections."""
@staticmethod
def is_content_root(path: str) -> bool:
"""Return True if the given path is a content root for this provider."""
@ -52,28 +53,29 @@ class CollectionLayout(LayoutProvider):
integration_targets_path = self.__check_integration_path(paths, integration_messages)
self.__check_unit_path(paths, unit_messages)
return ContentLayout(root,
paths,
plugin_paths=plugin_paths,
collection=CollectionDetail(
name=collection_name,
namespace=collection_namespace,
root=collection_root,
),
test_path='tests',
results_path='tests/output',
sanity_path='tests/sanity',
sanity_messages=sanity_messages,
integration_path='tests/integration',
integration_targets_path=integration_targets_path.rstrip(os.path.sep),
integration_vars_path='tests/integration/integration_config.yml',
integration_messages=integration_messages,
unit_path='tests/unit',
unit_module_path='tests/unit/plugins/modules',
unit_module_utils_path='tests/unit/plugins/module_utils',
unit_messages=unit_messages,
unsupported=not (is_valid_identifier(collection_namespace) and is_valid_identifier(collection_name)),
)
return ContentLayout(
root,
paths,
plugin_paths=plugin_paths,
collection=CollectionDetail(
name=collection_name,
namespace=collection_namespace,
root=collection_root,
),
test_path='tests',
results_path='tests/output',
sanity_path='tests/sanity',
sanity_messages=sanity_messages,
integration_path='tests/integration',
integration_targets_path=integration_targets_path.rstrip(os.path.sep),
integration_vars_path='tests/integration/integration_config.yml',
integration_messages=integration_messages,
unit_path='tests/unit',
unit_module_path='tests/unit/plugins/modules',
unit_module_utils_path='tests/unit/plugins/module_utils',
unit_messages=unit_messages,
unsupported=not (is_valid_identifier(collection_namespace) and is_valid_identifier(collection_name)),
)
@staticmethod
def __check_test_path(paths: list[str], messages: LayoutMessages) -> None:

@ -9,6 +9,7 @@ from . import (
class UnsupportedLayout(LayoutProvider):
"""Layout provider for an unsupported directory layout."""
sequence = 0 # disable automatic detection
@staticmethod
@ -20,21 +21,22 @@ class UnsupportedLayout(LayoutProvider):
"""Create a Layout using the given root and paths."""
plugin_paths = dict((p, p) for p in self.PLUGIN_TYPES)
return ContentLayout(root,
paths,
plugin_paths=plugin_paths,
collection=None,
test_path='',
results_path='',
sanity_path='',
sanity_messages=None,
integration_path='',
integration_targets_path='',
integration_vars_path='',
integration_messages=None,
unit_path='',
unit_module_path='',
unit_module_utils_path='',
unit_messages=None,
unsupported=True,
)
return ContentLayout(
root,
paths,
plugin_paths=plugin_paths,
collection=None,
test_path='',
results_path='',
sanity_path='',
sanity_messages=None,
integration_path='',
integration_targets_path='',
integration_vars_path='',
integration_messages=None,
unit_path='',
unit_module_path='',
unit_module_utils_path='',
unit_messages=None,
unsupported=True,
)

@ -10,6 +10,7 @@ from .. import (
class SourceProvider(PathProvider):
"""Base class for source providers."""
@abc.abstractmethod
def get_paths(self, path: str) -> list[str]:
"""Return the list of available content paths under the given path."""

@ -22,6 +22,7 @@ from . import (
class GitSource(SourceProvider):
"""Source provider for a content root managed by git version control."""
@staticmethod
def is_content_root(path: str) -> bool:
"""Return True if the given path is a content root for this provider."""

@ -10,6 +10,7 @@ from . import (
class InstalledSource(SourceProvider):
"""Source provider for content which has been installed."""
sequence = 0 # disable automatic detection
@staticmethod

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save