"""Loading, parsing and storing of completion configurations.""" from __future__ import annotations import abc import dataclasses import enum import os import typing as t from .constants import ( CONTROLLER_PYTHON_VERSIONS, SUPPORTED_PYTHON_VERSIONS, ) from .util import ( ANSIBLE_TEST_DATA_ROOT, cache, read_lines_without_comments, ) from .data import ( data_context, ) from .become import ( SUPPORTED_BECOME_METHODS, ) class CGroupVersion(enum.Enum): """The control group version(s) required by a container.""" NONE = 'none' V1_ONLY = 'v1-only' V2_ONLY = 'v2-only' V1_V2 = 'v1-v2' def __repr__(self) -> str: return f'{self.__class__.__name__}.{self.name}' class AuditMode(enum.Enum): """The audit requirements of a container.""" NONE = 'none' REQUIRED = 'required' def __repr__(self) -> str: return f'{self.__class__.__name__}.{self.name}' @dataclasses.dataclass(frozen=True) class CompletionConfig(metaclass=abc.ABCMeta): """Base class for completion configuration.""" name: str @property @abc.abstractmethod def is_default(self) -> bool: """True if the completion entry is only used for defaults, otherwise False.""" @dataclasses.dataclass(frozen=True) class PosixCompletionConfig(CompletionConfig, metaclass=abc.ABCMeta): """Base class for completion configuration of POSIX environments.""" @property @abc.abstractmethod def supported_pythons(self) -> list[str]: """Return a list of the supported Python versions.""" @abc.abstractmethod def get_python_path(self, version: str) -> str: """Return the path of the requested Python version.""" def get_default_python(self, controller: bool) -> str: """Return the default Python version for a controller or target as specified.""" context_pythons = CONTROLLER_PYTHON_VERSIONS if controller else SUPPORTED_PYTHON_VERSIONS version = [python for python in self.supported_pythons if python in context_pythons][0] return version @property def controller_supported(self) -> bool: """True if at least one Python version is provided which supports the controller, otherwise False.""" return any(version in CONTROLLER_PYTHON_VERSIONS for version in self.supported_pythons) @dataclasses.dataclass(frozen=True) class PythonCompletionConfig(PosixCompletionConfig, metaclass=abc.ABCMeta): """Base class for completion configuration of Python environments.""" python: str = '' python_dir: str = '/usr/bin' @property def supported_pythons(self) -> list[str]: """Return a list of the supported Python versions.""" versions = self.python.split(',') if self.python else [] versions = [version for version in versions if version in SUPPORTED_PYTHON_VERSIONS] return versions def get_python_path(self, version: str) -> str: """Return the path of the requested Python version.""" return os.path.join(self.python_dir, f'python{version}') @dataclasses.dataclass(frozen=True) class RemoteCompletionConfig(CompletionConfig): """Base class for completion configuration of remote environments provisioned through Ansible Core CI.""" provider: t.Optional[str] = None arch: t.Optional[str] = None @property def platform(self) -> str: """The name of the platform.""" return self.name.partition('/')[0] @property def version(self) -> str: """The version of the platform.""" return self.name.partition('/')[2] @property def is_default(self) -> bool: """True if the completion entry is only used for defaults, otherwise False.""" return not self.version def __post_init__(self): if not self.provider: raise Exception(f'Remote completion entry "{self.name}" must provide a "provider" setting.') if not self.arch: raise Exception(f'Remote completion entry "{self.name}" must provide a "arch" setting.') @dataclasses.dataclass(frozen=True) class InventoryCompletionConfig(CompletionConfig): """Configuration for inventory files.""" def __init__(self) -> None: super().__init__(name='inventory') @property def is_default(self) -> bool: """True if the completion entry is only used for defaults, otherwise False.""" return False @dataclasses.dataclass(frozen=True) class PosixSshCompletionConfig(PythonCompletionConfig): """Configuration for a POSIX host reachable over SSH.""" def __init__(self, user: str, host: str) -> None: super().__init__( name=f'{user}@{host}', python=','.join(SUPPORTED_PYTHON_VERSIONS), ) @property def is_default(self) -> bool: """True if the completion entry is only used for defaults, otherwise False.""" return False @dataclasses.dataclass(frozen=True) class DockerCompletionConfig(PythonCompletionConfig): """Configuration for Docker containers.""" image: str = '' seccomp: str = 'default' cgroup: str = CGroupVersion.V1_V2.value audit: str = AuditMode.REQUIRED.value # most containers need this, so the default is required, leaving it to be opt-out for containers which don't need it placeholder: bool = False @property def is_default(self) -> bool: """True if the completion entry is only used for defaults, otherwise False.""" return False @property def audit_enum(self) -> AuditMode: """The audit requirements for the container. Raises an exception if the value is invalid.""" try: return AuditMode(self.audit) except ValueError: raise ValueError(f'Docker completion entry "{self.name}" has an invalid value "{self.audit}" for the "audit" setting.') from None @property def cgroup_enum(self) -> CGroupVersion: """The control group version(s) required by the container. Raises an exception if the value is invalid.""" try: return CGroupVersion(self.cgroup) except ValueError: raise ValueError(f'Docker completion entry "{self.name}" has an invalid value "{self.cgroup}" for the "cgroup" setting.') from None def __post_init__(self): if not self.image: raise Exception(f'Docker completion entry "{self.name}" must provide an "image" setting.') if not self.supported_pythons and not self.placeholder: raise Exception(f'Docker completion entry "{self.name}" must provide a "python" setting.') # verify properties can be correctly parsed to enums assert self.audit_enum assert self.cgroup_enum @dataclasses.dataclass(frozen=True) class NetworkRemoteCompletionConfig(RemoteCompletionConfig): """Configuration for remote network platforms.""" collection: str = '' connection: str = '' placeholder: bool = False def __post_init__(self): if not self.placeholder: super().__post_init__() @dataclasses.dataclass(frozen=True) class PosixRemoteCompletionConfig(RemoteCompletionConfig, PythonCompletionConfig): """Configuration for remote POSIX platforms.""" become: t.Optional[str] = None placeholder: bool = False def __post_init__(self): if not self.placeholder: super().__post_init__() if self.become and self.become not in SUPPORTED_BECOME_METHODS: raise Exception(f'POSIX remote completion entry "{self.name}" setting "become" must be omitted or one of: {", ".join(SUPPORTED_BECOME_METHODS)}') if not self.supported_pythons: if self.version and not self.placeholder: raise Exception(f'POSIX remote completion entry "{self.name}" must provide a "python" setting.') else: if not self.version: raise Exception(f'POSIX remote completion entry "{self.name}" is a platform default and cannot provide a "python" setting.') @dataclasses.dataclass(frozen=True) class WindowsRemoteCompletionConfig(RemoteCompletionConfig): """Configuration for remote Windows platforms.""" connection: str = '' TCompletionConfig = t.TypeVar('TCompletionConfig', bound=CompletionConfig) def load_completion(name: str, completion_type: t.Type[TCompletionConfig]) -> dict[str, TCompletionConfig]: """Load the named completion entries, returning them in dictionary form using the specified completion type.""" lines = read_lines_without_comments(os.path.join(ANSIBLE_TEST_DATA_ROOT, 'completion', '%s.txt' % name), remove_blank_lines=True) if data_context().content.collection: context = 'collection' else: context = 'ansible-core' items = {name: data for name, data in [parse_completion_entry(line) for line in lines] if data.get('context', context) == context} for item in items.values(): item.pop('context', None) item.pop('placeholder', None) completion = {name: completion_type(name=name, **data) for name, data in items.items()} return completion def parse_completion_entry(value: str) -> tuple[str, dict[str, str]]: """Parse the given completion entry, returning the entry name and a dictionary of key/value settings.""" values = value.split() name = values[0] data = {kvp[0]: kvp[1] if len(kvp) > 1 else '' for kvp in [item.split('=', 1) for item in values[1:]]} return name, data def filter_completion( completion: dict[str, TCompletionConfig], controller_only: bool = False, include_defaults: bool = False, ) -> dict[str, TCompletionConfig]: """Return the given completion dictionary, filtering out configs which do not support the controller if controller_only is specified.""" if controller_only: # The cast is needed because mypy gets confused here and forgets that completion values are TCompletionConfig. completion = {name: t.cast(TCompletionConfig, config) for name, config in completion.items() if isinstance(config, PosixCompletionConfig) and config.controller_supported} if not include_defaults: completion = {name: config for name, config in completion.items() if not config.is_default} return completion @cache def docker_completion() -> dict[str, DockerCompletionConfig]: """Return docker completion entries.""" return load_completion('docker', DockerCompletionConfig) @cache def remote_completion() -> dict[str, PosixRemoteCompletionConfig]: """Return remote completion entries.""" return load_completion('remote', PosixRemoteCompletionConfig) @cache def windows_completion() -> dict[str, WindowsRemoteCompletionConfig]: """Return windows completion entries.""" return load_completion('windows', WindowsRemoteCompletionConfig) @cache def network_completion() -> dict[str, NetworkRemoteCompletionConfig]: """Return network completion entries.""" return load_completion('network', NetworkRemoteCompletionConfig)