From 0d5a9f2138b0626e1c836333e3af0b73bdc31ec8 Mon Sep 17 00:00:00 2001 From: Matt Clay Date: Fri, 24 Sep 2021 13:28:41 -0700 Subject: [PATCH] ansible-test - More PEP 484 type hints. --- .../_internal/commands/sanity/__init__.py | 68 ++----- .../commands/sanity/integration_aliases.py | 45 ++--- .../lib/ansible_test/_internal/connections.py | 7 +- test/lib/ansible_test/_internal/delegation.py | 17 +- .../_internal/provider/__init__.py | 10 +- test/lib/ansible_test/_internal/target.py | 156 ++++++---------- test/lib/ansible_test/_internal/test.py | 173 +++++++----------- test/lib/ansible_test/_internal/util.py | 71 ++++--- .../lib/ansible_test/_internal/util_common.py | 33 ++-- .../sanity/pylint/plugins/unwanted.py | 76 +++----- .../controller/sanity/yamllint/yamllinter.py | 57 ++---- 11 files changed, 256 insertions(+), 457 deletions(-) diff --git a/test/lib/ansible_test/_internal/commands/sanity/__init__.py b/test/lib/ansible_test/_internal/commands/sanity/__init__.py index f30d9ff0399..542e078ad18 100644 --- a/test/lib/ansible_test/_internal/commands/sanity/__init__.py +++ b/test/lib/ansible_test/_internal/commands/sanity/__init__.py @@ -84,6 +84,7 @@ from ...test import ( TestFailure, TestSkipped, TestMessage, + TestResult, calculate_best_confidence, ) @@ -125,10 +126,8 @@ TARGET_SANITY_ROOT = os.path.join(ANSIBLE_TEST_TARGET_ROOT, 'sanity') created_venvs = [] # type: t.List[str] -def command_sanity(args): - """ - :type args: SanityConfig - """ +def command_sanity(args): # type: (SanityConfig) -> None + """Run sanity tests.""" create_result_directories(args) target_configs = t.cast(t.List[PosixConfig], args.targets) @@ -606,33 +605,25 @@ class SanityIgnoreProcessor: class SanitySuccess(TestSuccess): """Sanity test success.""" - def __init__(self, test, python_version=None): - """ - :type test: str - :type python_version: str - """ + def __init__(self, test, python_version=None): # type: (str, t.Optional[str]) -> None super().__init__(COMMAND, test, python_version) class SanitySkipped(TestSkipped): """Sanity test skipped.""" - def __init__(self, test, python_version=None): - """ - :type test: str - :type python_version: str - """ + def __init__(self, test, python_version=None): # type: (str, t.Optional[str]) -> None super().__init__(COMMAND, test, python_version) class SanityFailure(TestFailure): """Sanity test failure.""" - def __init__(self, test, python_version=None, messages=None, summary=None): - """ - :type test: str - :type python_version: str - :type messages: list[SanityMessage] - :type summary: unicode - """ + def __init__( + self, + test, # type: str + python_version=None, # type: t.Optional[str] + messages=None, # type: t.Optional[t.List[SanityMessage]] + summary=None, # type: t.Optional[str] + ): # type: (...) -> None super().__init__(COMMAND, test, python_version, messages, summary) @@ -809,13 +800,8 @@ class SanitySingleVersion(SanityTest, metaclass=abc.ABCMeta): return False @abc.abstractmethod - def test(self, args, targets, python): - """ - :type args: SanityConfig - :type targets: SanityTargets - :type python: PythonConfig - :rtype: TestResult - """ + def test(self, args, targets, python): # type: (SanityConfig, SanityTargets, PythonConfig) -> TestResult + """Run the sanity test and return the result.""" def load_processor(self, args): # type: (SanityConfig) -> SanityIgnoreProcessor """Load the ignore processor for this sanity test.""" @@ -947,13 +933,8 @@ class SanityCodeSmellTest(SanitySingleVersion): return targets - def test(self, args, targets, python): - """ - :type args: SanityConfig - :type targets: SanityTargets - :type python: PythonConfig - :rtype: TestResult - """ + def test(self, args, targets, python): # type: (SanityConfig, SanityTargets, PythonConfig) -> TestResult + """Run the sanity test and return the result.""" cmd = [python.path, self.path] env = ansible_environment(args, color=False) @@ -1027,12 +1008,8 @@ class SanityCodeSmellTest(SanitySingleVersion): class SanityVersionNeutral(SanityTest, metaclass=abc.ABCMeta): """Base class for sanity test plugins which are idependent of the python version being used.""" @abc.abstractmethod - def test(self, args, targets): - """ - :type args: SanityConfig - :type targets: SanityTargets - :rtype: TestResult - """ + def test(self, args, targets): # type: (SanityConfig, SanityTargets) -> TestResult + """Run the sanity test and return the result.""" def load_processor(self, args): # type: (SanityConfig) -> SanityIgnoreProcessor """Load the ignore processor for this sanity test.""" @@ -1047,13 +1024,8 @@ class SanityVersionNeutral(SanityTest, metaclass=abc.ABCMeta): class SanityMultipleVersion(SanityTest, metaclass=abc.ABCMeta): """Base class for sanity test plugins which should run on multiple python versions.""" @abc.abstractmethod - def test(self, args, targets, python): - """ - :type args: SanityConfig - :type targets: SanityTargets - :type python: PythonConfig - :rtype: TestResult - """ + def test(self, args, targets, python): # type: (SanityConfig, SanityTargets, PythonConfig) -> TestResult + """Run the sanity test and return the result.""" def load_processor(self, args, python_version): # type: (SanityConfig, str) -> SanityIgnoreProcessor """Load the ignore processor for this sanity test.""" diff --git a/test/lib/ansible_test/_internal/commands/sanity/integration_aliases.py b/test/lib/ansible_test/_internal/commands/sanity/integration_aliases.py index bce4f0cf4fc..bc3ebc0d108 100644 --- a/test/lib/ansible_test/_internal/commands/sanity/integration_aliases.py +++ b/test/lib/ansible_test/_internal/commands/sanity/integration_aliases.py @@ -29,6 +29,7 @@ from ...target import ( walk_windows_integration_targets, walk_integration_targets, walk_module_targets, + CompletionTarget, ) from ..integration.cloud import ( @@ -169,12 +170,8 @@ class IntegrationAliasesTest(SanitySingleVersion): return self._ci_test_groups - def format_test_group_alias(self, name, fallback=''): - """ - :type name: str - :type fallback: str - :rtype: str - """ + def format_test_group_alias(self, name, fallback=''): # type: (str, str) -> str + """Return a test group alias using the given name and fallback.""" group_numbers = self.ci_test_groups.get(name, None) if group_numbers: @@ -232,11 +229,8 @@ class IntegrationAliasesTest(SanitySingleVersion): return SanitySuccess(self.name) - def check_posix_targets(self, args): - """ - :type args: SanityConfig - :rtype: list[SanityMessage] - """ + def check_posix_targets(self, args): # type: (SanityConfig) -> t.List[SanityMessage] + """Check POSIX integration test targets and return messages with any issues found.""" posix_targets = tuple(walk_posix_integration_targets()) clouds = get_cloud_platforms(args, posix_targets) @@ -298,13 +292,13 @@ class IntegrationAliasesTest(SanitySingleVersion): return messages - def check_ci_group(self, targets, find, find_incidental=None): - """ - :type targets: tuple[CompletionTarget] - :type find: str - :type find_incidental: list[str] | None - :rtype: list[SanityMessage] - """ + def check_ci_group( + self, + targets, # type: t.Tuple[CompletionTarget, ...] + find, # type: str + find_incidental=None, # type: t.Optional[t.List[str]] + ): # type: (...) -> t.List[SanityMessage] + """Check the CI groups set in the provided targets and return a list of messages with any issues found.""" all_paths = set(target.path for target in targets) supported_paths = set(target.path for target in filter_targets(targets, [find], directories=False, errors=False)) unsupported_paths = set(target.path for target in filter_targets(targets, [self.UNSUPPORTED], directories=False, errors=False)) @@ -330,11 +324,8 @@ class IntegrationAliasesTest(SanitySingleVersion): return messages - def check_changes(self, args, results): - """ - :type args: SanityConfig - :type results: dict[str, any] - """ + def check_changes(self, args, results): # type: (SanityConfig, t.Dict[str, t.Any]) -> None + """Check changes and store results in the provided results dictionary.""" integration_targets = list(walk_integration_targets()) module_targets = list(walk_module_targets()) @@ -381,12 +372,8 @@ class IntegrationAliasesTest(SanitySingleVersion): results['comments'] += comments results['labels'].update(labels) - def format_comment(self, template, targets): - """ - :type template: str - :type targets: list[str] - :rtype: str | None - """ + def format_comment(self, template, targets): # type: (str, t.List[str]) -> t.Optional[str] + """Format and return a comment based on the given template and targets, or None if there are no targets.""" if not targets: return None diff --git a/test/lib/ansible_test/_internal/connections.py b/test/lib/ansible_test/_internal/connections.py index 7835cd3a916..ddf4e8df38c 100644 --- a/test/lib/ansible_test/_internal/connections.py +++ b/test/lib/ansible_test/_internal/connections.py @@ -2,7 +2,6 @@ from __future__ import annotations import abc -import functools import shlex import sys import tempfile @@ -160,6 +159,10 @@ class SshConnection(Connection): options.append(f'{self.settings.user}@{self.settings.host}') options.append(' '.join(shlex.quote(cmd) for cmd in command)) + def error_callback(ex): # type: (SubprocessError) -> None + """Error handler.""" + self.capture_log_details(ssh_logfile.name, ex) + return run_command( args=self.args, cmd=['ssh'] + options, @@ -167,7 +170,7 @@ class SshConnection(Connection): data=data, stdin=stdin, stdout=stdout, - error_callback=functools.partial(self.capture_log_details, ssh_logfile.name), + error_callback=error_callback, ) @staticmethod diff --git a/test/lib/ansible_test/_internal/delegation.py b/test/lib/ansible_test/_internal/delegation.py index 48d13be5760..aaee0dfac0c 100644 --- a/test/lib/ansible_test/_internal/delegation.py +++ b/test/lib/ansible_test/_internal/delegation.py @@ -291,15 +291,14 @@ def generate_command( return cmd -def filter_options(args, argv, options, exclude, require): - """ - :type args: EnvironmentConfig - :type argv: list[str] - :type options: dict[str, int] - :type exclude: list[str] - :type require: list[str] - :rtype: collections.Iterable[str] - """ +def filter_options( + args, # type: EnvironmentConfig + argv, # type: t.List[str] + options, # type: t.Dict[str, int] + exclude, # type: t.List[str] + require, # type: t.List[str] +): # type: (...) -> t.Iterable[str] + """Return an iterable that filters out unwanted CLI options and injects new ones as requested.""" options = options.copy() options['--truncate'] = 1 diff --git a/test/lib/ansible_test/_internal/provider/__init__.py b/test/lib/ansible_test/_internal/provider/__init__.py index ce61e13527b..e8972ac87c4 100644 --- a/test/lib/ansible_test/_internal/provider/__init__.py +++ b/test/lib/ansible_test/_internal/provider/__init__.py @@ -11,13 +11,6 @@ from ..util import ( ) -try: - # noinspection PyTypeChecker - TPathProvider = t.TypeVar('TPathProvider', bound='PathProvider') -except AttributeError: - TPathProvider = None # pylint: disable=invalid-name - - def get_path_provider_classes(provider_type): # type: (t.Type[TPathProvider]) -> t.List[t.Type[TPathProvider]] """Return a list of path provider classes of the given type.""" return sorted(get_subclasses(provider_type), key=lambda c: (c.priority, c.__name__)) @@ -74,3 +67,6 @@ class PathProvider(metaclass=abc.ABCMeta): @abc.abstractmethod def is_content_root(path): # type: (str) -> bool """Return True if the given path is a content root for this provider.""" + + +TPathProvider = t.TypeVar('TPathProvider', bound=PathProvider) diff --git a/test/lib/ansible_test/_internal/target.py b/test/lib/ansible_test/_internal/target.py index 934f969d48a..ced111f784f 100644 --- a/test/lib/ansible_test/_internal/target.py +++ b/test/lib/ansible_test/_internal/target.py @@ -32,26 +32,9 @@ from .data import ( MODULE_EXTENSIONS = '.py', '.ps1' -try: - # noinspection PyTypeChecker - TCompletionTarget = t.TypeVar('TCompletionTarget', bound='CompletionTarget') -except AttributeError: - TCompletionTarget = None # pylint: disable=invalid-name -try: - # noinspection PyTypeChecker - TIntegrationTarget = t.TypeVar('TIntegrationTarget', bound='IntegrationTarget') -except AttributeError: - TIntegrationTarget = None # pylint: disable=invalid-name - - -def find_target_completion(target_func, prefix, short): - """ - :type target_func: () -> collections.Iterable[CompletionTarget] - :type prefix: unicode - :type short: bool - :rtype: list[str] - """ +def find_target_completion(target_func, prefix, short): # type: (t.Callable[[], t.Iterable[CompletionTarget]], str, bool) -> t.List[str] + """Return a list of targets from the given target function which match the given prefix.""" try: targets = target_func() matches = list(walk_completion_targets(targets, prefix, short)) @@ -60,13 +43,8 @@ def find_target_completion(target_func, prefix, short): return [u'%s' % ex] -def walk_completion_targets(targets, prefix, short=False): - """ - :type targets: collections.Iterable[CompletionTarget] - :type prefix: str - :type short: bool - :rtype: tuple[str] - """ +def walk_completion_targets(targets, prefix, short=False): # type: (t.Iterable[CompletionTarget], str, bool) -> t.Tuple[str, ...] + """Return a tuple of targets from the given target iterable which match the given prefix.""" aliases = set(alias for target in targets for alias in target.aliases) if prefix.endswith('/') and prefix in aliases: @@ -85,14 +63,13 @@ def walk_completion_targets(targets, prefix, short=False): return tuple(sorted(matches)) -def walk_internal_targets(targets, includes=None, excludes=None, requires=None): - """ - :type targets: collections.Iterable[T <= CompletionTarget] - :type includes: list[str] - :type excludes: list[str] - :type requires: list[str] - :rtype: tuple[T <= CompletionTarget] - """ +def walk_internal_targets( + targets, # type: t.Iterable[TCompletionTarget] + includes=None, # type: t.Optional[t.List[str]] + excludes=None, # type: t.Optional[t.List[str]] + requires=None, # type: t.Optional[t.List[str]] +): # type: (...) -> t.Tuple[TCompletionTarget, ...] + """Return a tuple of matching completion targets.""" targets = tuple(targets) include_targets = sorted(filter_targets(targets, includes, directories=False), key=lambda include_target: include_target.name) @@ -173,69 +150,49 @@ def walk_module_targets(): yield target -def walk_units_targets(): - """ - :rtype: collections.Iterable[TestTarget] - """ +def walk_units_targets(): # type: () -> t.Iterable[TestTarget] + """Return an iterable of units targets.""" return walk_test_targets(path=data_context().content.unit_path, module_path=data_context().content.unit_module_path, extensions=('.py',), prefix='test_') -def walk_compile_targets(include_symlinks=True): - """ - :type include_symlinks: bool - :rtype: collections.Iterable[TestTarget] - """ +def walk_compile_targets(include_symlinks=True): # type: (bool) -> t.Iterable[TestTarget, ...] + """Return an iterable of compile targets.""" return walk_test_targets(module_path=data_context().content.module_path, extensions=('.py',), extra_dirs=('bin',), include_symlinks=include_symlinks) -def walk_powershell_targets(include_symlinks=True): - """ - :rtype: collections.Iterable[TestTarget] - """ +def walk_powershell_targets(include_symlinks=True): # type: (bool) -> t.Iterable[TestTarget] + """Return an iterable of PowerShell targets.""" return walk_test_targets(module_path=data_context().content.module_path, extensions=('.ps1', '.psm1'), include_symlinks=include_symlinks) -def walk_sanity_targets(): - """ - :rtype: collections.Iterable[TestTarget] - """ +def walk_sanity_targets(): # type: () -> t.Iterable[TestTarget] + """Return an iterable of sanity targets.""" return walk_test_targets(module_path=data_context().content.module_path, include_symlinks=True, include_symlinked_directories=True) -def walk_posix_integration_targets(include_hidden=False): - """ - :type include_hidden: bool - :rtype: collections.Iterable[IntegrationTarget] - """ +def walk_posix_integration_targets(include_hidden=False): # type: (bool) -> t.Iterable[IntegrationTarget] + """Return an iterable of POSIX integration targets.""" for target in walk_integration_targets(): if 'posix/' in target.aliases or (include_hidden and 'hidden/posix/' in target.aliases): yield target -def walk_network_integration_targets(include_hidden=False): - """ - :type include_hidden: bool - :rtype: collections.Iterable[IntegrationTarget] - """ +def walk_network_integration_targets(include_hidden=False): # type: (bool) -> t.Iterable[IntegrationTarget] + """Return an iterable of network integration targets.""" for target in walk_integration_targets(): if 'network/' in target.aliases or (include_hidden and 'hidden/network/' in target.aliases): yield target -def walk_windows_integration_targets(include_hidden=False): - """ - :type include_hidden: bool - :rtype: collections.Iterable[IntegrationTarget] - """ +def walk_windows_integration_targets(include_hidden=False): # type: (bool) -> t.Iterable[IntegrationTarget] + """Return an iterable of windows integration targets.""" for target in walk_integration_targets(): if 'windows/' in target.aliases or (include_hidden and 'hidden/windows/' in target.aliases): yield target -def walk_integration_targets(): - """ - :rtype: collections.Iterable[IntegrationTarget] - """ +def walk_integration_targets(): # type: () -> t.Iterable[IntegrationTarget] + """Return an iterable of integration targets.""" path = data_context().content.integration_targets_path modules = frozenset(target.module for target in walk_module_targets()) paths = data_context().content.walk_files(path) @@ -305,17 +262,16 @@ def load_integration_prefixes(): return prefixes -def walk_test_targets(path=None, module_path=None, extensions=None, prefix=None, extra_dirs=None, include_symlinks=False, include_symlinked_directories=False): - """ - :type path: str | None - :type module_path: str | None - :type extensions: tuple[str] | None - :type prefix: str | None - :type extra_dirs: tuple[str] | None - :type include_symlinks: bool - :type include_symlinked_directories: bool - :rtype: collections.Iterable[TestTarget] - """ +def walk_test_targets( + path=None, # type: t.Optional[str] + module_path=None, # type: t.Optional[str] + extensions=None, # type: t.Optional[t.Tuple[str, ...]] + prefix=None, # type: t.Optional[str] + extra_dirs=None, # type: t.Optional[t.Tuple[str, ...]] + include_symlinks=False, # type: bool + include_symlinked_directories=False, # type: bool +): # type: (...) -> t.Iterable[TestTarget] + """Iterate over available test targets.""" if path: file_paths = data_context().content.walk_files(path, include_symlinked_directories=include_symlinked_directories) else: @@ -486,11 +442,7 @@ class CompletionTarget(metaclass=abc.ABCMeta): class DirectoryTarget(CompletionTarget): """Directory target.""" - def __init__(self, path, modules): - """ - :type path: str - :type modules: tuple[str] - """ + def __init__(self, path, modules): # type: (str, t.Tuple[str, ...]) -> None super().__init__() self.name = path @@ -500,14 +452,14 @@ class DirectoryTarget(CompletionTarget): class TestTarget(CompletionTarget): """Generic test target.""" - def __init__(self, path, module_path, module_prefix, base_path, symlink=None): - """ - :type path: str - :type module_path: str | None - :type module_prefix: str | None - :type base_path: str - :type symlink: bool | None - """ + def __init__( + self, + path, # type: str + module_path, # type: t.Optional[str] + module_prefix, # type: t.Optional[str] + base_path, # type: str + symlink=None, # type: t.Optional[bool] + ): super().__init__() if symlink is None: @@ -614,12 +566,7 @@ class IntegrationTarget(CompletionTarget): 'skip', ))) - def __init__(self, path, modules, prefixes): - """ - :type path: str - :type modules: frozenset[str] - :type prefixes: dict[str, str] - """ + def __init__(self, path, modules, prefixes): # type: (str, t.FrozenSet[str], t.Dict[str, str]) -> None super().__init__() self.relative_path = os.path.relpath(path, data_context().content.integration_targets_path) @@ -763,10 +710,7 @@ class IntegrationTarget(CompletionTarget): class TargetPatternsNotMatched(ApplicationError): """One or more targets were not matched when a match was required.""" - def __init__(self, patterns): - """ - :type patterns: set[str] - """ + def __init__(self, patterns): # type: (t.Set[str]) -> None self.patterns = sorted(patterns) if len(patterns) > 1: @@ -775,3 +719,7 @@ class TargetPatternsNotMatched(ApplicationError): message = 'Target pattern not matched: %s' % self.patterns[0] super().__init__(message) + + +TCompletionTarget = t.TypeVar('TCompletionTarget', bound=CompletionTarget) +TIntegrationTarget = t.TypeVar('TIntegrationTarget', bound=IntegrationTarget) diff --git a/test/lib/ansible_test/_internal/test.py b/test/lib/ansible_test/_internal/test.py index e2a18b655be..af21cbd6022 100644 --- a/test/lib/ansible_test/_internal/test.py +++ b/test/lib/ansible_test/_internal/test.py @@ -16,6 +16,10 @@ from .util_common import ( ResultType, ) +from .metadata import ( + Metadata, +) + from .config import ( TestConfig, ) @@ -23,12 +27,8 @@ from .config import ( from . import junit_xml -def calculate_best_confidence(choices, metadata): - """ - :type choices: tuple[tuple[str, int]] - :type metadata: Metadata - :rtype: int - """ +def calculate_best_confidence(choices, metadata): # type: (t.Tuple[t.Tuple[str, int], ...], Metadata) -> int + """Return the best confidence value available from the given choices and metadata.""" best_confidence = 0 for path, line in choices: @@ -38,13 +38,8 @@ def calculate_best_confidence(choices, metadata): return best_confidence -def calculate_confidence(path, line, metadata): - """ - :type path: str - :type line: int - :type metadata: Metadata - :rtype: int - """ +def calculate_confidence(path, line, metadata): # type: (str, int, Metadata) -> int + """Return the confidence level for a test result associated with the given file path and line number.""" ranges = metadata.changes.get(path) # no changes were made to the file @@ -65,12 +60,7 @@ def calculate_confidence(path, line, metadata): class TestResult: """Base class for test results.""" - def __init__(self, command, test, python_version=None): - """ - :type command: str - :type test: str - :type python_version: str - """ + def __init__(self, command, test, python_version=None): # type: (str, str, t.Optional[str]) -> None self.command = command self.test = test self.python_version = python_version @@ -90,27 +80,20 @@ class TestResult: if args.junit: self.write_junit(args) - def write_console(self): + def write_console(self): # type: () -> None """Write results to console.""" - def write_lint(self): + def write_lint(self): # type: () -> None """Write lint results to stdout.""" - def write_bot(self, args): - """ - :type args: TestConfig - """ + def write_bot(self, args): # type: (TestConfig) -> None + """Write results to a file for ansibullbot to consume.""" - def write_junit(self, args): - """ - :type args: TestConfig - """ + def write_junit(self, args): # type: (TestConfig) -> None + """Write results to a junit XML file.""" - def create_result_name(self, extension): - """ - :type extension: str - :rtype: str - """ + def create_result_name(self, extension): # type: (str) -> str + """Return the name of the result file using the given extension.""" name = 'ansible-test-%s' % self.command if self.test: @@ -145,18 +128,13 @@ class TestResult: class TestTimeout(TestResult): """Test timeout.""" - def __init__(self, timeout_duration): - """ - :type timeout_duration: int - """ + def __init__(self, timeout_duration): # type: (int) -> None super().__init__(command='timeout', test='') self.timeout_duration = timeout_duration - def write(self, args): - """ - :type args: TestConfig - """ + def write(self, args): # type: (TestConfig) -> None + """Write the test results to various locations.""" message = 'Tests were aborted after exceeding the %d minute time limit.' % self.timeout_duration # Include a leading newline to improve readability on Shippable "Tests" tab. @@ -202,10 +180,8 @@ One or more of the following situations may be responsible: class TestSuccess(TestResult): """Test success.""" - def write_junit(self, args): - """ - :type args: TestConfig - """ + def write_junit(self, args): # type: (TestConfig) -> None + """Write results to a junit XML file.""" test_case = junit_xml.TestCase(classname=self.command, name=self.name) self.save_junit(args, test_case) @@ -213,27 +189,20 @@ class TestSuccess(TestResult): class TestSkipped(TestResult): """Test skipped.""" - def __init__(self, command, test, python_version=None): - """ - :type command: str - :type test: str - :type python_version: str - """ + def __init__(self, command, test, python_version=None): # type: (str, str, t.Optional[str]) -> None super().__init__(command, test, python_version) self.reason = None # type: t.Optional[str] - def write_console(self): + def write_console(self): # type: () -> None """Write results to console.""" if self.reason: display.warning(self.reason) else: display.info('No tests applicable.', verbosity=1) - def write_junit(self, args): - """ - :type args: TestConfig - """ + def write_junit(self, args): # type: (TestConfig) -> None + """Write results to a junit XML file.""" test_case = junit_xml.TestCase( classname=self.command, name=self.name, @@ -245,14 +214,14 @@ class TestSkipped(TestResult): class TestFailure(TestResult): """Test failure.""" - def __init__(self, command, test, python_version=None, messages=None, summary=None): - """ - :type command: str - :type test: str - :type python_version: str | None - :type messages: list[TestMessage] | None - :type summary: unicode | None - """ + def __init__( + self, + command, # type: str + test, # type: str + python_version=None, # type: t.Optional[str] + messages=None, # type: t.Optional[t.List[TestMessage]] + summary=None, # type: t.Optional[str] + ): super().__init__(command, test, python_version) if messages: @@ -263,16 +232,14 @@ class TestFailure(TestResult): self.messages = messages self.summary = summary - def write(self, args): - """ - :type args: TestConfig - """ + def write(self, args): # type: (TestConfig) -> None + """Write the test results to various locations.""" if args.metadata.changes: self.populate_confidence(args.metadata) super().write(args) - def write_console(self): + def write_console(self): # type: () -> None """Write results to console.""" if self.summary: display.error(self.summary) @@ -291,7 +258,7 @@ class TestFailure(TestResult): if doc_url: display.info('See documentation for help: %s' % doc_url) - def write_lint(self): + def write_lint(self): # type: () -> None """Write lint results to stdout.""" if self.summary: command = self.format_command() @@ -303,10 +270,8 @@ class TestFailure(TestResult): for message in self.messages: print(message) - def write_junit(self, args): - """ - :type args: TestConfig - """ + def write_junit(self, args): # type: (TestConfig) -> None + """Write results to a junit XML file.""" title = self.format_title() output = self.format_block() @@ -323,10 +288,8 @@ class TestFailure(TestResult): self.save_junit(args, test_case) - def write_bot(self, args): - """ - :type args: TestConfig - """ + def write_bot(self, args): # type: (TestConfig) -> None + """Write results to a file for ansibullbot to consume.""" docs = self.find_docs() message = self.format_title(help_link=docs) output = self.format_block() @@ -352,18 +315,14 @@ class TestFailure(TestResult): write_json_test_results(ResultType.BOT, self.create_result_name('.json'), bot_data) - def populate_confidence(self, metadata): - """ - :type metadata: Metadata - """ + def populate_confidence(self, metadata): # type: (Metadata) -> None + """Populate test result confidence using the provided metadata.""" for message in self.messages: if message.confidence is None: message.confidence = calculate_confidence(message.path, message.line, metadata) - def format_command(self): - """ - :rtype: str - """ + def format_command(self): # type: () -> str + """Return a string representing the CLI command associated with the test failure.""" command = 'ansible-test %s' % self.command if self.test: @@ -397,11 +356,8 @@ class TestFailure(TestResult): return url - def format_title(self, help_link=None): - """ - :type help_link: str | None - :rtype: str - """ + def format_title(self, help_link=None): # type: (t.Optional[str]) -> str + """Return a string containing a title/heading for this test failure, including an optional help link to explain the test.""" command = self.format_command() if self.summary: @@ -418,10 +374,8 @@ class TestFailure(TestResult): return title - def format_block(self): - """ - :rtype: str - """ + def format_block(self): # type: () -> str + """Format the test summary or messages as a block of text and return the result.""" if self.summary: block = self.summary else: @@ -437,16 +391,16 @@ class TestFailure(TestResult): class TestMessage: """Single test message for one file.""" - def __init__(self, message, path, line=0, column=0, level='error', code=None, confidence=None): - """ - :type message: str - :type path: str - :type line: int - :type column: int - :type level: str - :type code: str | None - :type confidence: int | None - """ + def __init__( + self, + message, # type: str + path, # type: str + line=0, # type: int + column=0, # type: int + level='error', # type: str + code=None, # type: t.Optional[str] + confidence=None, # type: t.Optional[int] + ): self.__path = path self.__line = line self.__column = column @@ -515,11 +469,8 @@ class TestMessage: def __str__(self): return self.format() - def format(self, show_confidence=False): - """ - :type show_confidence: bool - :rtype: str - """ + def format(self, show_confidence=False): # type: (bool) -> str + """Return a string representation of this message, optionally including the confidence level.""" if self.__code: msg = '%s: %s' % (self.__code, self.__message) else: diff --git a/test/lib/ansible_test/_internal/util.py b/test/lib/ansible_test/_internal/util.py index 66ece3b2b3e..fdd921e113a 100644 --- a/test/lib/ansible_test/_internal/util.py +++ b/test/lib/ansible_test/_internal/util.py @@ -246,22 +246,20 @@ def get_available_python_versions(): # type: () -> t.Dict[str, str] return dict((version, path) for version, path in ((version, find_python(version, required=False)) for version in SUPPORTED_PYTHON_VERSIONS) if path) -def raw_command(cmd, capture=False, env=None, data=None, cwd=None, explain=False, stdin=None, stdout=None, - cmd_verbosity=1, str_errors='strict', error_callback=None): - """ - :type cmd: collections.Iterable[str] - :type capture: bool - :type env: dict[str, str] | None - :type data: str | None - :type cwd: str | None - :type explain: bool - :type stdin: file | None - :type stdout: file | None - :type cmd_verbosity: int - :type str_errors: str - :type error_callback: t.Callable[[SubprocessError], None] - :rtype: str | None, str | None - """ +def raw_command( + cmd, # type: t.Iterable[str] + capture=False, # type: bool + env=None, # type: t.Optional[t.Dict[str, str]] + data=None, # type: t.Optional[str] + cwd=None, # type: t.Optional[str] + explain=False, # type: bool + stdin=None, # type: t.Optional[t.BinaryIO] + stdout=None, # type: t.Optional[t.BinaryIO] + cmd_verbosity=1, # type: int + str_errors='strict', # type: str + error_callback=None, # type: t.Optional[t.Callable[[SubprocessError], None]] +): # type: (...) -> t.Tuple[t.Optional[str], t.Optional[str]] + """Run the specified command and return stdout and stderr as a tuple.""" if not cwd: cwd = os.getcwd() @@ -389,12 +387,8 @@ def common_environment(): return env -def pass_vars(required, optional): - """ - :type required: collections.Iterable[str] - :type optional: collections.Iterable[str] - :rtype: dict[str, str] - """ +def pass_vars(required, optional): # type: (t.Collection[str], t.Collection[str]) -> t.Dict[str, str] + """Return a filtered dictionary of environment variables based on the current environment.""" env = {} for name in required: @@ -572,13 +566,14 @@ class Display: color = self.verbosity_colors.get(verbosity, self.yellow) self.print_message(message, color=color, fd=sys.stderr if self.info_stderr else sys.stdout, truncate=truncate) - def print_message(self, message, color=None, fd=sys.stdout, truncate=False): # pylint: disable=locally-disabled, invalid-name - """ - :type message: str - :type color: str | None - :type fd: t.IO[str] - :type truncate: bool - """ + def print_message( # pylint: disable=locally-disabled, invalid-name + self, + message, # type: str + color=None, # type: t.Optional[str] + fd=sys.stdout, # type: t.TextIO + truncate=False, # type: bool + ): # type: (...) -> None + """Display a message.""" if self.redact and self.sensitive: for item in self.sensitive: if not item: @@ -612,15 +607,15 @@ class ApplicationWarning(Exception): class SubprocessError(ApplicationError): """Error resulting from failed subprocess execution.""" - def __init__(self, cmd, status=0, stdout=None, stderr=None, runtime=None, error_callback=None): - """ - :type cmd: list[str] - :type status: int - :type stdout: str | None - :type stderr: str | None - :type runtime: float | None - :type error_callback: t.Optional[t.Callable[[SubprocessError], None]] - """ + def __init__( + self, + cmd, # type: t.List[str] + status=0, # type: int + stdout=None, # type: t.Optional[str] + stderr=None, # type: t.Optional[str] + runtime=None, # type: t.Optional[float] + error_callback=None, # type: t.Optional[t.Callable[[SubprocessError], None]] + ): # type: (...) -> None message = 'Command "%s" returned exit status %s.\n' % (' '.join(shlex.quote(c) for c in cmd), status) if stderr: diff --git a/test/lib/ansible_test/_internal/util_common.py b/test/lib/ansible_test/_internal/util_common.py index a616c0eecc6..b4d42420b9e 100644 --- a/test/lib/ansible_test/_internal/util_common.py +++ b/test/lib/ansible_test/_internal/util_common.py @@ -33,6 +33,7 @@ from .util import ( ANSIBLE_TEST_TARGET_ROOT, ANSIBLE_TEST_TOOLS_ROOT, ApplicationError, + SubprocessError, generate_name, ) @@ -402,23 +403,21 @@ def intercept_python( return run_command(args, cmd, capture=capture, env=env, data=data, cwd=cwd, always=always) -def run_command(args, cmd, capture=False, env=None, data=None, cwd=None, always=False, stdin=None, stdout=None, - cmd_verbosity=1, str_errors='strict', error_callback=None): - """ - :type args: CommonConfig - :type cmd: collections.Iterable[str] - :type capture: bool - :type env: dict[str, str] | None - :type data: str | None - :type cwd: str | None - :type always: bool - :type stdin: file | None - :type stdout: file | None - :type cmd_verbosity: int - :type str_errors: str - :type error_callback: t.Callable[[SubprocessError], None] - :rtype: str | None, str | None - """ +def run_command( + args, # type: CommonConfig + cmd, # type: t.Iterable[str] + capture=False, # type: bool + env=None, # type: t.Optional[t.Dict[str, str]] + data=None, # type: t.Optional[str] + cwd=None, # type: t.Optional[str] + always=False, # type: bool + stdin=None, # type: t.Optional[t.BinaryIO] + stdout=None, # type: t.Optional[t.BinaryIO] + cmd_verbosity=1, # type: int + str_errors='strict', # type: str + error_callback=None, # type: t.Optional[t.Callable[[SubprocessError], None]] +): # type: (...) -> t.Tuple[t.Optional[str], t.Optional[str]] + """Run the specified command and return stdout and stderr as a tuple.""" explain = args.explain and not always return raw_command(cmd, capture=capture, env=env, data=data, cwd=cwd, explain=explain, stdin=stdin, stdout=stdout, cmd_verbosity=cmd_verbosity, str_errors=str_errors, error_callback=error_callback) diff --git a/test/lib/ansible_test/_util/controller/sanity/pylint/plugins/unwanted.py b/test/lib/ansible_test/_util/controller/sanity/pylint/plugins/unwanted.py index 3d9877e6a3e..75a8b57fff8 100644 --- a/test/lib/ansible_test/_util/controller/sanity/pylint/plugins/unwanted.py +++ b/test/lib/ansible_test/_util/controller/sanity/pylint/plugins/unwanted.py @@ -4,6 +4,7 @@ from __future__ import (absolute_import, division, print_function) __metaclass__ = type import os +import typing as t import astroid @@ -16,24 +17,20 @@ ANSIBLE_TEST_MODULE_UTILS_PATH = os.environ['ANSIBLE_TEST_MODULE_UTILS_PATH'] class UnwantedEntry: """Defines an unwanted import.""" - def __init__(self, alternative, modules_only=False, names=None, ignore_paths=None): - """ - :type alternative: str - :type modules_only: bool - :type names: tuple[str] | None - :type ignore_paths: tuple[str] | None - """ + def __init__( + self, + alternative, # type: str + modules_only=False, # type: bool + names=None, # type: t.Optional[t.Tuple[str, ...]] + ignore_paths=None, # type: t.Optional[t.Tuple[str, ...]] + ): # type: (...) -> None self.alternative = alternative self.modules_only = modules_only self.names = set(names) if names else set() self.ignore_paths = ignore_paths - def applies_to(self, path, name=None): - """ - :type path: str - :type name: str | None - :rtype: bool - """ + def applies_to(self, path, name=None): # type: (str, t.Optional[str]) -> bool + """Return True if this entry applies to the given path, otherwise return False.""" if self.names: if not name: return False @@ -50,11 +47,8 @@ class UnwantedEntry: return True -def is_module_path(path): - """ - :type path: str - :rtype: bool - """ +def is_module_path(path): # type: (str) -> bool + """Return True if the given path is a module or module_utils path, otherwise return False.""" return path.startswith(ANSIBLE_TEST_MODULES_PATH) or path.startswith(ANSIBLE_TEST_MODULE_UTILS_PATH) @@ -136,23 +130,17 @@ class AnsibleUnwantedChecker(BaseChecker): modules_only=True), } - def visit_import(self, node): - """ - :type node: astroid.node_classes.Import - """ + def visit_import(self, node): # type: (astroid.node_classes.Import) -> None + """Visit an import node.""" for name in node.names: self._check_import(node, name[0]) - def visit_importfrom(self, node): - """ - :type node: astroid.node_classes.ImportFrom - """ + def visit_importfrom(self, node): # type: (astroid.node_classes.ImportFrom) -> None + """Visit an import from node.""" self._check_importfrom(node, node.modname, node.names) - def visit_attribute(self, node): - """ - :type node: astroid.node_classes.Attribute - """ + def visit_attribute(self, node): # type: (astroid.node_classes.Attribute) -> None + """Visit an attribute node.""" last_child = node.last_child() # this is faster than using type inference and will catch the most common cases @@ -167,10 +155,8 @@ class AnsibleUnwantedChecker(BaseChecker): if entry.applies_to(self.linter.current_file, node.attrname): self.add_message(self.BAD_IMPORT_FROM, args=(node.attrname, entry.alternative, module), node=node) - def visit_call(self, node): - """ - :type node: astroid.node_classes.Call - """ + def visit_call(self, node): # type: (astroid.node_classes.Call) -> None + """Visit a call node.""" try: for i in node.func.inferred(): func = None @@ -188,11 +174,8 @@ class AnsibleUnwantedChecker(BaseChecker): except astroid.exceptions.InferenceError: pass - def _check_import(self, node, modname): - """ - :type node: astroid.node_classes.Import - :type modname: str - """ + def _check_import(self, node, modname): # type: (astroid.node_classes.Import, str) -> None + """Check the imports on the specified import node.""" self._check_module_import(node, modname) entry = self.unwanted_imports.get(modname) @@ -203,12 +186,8 @@ class AnsibleUnwantedChecker(BaseChecker): if entry.applies_to(self.linter.current_file): self.add_message(self.BAD_IMPORT, args=(entry.alternative, modname), node=node) - def _check_importfrom(self, node, modname, names): - """ - :type node: astroid.node_classes.ImportFrom - :type modname: str - :type names: list[str[ - """ + def _check_importfrom(self, node, modname, names): # type: (astroid.node_classes.ImportFrom, str, t.List[str]) -> None + """Check the imports on the specified import from node.""" self._check_module_import(node, modname) entry = self.unwanted_imports.get(modname) @@ -220,11 +199,8 @@ class AnsibleUnwantedChecker(BaseChecker): if entry.applies_to(self.linter.current_file, name[0]): self.add_message(self.BAD_IMPORT_FROM, args=(name[0], entry.alternative, modname), node=node) - def _check_module_import(self, node, modname): - """ - :type node: astroid.node_classes.Import | astroid.node_classes.ImportFrom - :type modname: str - """ + def _check_module_import(self, node, modname): # type: (t.Union[astroid.node_classes.Import, astroid.node_classes.ImportFrom], str) -> None + """Check the module import on the given import or import from node.""" if not is_module_path(self.linter.current_file): return diff --git a/test/lib/ansible_test/_util/controller/sanity/yamllint/yamllinter.py b/test/lib/ansible_test/_util/controller/sanity/yamllint/yamllinter.py index 34d2fde99fc..42822111be8 100644 --- a/test/lib/ansible_test/_util/controller/sanity/yamllint/yamllinter.py +++ b/test/lib/ansible_test/_util/controller/sanity/yamllint/yamllinter.py @@ -7,6 +7,7 @@ import json import os import re import sys +import typing as t import yaml from yaml.resolver import Resolver @@ -79,10 +80,8 @@ class YamlChecker: print(json.dumps(report, indent=4, sort_keys=True)) - def check(self, paths): - """ - :type paths: t.List[str] - """ + def check(self, paths): # type: (t.List[str]) -> None + """Check the specified paths.""" config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config') yaml_conf = YamlLintConfig(file=os.path.join(config_path, 'default.yml')) @@ -107,21 +106,13 @@ class YamlChecker: else: raise Exception('unsupported extension: %s' % extension) - def check_yaml(self, conf, path, contents): - """ - :type conf: YamlLintConfig - :type path: str - :type contents: str - """ + def check_yaml(self, conf, path, contents): # type: (YamlLintConfig, str, str) -> None + """Check the given YAML.""" self.check_parsable(path, contents) self.messages += [self.result_to_message(r, path) for r in linter.run(contents, conf, path)] - def check_module(self, conf, path, contents): - """ - :type conf: YamlLintConfig - :type path: str - :type contents: str - """ + def check_module(self, conf, path, contents): # type: (YamlLintConfig, str, str) -> None + """Check the given module.""" docs = self.get_module_docs(path, contents) for key, value in docs.items(): @@ -142,12 +133,8 @@ class YamlChecker: self.messages += [self.result_to_message(r, path, lineno - 1, key) for r in messages] - def check_parsable(self, path, contents, lineno=1): - """ - :type path: str - :type contents: str - :type lineno: int - """ + def check_parsable(self, path, contents, lineno=1): # type: (str, str, int) -> None + """Check the given contents to verify they can be parsed as YAML.""" try: yaml.load(contents, Loader=TestLoader) except MarkedYAMLError as ex: @@ -160,14 +147,8 @@ class YamlChecker: }] @staticmethod - def result_to_message(result, path, line_offset=0, prefix=''): - """ - :type result: any - :type path: str - :type line_offset: int - :type prefix: str - :rtype: dict[str, any] - """ + def result_to_message(result, path, line_offset=0, prefix=''): # type: (t.Any, str, int, str) -> t.Dict[str, t.Any] + """Convert the given result to a dictionary and return it.""" if prefix: prefix = '%s: ' % prefix @@ -180,12 +161,8 @@ class YamlChecker: level=result.level, ) - def get_module_docs(self, path, contents): - """ - :type path: str - :type contents: str - :rtype: dict[str, any] - """ + def get_module_docs(self, path, contents): # type: (str, str) -> t.Dict[str, t.Any] + """Return the module documentation for the given module contents.""" module_doc_types = [ 'DOCUMENTATION', 'EXAMPLES', @@ -240,12 +217,8 @@ class YamlChecker: return docs - def parse_module(self, path, contents): - """ - :type path: str - :type contents: str - :rtype: ast.Module | None - """ + def parse_module(self, path, contents): # type: (str, str) -> t.Optional[ast.Module] + """Parse the given contents and return a module if successful, otherwise return None.""" try: return ast.parse(contents) except SyntaxError as ex: