ansible-test - More PEP 484 type hints.

Matt Clay 3 years ago
parent 263fd64759
commit 0d5a9f2138

@ -84,6 +84,7 @@ from ...test import (
@ -125,10 +126,8 @@ TARGET_SANITY_ROOT = os.path.join(ANSIBLE_TEST_TARGET_ROOT, 'sanity')
created_venvs = [] # type: t.List[str]
def command_sanity(args):
:type args: SanityConfig
def command_sanity(args): # type: (SanityConfig) -> None
"""Run sanity tests."""
target_configs = t.cast(t.List[PosixConfig], args.targets)
@ -606,33 +605,25 @@ class SanityIgnoreProcessor:
class SanitySuccess(TestSuccess):
"""Sanity test success."""
def __init__(self, test, python_version=None):
:type test: str
:type python_version: str
def __init__(self, test, python_version=None): # type: (str, t.Optional[str]) -> None
super().__init__(COMMAND, test, python_version)
class SanitySkipped(TestSkipped):
"""Sanity test skipped."""
def __init__(self, test, python_version=None):
:type test: str
:type python_version: str
def __init__(self, test, python_version=None): # type: (str, t.Optional[str]) -> None
super().__init__(COMMAND, test, python_version)
class SanityFailure(TestFailure):
"""Sanity test failure."""
def __init__(self, test, python_version=None, messages=None, summary=None):
:type test: str
:type python_version: str
:type messages: list[SanityMessage]
:type summary: unicode
def __init__(
test, # type: str
python_version=None, # type: t.Optional[str]
messages=None, # type: t.Optional[t.List[SanityMessage]]
summary=None, # type: t.Optional[str]
): # type: (...) -> None
super().__init__(COMMAND, test, python_version, messages, summary)
@ -809,13 +800,8 @@ class SanitySingleVersion(SanityTest, metaclass=abc.ABCMeta):
return False
def test(self, args, targets, python):
:type args: SanityConfig
:type targets: SanityTargets
:type python: PythonConfig
:rtype: TestResult
def test(self, args, targets, python): # type: (SanityConfig, SanityTargets, PythonConfig) -> TestResult
"""Run the sanity test and return the result."""
def load_processor(self, args): # type: (SanityConfig) -> SanityIgnoreProcessor
"""Load the ignore processor for this sanity test."""
@ -947,13 +933,8 @@ class SanityCodeSmellTest(SanitySingleVersion):
return targets
def test(self, args, targets, python):
:type args: SanityConfig
:type targets: SanityTargets
:type python: PythonConfig
:rtype: TestResult
def test(self, args, targets, python): # type: (SanityConfig, SanityTargets, PythonConfig) -> TestResult
"""Run the sanity test and return the result."""
cmd = [python.path, self.path]
env = ansible_environment(args, color=False)
@ -1027,12 +1008,8 @@ class SanityCodeSmellTest(SanitySingleVersion):
class SanityVersionNeutral(SanityTest, metaclass=abc.ABCMeta):
"""Base class for sanity test plugins which are idependent of the python version being used."""
def test(self, args, targets):
:type args: SanityConfig
:type targets: SanityTargets
:rtype: TestResult
def test(self, args, targets): # type: (SanityConfig, SanityTargets) -> TestResult
"""Run the sanity test and return the result."""
def load_processor(self, args): # type: (SanityConfig) -> SanityIgnoreProcessor
"""Load the ignore processor for this sanity test."""
@ -1047,13 +1024,8 @@ class SanityVersionNeutral(SanityTest, metaclass=abc.ABCMeta):
class SanityMultipleVersion(SanityTest, metaclass=abc.ABCMeta):
"""Base class for sanity test plugins which should run on multiple python versions."""
def test(self, args, targets, python):
:type args: SanityConfig
:type targets: SanityTargets
:type python: PythonConfig
:rtype: TestResult
def test(self, args, targets, python): # type: (SanityConfig, SanityTargets, PythonConfig) -> TestResult
"""Run the sanity test and return the result."""
def load_processor(self, args, python_version): # type: (SanityConfig, str) -> SanityIgnoreProcessor
"""Load the ignore processor for this sanity test."""

@ -29,6 +29,7 @@ from import (
from import (
@ -169,12 +170,8 @@ class IntegrationAliasesTest(SanitySingleVersion):
return self._ci_test_groups
def format_test_group_alias(self, name, fallback=''):
:type name: str
:type fallback: str
:rtype: str
def format_test_group_alias(self, name, fallback=''): # type: (str, str) -> str
"""Return a test group alias using the given name and fallback."""
group_numbers = self.ci_test_groups.get(name, None)
if group_numbers:
@ -232,11 +229,8 @@ class IntegrationAliasesTest(SanitySingleVersion):
return SanitySuccess(
def check_posix_targets(self, args):
:type args: SanityConfig
:rtype: list[SanityMessage]
def check_posix_targets(self, args): # type: (SanityConfig) -> t.List[SanityMessage]
"""Check POSIX integration test targets and return messages with any issues found."""
posix_targets = tuple(walk_posix_integration_targets())
clouds = get_cloud_platforms(args, posix_targets)
@ -298,13 +292,13 @@ class IntegrationAliasesTest(SanitySingleVersion):
return messages
def check_ci_group(self, targets, find, find_incidental=None):
:type targets: tuple[CompletionTarget]
:type find: str
:type find_incidental: list[str] | None
:rtype: list[SanityMessage]
def check_ci_group(
targets, # type: t.Tuple[CompletionTarget, ...]
find, # type: str
find_incidental=None, # type: t.Optional[t.List[str]]
): # type: (...) -> t.List[SanityMessage]
"""Check the CI groups set in the provided targets and return a list of messages with any issues found."""
all_paths = set(target.path for target in targets)
supported_paths = set(target.path for target in filter_targets(targets, [find], directories=False, errors=False))
unsupported_paths = set(target.path for target in filter_targets(targets, [self.UNSUPPORTED], directories=False, errors=False))
@ -330,11 +324,8 @@ class IntegrationAliasesTest(SanitySingleVersion):
return messages
def check_changes(self, args, results):
:type args: SanityConfig
:type results: dict[str, any]
def check_changes(self, args, results): # type: (SanityConfig, t.Dict[str, t.Any]) -> None
"""Check changes and store results in the provided results dictionary."""
integration_targets = list(walk_integration_targets())
module_targets = list(walk_module_targets())
@ -381,12 +372,8 @@ class IntegrationAliasesTest(SanitySingleVersion):
results['comments'] += comments
def format_comment(self, template, targets):
:type template: str
:type targets: list[str]
:rtype: str | None
def format_comment(self, template, targets): # type: (str, t.List[str]) -> t.Optional[str]
"""Format and return a comment based on the given template and targets, or None if there are no targets."""
if not targets:
return None

@ -2,7 +2,6 @@
from __future__ import annotations
import abc
import functools
import shlex
import sys
import tempfile
@ -160,6 +159,10 @@ class SshConnection(Connection):
options.append(' '.join(shlex.quote(cmd) for cmd in command))
def error_callback(ex): # type: (SubprocessError) -> None
"""Error handler."""
self.capture_log_details(, ex)
return run_command(
cmd=['ssh'] + options,
@ -167,7 +170,7 @@ class SshConnection(Connection):

@ -291,15 +291,14 @@ def generate_command(
return cmd
def filter_options(args, argv, options, exclude, require):
:type args: EnvironmentConfig
:type argv: list[str]
:type options: dict[str, int]
:type exclude: list[str]
:type require: list[str]
:rtype: collections.Iterable[str]
def filter_options(
args, # type: EnvironmentConfig
argv, # type: t.List[str]
options, # type: t.Dict[str, int]
exclude, # type: t.List[str]
require, # type: t.List[str]
): # type: (...) -> t.Iterable[str]
"""Return an iterable that filters out unwanted CLI options and injects new ones as requested."""
options = options.copy()
options['--truncate'] = 1

@ -11,13 +11,6 @@ from ..util import (
# noinspection PyTypeChecker
TPathProvider = t.TypeVar('TPathProvider', bound='PathProvider')
except AttributeError:
TPathProvider = None # pylint: disable=invalid-name
def get_path_provider_classes(provider_type): # type: (t.Type[TPathProvider]) -> t.List[t.Type[TPathProvider]]
"""Return a list of path provider classes of the given type."""
return sorted(get_subclasses(provider_type), key=lambda c: (c.priority, c.__name__))
@ -74,3 +67,6 @@ class PathProvider(metaclass=abc.ABCMeta):
def is_content_root(path): # type: (str) -> bool
"""Return True if the given path is a content root for this provider."""
TPathProvider = t.TypeVar('TPathProvider', bound=PathProvider)

@ -32,26 +32,9 @@ from .data import (
MODULE_EXTENSIONS = '.py', '.ps1'
# noinspection PyTypeChecker
TCompletionTarget = t.TypeVar('TCompletionTarget', bound='CompletionTarget')
except AttributeError:
TCompletionTarget = None # pylint: disable=invalid-name
# noinspection PyTypeChecker
TIntegrationTarget = t.TypeVar('TIntegrationTarget', bound='IntegrationTarget')
except AttributeError:
TIntegrationTarget = None # pylint: disable=invalid-name
def find_target_completion(target_func, prefix, short):
:type target_func: () -> collections.Iterable[CompletionTarget]
:type prefix: unicode
:type short: bool
:rtype: list[str]
def find_target_completion(target_func, prefix, short): # type: (t.Callable[[], t.Iterable[CompletionTarget]], str, bool) -> t.List[str]
"""Return a list of targets from the given target function which match the given prefix."""
targets = target_func()
matches = list(walk_completion_targets(targets, prefix, short))
@ -60,13 +43,8 @@ def find_target_completion(target_func, prefix, short):
return [u'%s' % ex]
def walk_completion_targets(targets, prefix, short=False):
:type targets: collections.Iterable[CompletionTarget]
:type prefix: str
:type short: bool
:rtype: tuple[str]
def walk_completion_targets(targets, prefix, short=False): # type: (t.Iterable[CompletionTarget], str, bool) -> t.Tuple[str, ...]
"""Return a tuple of targets from the given target iterable which match the given prefix."""
aliases = set(alias for target in targets for alias in target.aliases)
if prefix.endswith('/') and prefix in aliases:
@ -85,14 +63,13 @@ def walk_completion_targets(targets, prefix, short=False):
return tuple(sorted(matches))
def walk_internal_targets(targets, includes=None, excludes=None, requires=None):
:type targets: collections.Iterable[T <= CompletionTarget]
:type includes: list[str]
:type excludes: list[str]
:type requires: list[str]
:rtype: tuple[T <= CompletionTarget]
def walk_internal_targets(
targets, # type: t.Iterable[TCompletionTarget]
includes=None, # type: t.Optional[t.List[str]]
excludes=None, # type: t.Optional[t.List[str]]
requires=None, # type: t.Optional[t.List[str]]
): # type: (...) -> t.Tuple[TCompletionTarget, ...]
"""Return a tuple of matching completion targets."""
targets = tuple(targets)
include_targets = sorted(filter_targets(targets, includes, directories=False), key=lambda include_target:
@ -173,69 +150,49 @@ def walk_module_targets():
yield target
def walk_units_targets():
:rtype: collections.Iterable[TestTarget]
def walk_units_targets(): # type: () -> t.Iterable[TestTarget]
"""Return an iterable of units targets."""
return walk_test_targets(path=data_context().content.unit_path, module_path=data_context().content.unit_module_path, extensions=('.py',), prefix='test_')
def walk_compile_targets(include_symlinks=True):
:type include_symlinks: bool
:rtype: collections.Iterable[TestTarget]
def walk_compile_targets(include_symlinks=True): # type: (bool) -> t.Iterable[TestTarget, ...]
"""Return an iterable of compile targets."""
return walk_test_targets(module_path=data_context().content.module_path, extensions=('.py',), extra_dirs=('bin',), include_symlinks=include_symlinks)
def walk_powershell_targets(include_symlinks=True):
:rtype: collections.Iterable[TestTarget]
def walk_powershell_targets(include_symlinks=True): # type: (bool) -> t.Iterable[TestTarget]
"""Return an iterable of PowerShell targets."""
return walk_test_targets(module_path=data_context().content.module_path, extensions=('.ps1', '.psm1'), include_symlinks=include_symlinks)
def walk_sanity_targets():
:rtype: collections.Iterable[TestTarget]
def walk_sanity_targets(): # type: () -> t.Iterable[TestTarget]
"""Return an iterable of sanity targets."""
return walk_test_targets(module_path=data_context().content.module_path, include_symlinks=True, include_symlinked_directories=True)
def walk_posix_integration_targets(include_hidden=False):
:type include_hidden: bool
:rtype: collections.Iterable[IntegrationTarget]
def walk_posix_integration_targets(include_hidden=False): # type: (bool) -> t.Iterable[IntegrationTarget]
"""Return an iterable of POSIX integration targets."""
for target in walk_integration_targets():
if 'posix/' in target.aliases or (include_hidden and 'hidden/posix/' in target.aliases):
yield target
def walk_network_integration_targets(include_hidden=False):
:type include_hidden: bool
:rtype: collections.Iterable[IntegrationTarget]
def walk_network_integration_targets(include_hidden=False): # type: (bool) -> t.Iterable[IntegrationTarget]
"""Return an iterable of network integration targets."""
for target in walk_integration_targets():
if 'network/' in target.aliases or (include_hidden and 'hidden/network/' in target.aliases):
yield target
def walk_windows_integration_targets(include_hidden=False):
:type include_hidden: bool
:rtype: collections.Iterable[IntegrationTarget]
def walk_windows_integration_targets(include_hidden=False): # type: (bool) -> t.Iterable[IntegrationTarget]
"""Return an iterable of windows integration targets."""
for target in walk_integration_targets():
if 'windows/' in target.aliases or (include_hidden and 'hidden/windows/' in target.aliases):
yield target
def walk_integration_targets():
:rtype: collections.Iterable[IntegrationTarget]
def walk_integration_targets(): # type: () -> t.Iterable[IntegrationTarget]
"""Return an iterable of integration targets."""
path = data_context().content.integration_targets_path
modules = frozenset(target.module for target in walk_module_targets())
paths = data_context().content.walk_files(path)
@ -305,17 +262,16 @@ def load_integration_prefixes():
return prefixes
def walk_test_targets(path=None, module_path=None, extensions=None, prefix=None, extra_dirs=None, include_symlinks=False, include_symlinked_directories=False):
:type path: str | None
:type module_path: str | None
:type extensions: tuple[str] | None
:type prefix: str | None
:type extra_dirs: tuple[str] | None
:type include_symlinks: bool
:type include_symlinked_directories: bool
:rtype: collections.Iterable[TestTarget]
def walk_test_targets(
path=None, # type: t.Optional[str]
module_path=None, # type: t.Optional[str]
extensions=None, # type: t.Optional[t.Tuple[str, ...]]
prefix=None, # type: t.Optional[str]
extra_dirs=None, # type: t.Optional[t.Tuple[str, ...]]
include_symlinks=False, # type: bool
include_symlinked_directories=False, # type: bool
): # type: (...) -> t.Iterable[TestTarget]
"""Iterate over available test targets."""
if path:
file_paths = data_context().content.walk_files(path, include_symlinked_directories=include_symlinked_directories)
@ -486,11 +442,7 @@ class CompletionTarget(metaclass=abc.ABCMeta):
class DirectoryTarget(CompletionTarget):
"""Directory target."""
def __init__(self, path, modules):
:type path: str
:type modules: tuple[str]
def __init__(self, path, modules): # type: (str, t.Tuple[str, ...]) -> None
super().__init__() = path
@ -500,14 +452,14 @@ class DirectoryTarget(CompletionTarget):
class TestTarget(CompletionTarget):
"""Generic test target."""
def __init__(self, path, module_path, module_prefix, base_path, symlink=None):
:type path: str
:type module_path: str | None
:type module_prefix: str | None
:type base_path: str
:type symlink: bool | None
def __init__(
path, # type: str
module_path, # type: t.Optional[str]
module_prefix, # type: t.Optional[str]
base_path, # type: str
symlink=None, # type: t.Optional[bool]
if symlink is None:
@ -614,12 +566,7 @@ class IntegrationTarget(CompletionTarget):
def __init__(self, path, modules, prefixes):
:type path: str
:type modules: frozenset[str]
:type prefixes: dict[str, str]
def __init__(self, path, modules, prefixes): # type: (str, t.FrozenSet[str], t.Dict[str, str]) -> None
self.relative_path = os.path.relpath(path, data_context().content.integration_targets_path)
@ -763,10 +710,7 @@ class IntegrationTarget(CompletionTarget):
class TargetPatternsNotMatched(ApplicationError):
"""One or more targets were not matched when a match was required."""
def __init__(self, patterns):
:type patterns: set[str]
def __init__(self, patterns): # type: (t.Set[str]) -> None
self.patterns = sorted(patterns)
if len(patterns) > 1:
@ -775,3 +719,7 @@ class TargetPatternsNotMatched(ApplicationError):
message = 'Target pattern not matched: %s' % self.patterns[0]
TCompletionTarget = t.TypeVar('TCompletionTarget', bound=CompletionTarget)
TIntegrationTarget = t.TypeVar('TIntegrationTarget', bound=IntegrationTarget)

@ -16,6 +16,10 @@ from .util_common import (
from .metadata import (
from .config import (
@ -23,12 +27,8 @@ from .config import (
from . import junit_xml
def calculate_best_confidence(choices, metadata):
:type choices: tuple[tuple[str, int]]
:type metadata: Metadata
:rtype: int
def calculate_best_confidence(choices, metadata): # type: (t.Tuple[t.Tuple[str, int], ...], Metadata) -> int
"""Return the best confidence value available from the given choices and metadata."""
best_confidence = 0
for path, line in choices:
@ -38,13 +38,8 @@ def calculate_best_confidence(choices, metadata):
return best_confidence
def calculate_confidence(path, line, metadata):
:type path: str
:type line: int
:type metadata: Metadata
:rtype: int
def calculate_confidence(path, line, metadata): # type: (str, int, Metadata) -> int
"""Return the confidence level for a test result associated with the given file path and line number."""
ranges = metadata.changes.get(path)
# no changes were made to the file
@ -65,12 +60,7 @@ def calculate_confidence(path, line, metadata):
class TestResult:
"""Base class for test results."""
def __init__(self, command, test, python_version=None):
:type command: str
:type test: str
:type python_version: str
def __init__(self, command, test, python_version=None): # type: (str, str, t.Optional[str]) -> None
self.command = command
self.test = test
self.python_version = python_version
@ -90,27 +80,20 @@ class TestResult:
if args.junit:
def write_console(self):
def write_console(self): # type: () -> None
"""Write results to console."""
def write_lint(self):
def write_lint(self): # type: () -> None
"""Write lint results to stdout."""
def write_bot(self, args):
:type args: TestConfig
def write_bot(self, args): # type: (TestConfig) -> None
"""Write results to a file for ansibullbot to consume."""
def write_junit(self, args):
:type args: TestConfig
def write_junit(self, args): # type: (TestConfig) -> None
"""Write results to a junit XML file."""
def create_result_name(self, extension):
:type extension: str
:rtype: str
def create_result_name(self, extension): # type: (str) -> str
"""Return the name of the result file using the given extension."""
name = 'ansible-test-%s' % self.command
if self.test:
@ -145,18 +128,13 @@ class TestResult:
class TestTimeout(TestResult):
"""Test timeout."""
def __init__(self, timeout_duration):
:type timeout_duration: int
def __init__(self, timeout_duration): # type: (int) -> None
super().__init__(command='timeout', test='')
self.timeout_duration = timeout_duration
def write(self, args):
:type args: TestConfig
def write(self, args): # type: (TestConfig) -> None
"""Write the test results to various locations."""
message = 'Tests were aborted after exceeding the %d minute time limit.' % self.timeout_duration
# Include a leading newline to improve readability on Shippable "Tests" tab.
@ -202,10 +180,8 @@ One or more of the following situations may be responsible:
class TestSuccess(TestResult):
"""Test success."""
def write_junit(self, args):
:type args: TestConfig
def write_junit(self, args): # type: (TestConfig) -> None
"""Write results to a junit XML file."""
test_case = junit_xml.TestCase(classname=self.command,
self.save_junit(args, test_case)
@ -213,27 +189,20 @@ class TestSuccess(TestResult):
class TestSkipped(TestResult):
"""Test skipped."""
def __init__(self, command, test, python_version=None):
:type command: str
:type test: str
:type python_version: str
def __init__(self, command, test, python_version=None): # type: (str, str, t.Optional[str]) -> None
super().__init__(command, test, python_version)
self.reason = None # type: t.Optional[str]
def write_console(self):
def write_console(self): # type: () -> None
"""Write results to console."""
if self.reason:
else:'No tests applicable.', verbosity=1)
def write_junit(self, args):
:type args: TestConfig
def write_junit(self, args): # type: (TestConfig) -> None
"""Write results to a junit XML file."""
test_case = junit_xml.TestCase(
@ -245,14 +214,14 @@ class TestSkipped(TestResult):
class TestFailure(TestResult):
"""Test failure."""
def __init__(self, command, test, python_version=None, messages=None, summary=None):
:type command: str
:type test: str
:type python_version: str | None
:type messages: list[TestMessage] | None
:type summary: unicode | None
def __init__(
command, # type: str
test, # type: str
python_version=None, # type: t.Optional[str]
messages=None, # type: t.Optional[t.List[TestMessage]]
summary=None, # type: t.Optional[str]
super().__init__(command, test, python_version)
if messages:
@ -263,16 +232,14 @@ class TestFailure(TestResult):
self.messages = messages
self.summary = summary
def write(self, args):
:type args: TestConfig
def write(self, args): # type: (TestConfig) -> None
"""Write the test results to various locations."""
if args.metadata.changes:
def write_console(self):
def write_console(self): # type: () -> None
"""Write results to console."""
if self.summary:
@ -291,7 +258,7 @@ class TestFailure(TestResult):
if doc_url:'See documentation for help: %s' % doc_url)
def write_lint(self):
def write_lint(self): # type: () -> None
"""Write lint results to stdout."""
if self.summary:
command = self.format_command()
@ -303,10 +270,8 @@ class TestFailure(TestResult):
for message in self.messages:
def write_junit(self, args):
:type args: TestConfig
def write_junit(self, args): # type: (TestConfig) -> None
"""Write results to a junit XML file."""
title = self.format_title()
output = self.format_block()
@ -323,10 +288,8 @@ class TestFailure(TestResult):
self.save_junit(args, test_case)
def write_bot(self, args):
:type args: TestConfig
def write_bot(self, args): # type: (TestConfig) -> None
"""Write results to a file for ansibullbot to consume."""
docs = self.find_docs()
message = self.format_title(help_link=docs)
output = self.format_block()
@ -352,18 +315,14 @@ class TestFailure(TestResult):
write_json_test_results(ResultType.BOT, self.create_result_name('.json'), bot_data)
def populate_confidence(self, metadata):
:type metadata: Metadata
def populate_confidence(self, metadata): # type: (Metadata) -> None
"""Populate test result confidence using the provided metadata."""
for message in self.messages:
if message.confidence is None:
message.confidence = calculate_confidence(message.path, message.line, metadata)
def format_command(self):
:rtype: str
def format_command(self): # type: () -> str
"""Return a string representing the CLI command associated with the test failure."""
command = 'ansible-test %s' % self.command
if self.test:
@ -397,11 +356,8 @@ class TestFailure(TestResult):
return url
def format_title(self, help_link=None):
:type help_link: str | None
:rtype: str
def format_title(self, help_link=None): # type: (t.Optional[str]) -> str
"""Return a string containing a title/heading for this test failure, including an optional help link to explain the test."""
command = self.format_command()
if self.summary:
@ -418,10 +374,8 @@ class TestFailure(TestResult):
return title
def format_block(self):
:rtype: str
def format_block(self): # type: () -> str
"""Format the test summary or messages as a block of text and return the result."""
if self.summary:
block = self.summary
@ -437,16 +391,16 @@ class TestFailure(TestResult):
class TestMessage:
"""Single test message for one file."""
def __init__(self, message, path, line=0, column=0, level='error', code=None, confidence=None):
:type message: str
:type path: str
:type line: int
:type column: int
:type level: str
:type code: str | None
:type confidence: int | None
def __init__(
message, # type: str
path, # type: str
line=0, # type: int
column=0, # type: int
level='error', # type: str
code=None, # type: t.Optional[str]
confidence=None, # type: t.Optional[int]
self.__path = path
self.__line = line
self.__column = column
@ -515,11 +469,8 @@ class TestMessage:
def __str__(self):
return self.format()
def format(self, show_confidence=False):
:type show_confidence: bool
:rtype: str
def format(self, show_confidence=False): # type: (bool) -> str
"""Return a string representation of this message, optionally including the confidence level."""
if self.__code:
msg = '%s: %s' % (self.__code, self.__message)

@ -246,22 +246,20 @@ def get_available_python_versions(): # type: () -> t.Dict[str, str]
return dict((version, path) for version, path in ((version, find_python(version, required=False)) for version in SUPPORTED_PYTHON_VERSIONS) if path)
def raw_command(cmd, capture=False, env=None, data=None, cwd=None, explain=False, stdin=None, stdout=None,
cmd_verbosity=1, str_errors='strict', error_callback=None):
:type cmd: collections.Iterable[str]
:type capture: bool
:type env: dict[str, str] | None
:type data: str | None
:type cwd: str | None
:type explain: bool
:type stdin: file | None
:type stdout: file | None
:type cmd_verbosity: int
:type str_errors: str
:type error_callback: t.Callable[[SubprocessError], None]
:rtype: str | None, str | None
def raw_command(
cmd, # type: t.Iterable[str]
capture=False, # type: bool
env=None, # type: t.Optional[t.Dict[str, str]]
data=None, # type: t.Optional[str]
cwd=None, # type: t.Optional[str]
explain=False, # type: bool
stdin=None, # type: t.Optional[t.BinaryIO]
stdout=None, # type: t.Optional[t.BinaryIO]
cmd_verbosity=1, # type: int
str_errors='strict', # type: str
error_callback=None, # type: t.Optional[t.Callable[[SubprocessError], None]]
): # type: (...) -> t.Tuple[t.Optional[str], t.Optional[str]]
"""Run the specified command and return stdout and stderr as a tuple."""
if not cwd:
cwd = os.getcwd()
@ -389,12 +387,8 @@ def common_environment():
return env
def pass_vars(required, optional):
:type required: collections.Iterable[str]
:type optional: collections.Iterable[str]
:rtype: dict[str, str]
def pass_vars(required, optional): # type: (t.Collection[str], t.Collection[str]) -> t.Dict[str, str]
"""Return a filtered dictionary of environment variables based on the current environment."""
env = {}
for name in required:
@ -572,13 +566,14 @@ class Display:
color = self.verbosity_colors.get(verbosity, self.yellow)
self.print_message(message, color=color, fd=sys.stderr if self.info_stderr else sys.stdout, truncate=truncate)
def print_message(self, message, color=None, fd=sys.stdout, truncate=False): # pylint: disable=locally-disabled, invalid-name
:type message: str
:type color: str | None
:type fd: t.IO[str]
:type truncate: bool
def print_message( # pylint: disable=locally-disabled, invalid-name
message, # type: str
color=None, # type: t.Optional[str]
fd=sys.stdout, # type: t.TextIO
truncate=False, # type: bool
): # type: (...) -> None
"""Display a message."""
if self.redact and self.sensitive:
for item in self.sensitive:
if not item:
@ -612,15 +607,15 @@ class ApplicationWarning(Exception):
class SubprocessError(ApplicationError):
"""Error resulting from failed subprocess execution."""
def __init__(self, cmd, status=0, stdout=None, stderr=None, runtime=None, error_callback=None):
:type cmd: list[str]
:type status: int
:type stdout: str | None
:type stderr: str | None
:type runtime: float | None
:type error_callback: t.Optional[t.Callable[[SubprocessError], None]]
def __init__(
cmd, # type: t.List[str]
status=0, # type: int
stdout=None, # type: t.Optional[str]
stderr=None, # type: t.Optional[str]
runtime=None, # type: t.Optional[float]
error_callback=None, # type: t.Optional[t.Callable[[SubprocessError], None]]
): # type: (...) -> None
message = 'Command "%s" returned exit status %s.\n' % (' '.join(shlex.quote(c) for c in cmd), status)
if stderr:

@ -33,6 +33,7 @@ from .util import (
@ -402,23 +403,21 @@ def intercept_python(
return run_command(args, cmd, capture=capture, env=env, data=data, cwd=cwd, always=always)
def run_command(args, cmd, capture=False, env=None, data=None, cwd=None, always=False, stdin=None, stdout=None,
cmd_verbosity=1, str_errors='strict', error_callback=None):
:type args: CommonConfig
:type cmd: collections.Iterable[str]
:type capture: bool
:type env: dict[str, str] | None
:type data: str | None
:type cwd: str | None
:type always: bool
:type stdin: file | None
:type stdout: file | None
:type cmd_verbosity: int
:type str_errors: str
:type error_callback: t.Callable[[SubprocessError], None]
:rtype: str | None, str | None
def run_command(
args, # type: CommonConfig
cmd, # type: t.Iterable[str]
capture=False, # type: bool
env=None, # type: t.Optional[t.Dict[str, str]]
data=None, # type: t.Optional[str]
cwd=None, # type: t.Optional[str]
always=False, # type: bool
stdin=None, # type: t.Optional[t.BinaryIO]
stdout=None, # type: t.Optional[t.BinaryIO]
cmd_verbosity=1, # type: int
str_errors='strict', # type: str
error_callback=None, # type: t.Optional[t.Callable[[SubprocessError], None]]
): # type: (...) -> t.Tuple[t.Optional[str], t.Optional[str]]
"""Run the specified command and return stdout and stderr as a tuple."""
explain = args.explain and not always
return raw_command(cmd, capture=capture, env=env, data=data, cwd=cwd, explain=explain, stdin=stdin, stdout=stdout,
cmd_verbosity=cmd_verbosity, str_errors=str_errors, error_callback=error_callback)

@ -4,6 +4,7 @@ from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import typing as t
import astroid
class UnwantedEntry:
"""Defines an unwanted import."""
def __init__(self, alternative, modules_only=False, names=None, ignore_paths=None):
:type alternative: str
:type modules_only: bool
:type names: tuple[str] | None
:type ignore_paths: tuple[str] | None
def __init__(
alternative, # type: str
modules_only=False, # type: bool
names=None, # type: t.Optional[t.Tuple[str, ...]]
ignore_paths=None, # type: t.Optional[t.Tuple[str, ...]]
): # type: (...) -> None
self.alternative = alternative
self.modules_only = modules_only
self.names = set(names) if names else set()
self.ignore_paths = ignore_paths
def applies_to(self, path, name=None):
:type path: str
:type name: str | None
:rtype: bool
def applies_to(self, path, name=None): # type: (str, t.Optional[str]) -> bool
"""Return True if this entry applies to the given path, otherwise return False."""
if self.names:
if not name:
return False
@ -50,11 +47,8 @@ class UnwantedEntry:
return True
def is_module_path(path):
:type path: str
:rtype: bool
def is_module_path(path): # type: (str) -> bool
"""Return True if the given path is a module or module_utils path, otherwise return False."""
return path.startswith(ANSIBLE_TEST_MODULES_PATH) or path.startswith(ANSIBLE_TEST_MODULE_UTILS_PATH)
@ -136,23 +130,17 @@ class AnsibleUnwantedChecker(BaseChecker):
def visit_import(self, node):
:type node: astroid.node_classes.Import
def visit_import(self, node): # type: (astroid.node_classes.Import) -> None
"""Visit an import node."""
for name in node.names:
self._check_import(node, name[0])
def visit_importfrom(self, node):
:type node: astroid.node_classes.ImportFrom
def visit_importfrom(self, node): # type: (astroid.node_classes.ImportFrom) -> None
"""Visit an import from node."""
self._check_importfrom(node, node.modname, node.names)
def visit_attribute(self, node):
:type node: astroid.node_classes.Attribute
def visit_attribute(self, node): # type: (astroid.node_classes.Attribute) -> None
"""Visit an attribute node."""
last_child = node.last_child()
# this is faster than using type inference and will catch the most common cases
@ -167,10 +155,8 @@ class AnsibleUnwantedChecker(BaseChecker):
if entry.applies_to(self.linter.current_file, node.attrname):
self.add_message(self.BAD_IMPORT_FROM, args=(node.attrname, entry.alternative, module), node=node)
def visit_call(self, node):
:type node: astroid.node_classes.Call
def visit_call(self, node): # type: (astroid.node_classes.Call) -> None
"""Visit a call node."""
for i in node.func.inferred():
func = None
@ -188,11 +174,8 @@ class AnsibleUnwantedChecker(BaseChecker):
except astroid.exceptions.InferenceError:
def _check_import(self, node, modname):
:type node: astroid.node_classes.Import
:type modname: str
def _check_import(self, node, modname): # type: (astroid.node_classes.Import, str) -> None
"""Check the imports on the specified import node."""
self._check_module_import(node, modname)
entry = self.unwanted_imports.get(modname)
@ -203,12 +186,8 @@ class AnsibleUnwantedChecker(BaseChecker):
if entry.applies_to(self.linter.current_file):
self.add_message(self.BAD_IMPORT, args=(entry.alternative, modname), node=node)
def _check_importfrom(self, node, modname, names):
:type node: astroid.node_classes.ImportFrom
:type modname: str
:type names: list[str[
def _check_importfrom(self, node, modname, names): # type: (astroid.node_classes.ImportFrom, str, t.List[str]) -> None
"""Check the imports on the specified import from node."""
self._check_module_import(node, modname)
entry = self.unwanted_imports.get(modname)
@ -220,11 +199,8 @@ class AnsibleUnwantedChecker(BaseChecker):
if entry.applies_to(self.linter.current_file, name[0]):
self.add_message(self.BAD_IMPORT_FROM, args=(name[0], entry.alternative, modname), node=node)
def _check_module_import(self, node, modname):
:type node: astroid.node_classes.Import | astroid.node_classes.ImportFrom
:type modname: str
def _check_module_import(self, node, modname): # type: (t.Union[astroid.node_classes.Import, astroid.node_classes.ImportFrom], str) -> None
"""Check the module import on the given import or import from node."""
if not is_module_path(self.linter.current_file):

@ -7,6 +7,7 @@ import json
import os
import re
import sys
import typing as t
import yaml
from yaml.resolver import Resolver
@ -79,10 +80,8 @@ class YamlChecker:
print(json.dumps(report, indent=4, sort_keys=True))
def check(self, paths):
:type paths: t.List[str]
def check(self, paths): # type: (t.List[str]) -> None
"""Check the specified paths."""
config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config')
yaml_conf = YamlLintConfig(file=os.path.join(config_path, 'default.yml'))
@ -107,21 +106,13 @@ class YamlChecker:
raise Exception('unsupported extension: %s' % extension)
def check_yaml(self, conf, path, contents):
:type conf: YamlLintConfig
:type path: str
:type contents: str
def check_yaml(self, conf, path, contents): # type: (YamlLintConfig, str, str) -> None
"""Check the given YAML."""
self.check_parsable(path, contents)
self.messages += [self.result_to_message(r, path) for r in, conf, path)]
def check_module(self, conf, path, contents):
:type conf: YamlLintConfig
:type path: str
:type contents: str
def check_module(self, conf, path, contents): # type: (YamlLintConfig, str, str) -> None
"""Check the given module."""
docs = self.get_module_docs(path, contents)
for key, value in docs.items():
@ -142,12 +133,8 @@ class YamlChecker:
self.messages += [self.result_to_message(r, path, lineno - 1, key) for r in messages]
def check_parsable(self, path, contents, lineno=1):
:type path: str
:type contents: str
:type lineno: int
def check_parsable(self, path, contents, lineno=1): # type: (str, str, int) -> None
"""Check the given contents to verify they can be parsed as YAML."""
yaml.load(contents, Loader=TestLoader)
except MarkedYAMLError as ex:
@ -160,14 +147,8 @@ class YamlChecker:
def result_to_message(result, path, line_offset=0, prefix=''):
:type result: any
:type path: str
:type line_offset: int
:type prefix: str
:rtype: dict[str, any]
def result_to_message(result, path, line_offset=0, prefix=''): # type: (t.Any, str, int, str) -> t.Dict[str, t.Any]
"""Convert the given result to a dictionary and return it."""
if prefix:
prefix = '%s: ' % prefix
@ -180,12 +161,8 @@ class YamlChecker:
def get_module_docs(self, path, contents):
:type path: str
:type contents: str
:rtype: dict[str, any]
def get_module_docs(self, path, contents): # type: (str, str) -> t.Dict[str, t.Any]
"""Return the module documentation for the given module contents."""
module_doc_types = [
@ -240,12 +217,8 @@ class YamlChecker:
return docs
def parse_module(self, path, contents):
:type path: str
:type contents: str
:rtype: ast.Module | None
def parse_module(self, path, contents): # type: (str, str) -> t.Optional[ast.Module]
"""Parse the given contents and return a module if successful, otherwise return None."""
return ast.parse(contents)
except SyntaxError as ex:
