ansible-test - Code cleanup.

This helps prepare for a future pylint upgrade.
pull/77949/head
Matt Clay 2 years ago
parent b69eb28ed2
commit 86779cc903

@ -89,10 +89,10 @@ def main(cli_args=None): # type: (t.Optional[t.List[str]]) -> None
display.review_warnings()
config.success = True
except ApplicationWarning as ex:
display.warning(u'%s' % ex)
display.warning('%s' % ex)
sys.exit(0)
except ApplicationError as ex:
display.fatal(u'%s' % ex)
display.fatal('%s' % ex)
sys.exit(1)
except KeyboardInterrupt:
sys.exit(2)

@ -250,7 +250,7 @@ def enumerate_powershell_lines(
try:
coverage_run = read_json_file(path)
except Exception as ex: # pylint: disable=locally-disabled, broad-except
display.error(u'%s' % ex)
display.error('%s' % ex)
return
for filename, hits in coverage_run.items():

@ -263,7 +263,7 @@ def integration_test_environment(
root_temp_dir = os.path.join(ResultType.TMP.path, 'integration')
prefix = '%s-' % target.name
suffix = u'-\u00c5\u00d1\u015a\u00cc\u03b2\u0141\u00c8'
suffix = '-\u00c5\u00d1\u015a\u00cc\u03b2\u0141\u00c8'
if args.no_temp_unicode or 'no/temp_unicode/' in target.aliases:
display.warning('Disabling unicode in the temp work dir is a temporary debugging feature that may be removed in the future without notice.')

@ -100,7 +100,7 @@ class VcenterEnvironment(CloudEnvironment):
parser = configparser.ConfigParser()
parser.read(self.config_path) # static
env_vars = dict()
env_vars = {}
ansible_vars = dict(
resource_prefix=self.resource_prefix,
)

@ -1010,7 +1010,7 @@ class SanityCodeSmellTest(SanitySingleVersion):
return SanityFailure(self.name, messages=messages)
if stderr or status:
summary = u'%s' % SubprocessError(cmd=cmd, status=status, stderr=stderr, stdout=stdout)
summary = '%s' % SubprocessError(cmd=cmd, status=status, stderr=stderr, stdout=stdout)
return SanityFailure(self.name, summary=summary)
messages = settings.process_errors([], paths)

@ -102,7 +102,7 @@ class AnsibleDocTest(SanitySingleVersion):
status = ex.status
if status:
summary = u'%s' % SubprocessError(cmd=cmd, status=status, stderr=stderr)
summary = '%s' % SubprocessError(cmd=cmd, status=status, stderr=stderr)
return SanityFailure(self.name, summary=summary)
if stdout:
@ -113,7 +113,7 @@ class AnsibleDocTest(SanitySingleVersion):
stderr = re.sub(r'\[WARNING]: [^ ]+ [^ ]+ has been removed\n', '', stderr).strip()
if stderr:
summary = u'Output on stderr from ansible-doc is considered an error.\n\n%s' % SubprocessError(cmd, stderr=stderr)
summary = 'Output on stderr from ansible-doc is considered an error.\n\n%s' % SubprocessError(cmd, stderr=stderr)
return SanityFailure(self.name, summary=summary)
if args.explain:

@ -96,9 +96,9 @@ class PslintTest(SanityVersionNeutral):
cwd = data_context().content.root + '/'
# replace unicode smart quotes and ellipsis with ascii versions
stdout = re.sub(u'[\u2018\u2019]', "'", stdout)
stdout = re.sub(u'[\u201c\u201d]', '"', stdout)
stdout = re.sub(u'[\u2026]', '...', stdout)
stdout = re.sub('[\u2018\u2019]', "'", stdout)
stdout = re.sub('[\u201c\u201d]', '"', stdout)
stdout = re.sub('[\u2026]', '...', stdout)
messages = json.loads(stdout)

@ -221,7 +221,7 @@ class PylintTest(SanitySingleVersion):
if parser.has_section('ansible-test'):
config = dict(parser.items('ansible-test'))
else:
config = dict()
config = {}
disable_plugins = set(i.strip() for i in config.get('disable-plugins', '').split(',') if i)
load_plugins = set(plugin_names + ['pylint.extensions.mccabe']) - disable_plugins

@ -5,15 +5,13 @@ import typing as t
ENCODING = 'utf-8'
Text = type(u'')
def to_optional_bytes(value, errors='strict'): # type: (t.Optional[t.AnyStr], str) -> t.Optional[bytes]
"""Return the given value as bytes encoded using UTF-8 if not already bytes, or None if the value is None."""
return None if value is None else to_bytes(value, errors)
def to_optional_text(value, errors='strict'): # type: (t.Optional[t.AnyStr], str) -> t.Optional[t.Text]
def to_optional_text(value, errors='strict'): # type: (t.Optional[t.AnyStr], str) -> t.Optional[str]
"""Return the given value as text decoded using UTF-8 if not already text, or None if the value is None."""
return None if value is None else to_text(value, errors)
@ -23,18 +21,18 @@ def to_bytes(value, errors='strict'): # type: (t.AnyStr, str) -> bytes
if isinstance(value, bytes):
return value
if isinstance(value, Text):
if isinstance(value, str):
return value.encode(ENCODING, errors)
raise Exception('value is not bytes or text: %s' % type(value))
def to_text(value, errors='strict'): # type: (t.AnyStr, str) -> t.Text
def to_text(value, errors='strict'): # type: (t.AnyStr, str) -> str
"""Return the given value as text decoded using UTF-8 if not already text."""
if isinstance(value, bytes):
return value.decode(ENCODING, errors)
if isinstance(value, Text):
if isinstance(value, str):
return value
raise Exception('value is not bytes or text: %s' % type(value))

@ -92,7 +92,7 @@ class HttpClient:
break
except SubprocessError as ex:
if ex.status in retry_on_status and attempts < max_attempts:
display.warning(u'%s' % ex)
display.warning('%s' % ex)
time.sleep(sleep_seconds)
continue

@ -40,7 +40,7 @@ def find_target_completion(target_func, prefix, short): # type: (t.Callable[[],
matches = list(walk_completion_targets(targets, prefix, short))
return matches
except Exception as ex: # pylint: disable=locally-disabled, broad-except
return [u'%s' % ex]
return ['%s' % ex]
def walk_completion_targets(targets, prefix, short=False): # type: (t.Iterable[CompletionTarget], str, bool) -> t.Tuple[str, ...]

@ -52,7 +52,7 @@ from .constants import (
)
C = t.TypeVar('C')
TType = t.TypeVar('TType')
TBase = t.TypeVar('TBase')
TKey = t.TypeVar('TKey')
TValue = t.TypeVar('TValue')
@ -449,8 +449,8 @@ def raw_command(
data_bytes = to_optional_bytes(data)
stdout_bytes, stderr_bytes = communicate_with_process(process, data_bytes, stdout == subprocess.PIPE, stderr == subprocess.PIPE, capture=capture,
force_stdout=force_stdout)
stdout_text = to_optional_text(stdout_bytes, str_errors) or u''
stderr_text = to_optional_text(stderr_bytes, str_errors) or u''
stdout_text = to_optional_text(stdout_bytes, str_errors) or ''
stderr_text = to_optional_text(stderr_bytes, str_errors) or ''
else:
process.wait()
stdout_text, stderr_text = None, None
@ -1036,19 +1036,19 @@ def sanitize_host_name(name):
return re.sub('[^A-Za-z0-9]+', '-', name)[:63].strip('-')
def get_generic_type(base_type, generic_base_type): # type: (t.Type, t.Type[TType]) -> t.Optional[t.Type[TType]]
def get_generic_type(base_type, generic_base_type): # type: (t.Type, t.Type[TValue]) -> t.Optional[t.Type[TValue]]
"""Return the generic type arg derived from the generic_base_type type that is associated with the base_type type, if any, otherwise return None."""
# noinspection PyUnresolvedReferences
type_arg = t.get_args(base_type.__orig_bases__[0])[0]
return None if isinstance(type_arg, generic_base_type) else type_arg
def get_type_associations(base_type, generic_base_type): # type: (t.Type[TType], t.Type[TValue]) -> t.List[t.Tuple[t.Type[TValue], t.Type[TType]]]
def get_type_associations(base_type, generic_base_type): # type: (t.Type[TBase], t.Type[TValue]) -> t.List[t.Tuple[t.Type[TValue], t.Type[TBase]]]
"""Create and return a list of tuples associating generic_base_type derived types with a corresponding base_type derived type."""
return [item for item in [(get_generic_type(sc_type, generic_base_type), sc_type) for sc_type in get_subclasses(base_type)] if item[1]]
def get_type_map(base_type, generic_base_type): # type: (t.Type[TType], t.Type[TValue]) -> t.Dict[t.Type[TValue], t.Type[TType]]
def get_type_map(base_type, generic_base_type): # type: (t.Type[TBase], t.Type[TValue]) -> t.Dict[t.Type[TValue], t.Type[TBase]]
"""Create and return a mapping of generic_base_type derived types to base_type derived types."""
return {item[0]: item[1] for item in get_type_associations(base_type, generic_base_type)}

@ -1,3 +1,4 @@
"""Check changelog fragment naming, syntax, etc."""
from __future__ import annotations
import os
@ -6,6 +7,7 @@ import subprocess
def main():
"""Main entry point."""
paths = sys.argv[1:] or sys.stdin.read().splitlines()
allowed_extensions = ('.yml', '.yaml')

@ -1,3 +1,4 @@
"""Require empty __init__.py files."""
from __future__ import annotations
import os
@ -5,6 +6,7 @@ import sys
def main():
"""Main entry point."""
for path in sys.argv[1:] or sys.stdin.read().splitlines():
if os.path.getsize(path) > 0:
print('%s: empty __init__.py required' % path)

@ -1,3 +1,4 @@
"""Enforce proper usage of __future__ imports."""
from __future__ import annotations
import ast
@ -5,6 +6,7 @@ import sys
def main():
"""Main entry point."""
for path in sys.argv[1:] or sys.stdin.read().splitlines():
with open(path, 'rb') as path_fd:
lines = path_fd.read().splitlines()
@ -21,7 +23,7 @@ def main():
break
if missing:
with open(path) as file:
with open(path, encoding='utf-8') as file:
contents = file.read()
# noinspection PyBroadException

@ -1,9 +1,11 @@
"""Require Unix line endings."""
from __future__ import annotations
import sys
def main():
"""Main entry point."""
for path in sys.argv[1:] or sys.stdin.read().splitlines():
with open(path, 'rb') as path_fd:
contents = path_fd.read()

@ -1,3 +1,4 @@
"""Require __metaclass__ boilerplate for code that supports Python 2.x."""
from __future__ import annotations
import ast
@ -5,6 +6,7 @@ import sys
def main():
"""Main entry point."""
for path in sys.argv[1:] or sys.stdin.read().splitlines():
with open(path, 'rb') as path_fd:
lines = path_fd.read().splitlines()
@ -20,7 +22,7 @@ def main():
break
if missing:
with open(path) as file:
with open(path, encoding='utf-8') as file:
contents = file.read()
# noinspection PyBroadException

@ -1,3 +1,4 @@
"""Disallow use of assert."""
from __future__ import annotations
import re
@ -7,8 +8,9 @@ ASSERT_RE = re.compile(r'^\s*assert[^a-z0-9_:]')
def main():
"""Main entry point."""
for path in sys.argv[1:] or sys.stdin.read().splitlines():
with open(path, 'r') as file:
with open(path, 'r', encoding='utf-8') as file:
for i, line in enumerate(file.readlines()):
matches = ASSERT_RE.findall(line)

@ -1,3 +1,4 @@
"""Disallow use of basestring isinstance checks."""
from __future__ import annotations
import re
@ -5,8 +6,9 @@ import sys
def main():
"""Main entry point."""
for path in sys.argv[1:] or sys.stdin.read().splitlines():
with open(path, 'r') as path_fd:
with open(path, 'r', encoding='utf-8') as path_fd:
for line, text in enumerate(path_fd.readlines()):
match = re.search(r'(isinstance.*basestring)', text)

@ -1,3 +1,4 @@
"""Disallow use of the dict.iteritems function."""
from __future__ import annotations
import re
@ -5,8 +6,9 @@ import sys
def main():
"""Main entry point."""
for path in sys.argv[1:] or sys.stdin.read().splitlines():
with open(path, 'r') as path_fd:
with open(path, 'r', encoding='utf-8') as path_fd:
for line, text in enumerate(path_fd.readlines()):
match = re.search(r'(?<! six)\.(iteritems)', text)

@ -1,3 +1,4 @@
"""Disallow use of the dict.iterkeys function."""
from __future__ import annotations
import re
@ -5,8 +6,9 @@ import sys
def main():
"""Main entry point."""
for path in sys.argv[1:] or sys.stdin.read().splitlines():
with open(path, 'r') as path_fd:
with open(path, 'r', encoding='utf-8') as path_fd:
for line, text in enumerate(path_fd.readlines()):
match = re.search(r'\.(iterkeys)', text)

@ -1,3 +1,4 @@
"""Disallow use of the dict.itervalues function."""
from __future__ import annotations
import re
@ -5,8 +6,9 @@ import sys
def main():
"""Main entry point."""
for path in sys.argv[1:] or sys.stdin.read().splitlines():
with open(path, 'r') as path_fd:
with open(path, 'r', encoding='utf-8') as path_fd:
for line, text in enumerate(path_fd.readlines()):
match = re.search(r'(?<! six)\.(itervalues)', text)

@ -1,3 +1,4 @@
"""Disallow use of the get_exception function."""
from __future__ import annotations
import re
@ -5,10 +6,11 @@ import sys
def main():
"""Main entry point."""
basic_allow_once = True
for path in sys.argv[1:] or sys.stdin.read().splitlines():
with open(path, 'r') as path_fd:
with open(path, 'r', encoding='utf-8') as path_fd:
for line, text in enumerate(path_fd.readlines()):
match = re.search(r'([^a-zA-Z0-9_]get_exception[^a-zA-Z0-9_])', text)

@ -1,6 +1,8 @@
# a script to check for illegal filenames on various Operating Systems. The
# main rules are derived from restrictions on Windows
# https://msdn.microsoft.com/en-us/library/aa365247#naming_conventions
"""
Check for illegal filenames on various operating systems.
The main rules are derived from restrictions on Windows:
https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file#naming-conventions
"""
from __future__ import annotations
import os
@ -53,6 +55,7 @@ ILLEGAL_END_CHARS = [
def check_path(path, is_dir=False):
"""Check the specified path for unwanted characters and names."""
type_name = 'directory' if is_dir else 'file'
file_name = os.path.basename(path.rstrip(os.path.sep))
name = os.path.splitext(file_name)[0]
@ -71,6 +74,7 @@ def check_path(path, is_dir=False):
def main():
"""Main entry point."""
for path in sys.argv[1:] or sys.stdin.read().splitlines():
check_path(path, is_dir=path.endswith(os.path.sep))

@ -1,3 +1,4 @@
"""Disallow importing display from __main__."""
from __future__ import annotations
import sys
@ -6,8 +7,9 @@ MAIN_DISPLAY_IMPORT = 'from __main__ import display'
def main():
"""Main entry point."""
for path in sys.argv[1:] or sys.stdin.read().splitlines():
with open(path, 'r') as file:
with open(path, 'r', encoding='utf-8') as file:
for i, line in enumerate(file.readlines()):
if MAIN_DISPLAY_IMPORT in line:
lineno = i + 1

@ -1,3 +1,4 @@
"""Disallow use of Unicode quotes."""
# -*- coding: utf-8 -*-
from __future__ import annotations
@ -6,6 +7,7 @@ import sys
def main():
"""Main entry point."""
for path in sys.argv[1:] or sys.stdin.read().splitlines():
with open(path, 'rb') as path_fd:
for line, text in enumerate(path_fd.readlines()):
@ -15,7 +17,7 @@ def main():
print('%s:%d:%d: UnicodeDecodeError: %s' % (path, line + 1, ex.start + 1, ex))
continue
match = re.search(u'([‘’“”])', text)
match = re.search('([‘’“”])', text)
if match:
print('%s:%d:%d: use ASCII quotes `\'` and `"` instead of Unicode quotes' % (

@ -1,3 +1,4 @@
"""Disallow use of the unicode_literals future."""
from __future__ import annotations
import re
@ -5,8 +6,9 @@ import sys
def main():
"""Main entry point."""
for path in sys.argv[1:] or sys.stdin.read().splitlines():
with open(path, 'r') as path_fd:
with open(path, 'r', encoding='utf-8') as path_fd:
for line, text in enumerate(path_fd.readlines()):
match = re.search(r'(unicode_literals)', text)

@ -1,3 +1,4 @@
"""Disallow use of the urlopen function."""
from __future__ import annotations
import re
@ -5,8 +6,9 @@ import sys
def main():
"""Main entry point."""
for path in sys.argv[1:] or sys.stdin.read().splitlines():
with open(path, 'r') as path_fd:
with open(path, 'r', encoding='utf-8') as path_fd:
for line, text in enumerate(path_fd.readlines()):
match = re.search(r'^(?:[^#]*?)(urlopen)', text)

@ -123,7 +123,7 @@ def get_collection_version():
def validate_metadata_file(path, is_ansible, check_deprecation_dates=False):
"""Validate explicit runtime metadata file"""
try:
with open(path, 'r') as f_path:
with open(path, 'r', encoding='utf-8') as f_path:
routing = yaml.safe_load(f_path)
except yaml.error.MarkedYAMLError as ex:
print('%s:%d:%d: YAML load failed: %s' % (path, ex.context_mark.line +
@ -251,7 +251,7 @@ def validate_metadata_file(path, is_ansible, check_deprecation_dates=False):
def main():
"""Validate runtime metadata"""
"""Main entry point."""
paths = sys.argv[1:] or sys.stdin.read().splitlines()
collection_legacy_file = 'meta/routing.yml'

@ -1,3 +1,4 @@
"""Check shebangs, execute bits and byte order marks."""
from __future__ import annotations
import os
@ -7,6 +8,7 @@ import sys
def main():
"""Main entry point."""
standard_shebangs = set([
b'#!/bin/bash -eu',
b'#!/bin/bash -eux',

@ -1,3 +1,4 @@
"""Check for unwanted symbolic links."""
from __future__ import annotations
import os
@ -5,6 +6,7 @@ import sys
def main():
"""Main entry point."""
root_dir = os.getcwd() + os.path.sep
for path in sys.argv[1:] or sys.stdin.read().splitlines():

@ -1,3 +1,4 @@
"""Disallow use of the expanduser function."""
from __future__ import annotations
import re
@ -5,8 +6,9 @@ import sys
def main():
"""Main entry point."""
for path in sys.argv[1:] or sys.stdin.read().splitlines():
with open(path, 'r') as path_fd:
with open(path, 'r', encoding='utf-8') as path_fd:
for line, text in enumerate(path_fd.readlines()):
match = re.search(r'(expanduser)', text)

@ -1,3 +1,4 @@
"""Disallow importing of the six module."""
from __future__ import annotations
import re
@ -5,8 +6,9 @@ import sys
def main():
"""Main entry point."""
for path in sys.argv[1:] or sys.stdin.read().splitlines():
with open(path, 'r') as path_fd:
with open(path, 'r', encoding='utf-8') as path_fd:
for line, text in enumerate(path_fd.readlines()):
match = re.search(r'((^\s*import\s+six\b)|(^\s*from\s+six\b))', text)

@ -3,9 +3,7 @@
disable=
cyclic-import, # consistent results require running with --jobs 1 and testing all files
duplicate-code, # consistent results require running with --jobs 1 and testing all files
import-error, # inconsistent results which depend on the availability of imports
import-outside-toplevel, # common pattern in ansible related code
no-name-in-module, # inconsistent results which depend on the availability of imports
no-self-use,
raise-missing-from, # Python 2.x does not support raise from
too-few-public-methods,
@ -18,10 +16,6 @@ disable=
too-many-return-statements,
too-many-statements,
useless-return, # complains about returning None when the return type is optional
# code-smell tests should be updated so the following rules can be enabled
# once that happens the pylint sanity test can be updated to no longer special-case the code-smell tests (use standard ansible-test config instead)
missing-module-docstring,
missing-function-docstring,
[BASIC]
@ -53,3 +47,8 @@ module-rgx=[a-z_][a-z0-9_-]{2,40}$
preferred-modules =
distutils.version:ansible.module_utils.compat.version,
# These modules are used by ansible-test, but will not be present in the virtual environment running pylint.
# Listing them here makes it possible to enable the import-error check.
ignored-modules =
voluptuous,

@ -44,17 +44,17 @@ class TestConstructor(SafeConstructor):
TestConstructor.add_constructor(
u'!unsafe',
'!unsafe',
TestConstructor.construct_yaml_unsafe)
TestConstructor.add_constructor(
u'!vault',
'!vault',
TestConstructor.construct_yaml_str)
TestConstructor.add_constructor(
u'!vault-encrypted',
'!vault-encrypted',
TestConstructor.construct_yaml_str)
@ -90,7 +90,7 @@ class YamlChecker:
for path in paths:
extension = os.path.splitext(path)[1]
with open(path) as file:
with open(path, encoding='utf-8') as file:
contents = file.read()
if extension in ('.yml', '.yaml'):

@ -40,10 +40,10 @@ def read_manifest_json(collection_path):
return None
try:
with open(manifest_path) as manifest_file:
with open(manifest_path, encoding='utf-8') as manifest_file:
manifest = json.load(manifest_file)
collection_info = manifest.get('collection_info') or dict()
collection_info = manifest.get('collection_info') or {}
result = dict(
version=collection_info.get('version'),
@ -63,7 +63,7 @@ def read_galaxy_yml(collection_path):
return None
try:
with open(galaxy_path) as galaxy_file:
with open(galaxy_path, encoding='utf-8') as galaxy_file:
galaxy = yaml.safe_load(galaxy_file)
result = dict(
@ -81,7 +81,7 @@ def main():
collection_path = sys.argv[1]
try:
result = read_manifest_json(collection_path) or read_galaxy_yml(collection_path) or dict()
result = read_manifest_json(collection_path) or read_galaxy_yml(collection_path) or {}
except Exception as ex: # pylint: disable=broad-except
result = dict(
error='{0}'.format(ex),

Loading…
Cancel
Save