ansible-test - Support multiple coverage versions.

ci_complete
ci_coverage
pull/77585/head
Matt Clay 3 years ago
parent 4d69c09695
commit b960641759

@ -0,0 +1,4 @@
minor_changes:
- ansible-test - Support multiple pinned versions of the ``coverage`` module.
The version used now depends on the Python version in use.
- ansible-test - Enable loading of ``coverage`` data files created by older supported ansible-test releases.

@ -2,20 +2,17 @@
from __future__ import annotations from __future__ import annotations
import errno import errno
import json
import os import os
import re import re
import typing as t import typing as t
from ...constants import (
COVERAGE_REQUIRED_VERSION,
)
from ...encoding import ( from ...encoding import (
to_bytes, to_bytes,
) )
from ...io import ( from ...io import (
open_binary_file, read_text_file,
read_json_file, read_json_file,
) )
@ -55,6 +52,12 @@ from ...provisioning import (
HostState, HostState,
) )
from ...coverage_util import (
get_coverage_file_schema_version,
CoverageError,
CONTROLLER_COVERAGE_VERSION,
)
if t.TYPE_CHECKING: if t.TYPE_CHECKING:
import coverage as coverage_module import coverage as coverage_module
@ -79,11 +82,13 @@ def initialize_coverage(args, host_state): # type: (CoverageConfig, HostState)
except ImportError: except ImportError:
coverage = None coverage = None
coverage_required_version = CONTROLLER_COVERAGE_VERSION.coverage_version
if not coverage: if not coverage:
raise ApplicationError(f'Version {COVERAGE_REQUIRED_VERSION} of the Python "coverage" module must be installed to use this command.') raise ApplicationError(f'Version {coverage_required_version} of the Python "coverage" module must be installed to use this command.')
if coverage.__version__ != COVERAGE_REQUIRED_VERSION: if coverage.__version__ != coverage_required_version:
raise ApplicationError(f'Version {COVERAGE_REQUIRED_VERSION} of the Python "coverage" module is required. Version {coverage.__version__} was found.') raise ApplicationError(f'Version {coverage_required_version} of the Python "coverage" module is required. Version {coverage.__version__} was found.')
return coverage return coverage
@ -158,24 +163,13 @@ def enumerate_python_arcs(
display.warning('Empty coverage file: %s' % path, verbosity=2) display.warning('Empty coverage file: %s' % path, verbosity=2)
return return
original = coverage.CoverageData()
try: try:
original.read_file(path) arc_data = read_python_coverage(path, coverage)
except Exception as ex: # pylint: disable=locally-disabled, broad-except except CoverageError as ex:
with open_binary_file(path) as file_obj: display.error(str(ex))
header = file_obj.read(6)
if header == b'SQLite':
display.error('File created by "coverage" 5.0+: %s' % os.path.relpath(path))
else:
display.error(u'%s' % ex)
return return
for filename in original.measured_files(): for filename, arcs in arc_data.items():
arcs = original.arcs(filename)
if not arcs: if not arcs:
# This is most likely due to using an unsupported version of coverage. # This is most likely due to using an unsupported version of coverage.
display.warning('No arcs found for "%s" in coverage file: %s' % (filename, path)) display.warning('No arcs found for "%s" in coverage file: %s' % (filename, path))
@ -189,6 +183,51 @@ def enumerate_python_arcs(
yield filename, set(arcs) yield filename, set(arcs)
PythonArcs = t.Dict[str, t.List[t.Tuple[int, int]]]
"""Python coverage arcs."""
def read_python_coverage(path: str, coverage: coverage_module) -> PythonArcs:
"""Return coverage arcs from the specified coverage file. Raises a CoverageError exception if coverage cannot be read."""
try:
return read_python_coverage_native(path, coverage)
except CoverageError as ex:
schema_version = get_coverage_file_schema_version(path)
if schema_version == CONTROLLER_COVERAGE_VERSION.schema_version:
raise CoverageError(path, f'Unexpected failure reading supported schema version {schema_version}.') from ex
if schema_version == 0:
return read_python_coverage_legacy(path)
raise CoverageError(path, f'Unsupported schema version: {schema_version}')
def read_python_coverage_native(path: str, coverage: coverage_module) -> PythonArcs:
"""Return coverage arcs from the specified coverage file using the coverage API."""
try:
data = coverage.CoverageData(path)
data.read()
arcs = {filename: data.arcs(filename) for filename in data.measured_files()}
except Exception as ex:
raise CoverageError(path, f'Error reading coverage file using coverage API: {ex}') from ex
return arcs
def read_python_coverage_legacy(path: str) -> PythonArcs:
"""Return coverage arcs from the specified coverage file, which must be in the legacy JSON format."""
try:
contents = read_text_file(path)
contents = re.sub(r'''^!coverage.py: This is a private format, don't read it directly!''', '', contents)
data = json.loads(contents)
arcs: PythonArcs = {filename: [tuple(arc) for arc in arcs] for filename, arcs in data['arcs'].items()}
except Exception as ex:
raise CoverageError(path, f'Error reading JSON coverage file: {ex}') from ex
return arcs
def enumerate_powershell_lines( def enumerate_powershell_lines(
path, # type: str path, # type: str
collection_search_re, # type: t.Optional[t.Pattern] collection_search_re, # type: t.Optional[t.Pattern]

@ -161,8 +161,12 @@ def _command_coverage_combine_python(args, host_state): # type: (CoverageCombin
for group in sorted(groups): for group in sorted(groups):
arc_data = groups[group] arc_data = groups[group]
output_file = coverage_file + group + suffix
updated = coverage.CoverageData() if args.explain:
continue
updated = coverage.CoverageData(output_file)
for filename in arc_data: for filename in arc_data:
if not path_checker.check_path(filename): if not path_checker.check_path(filename):
@ -173,13 +177,11 @@ def _command_coverage_combine_python(args, host_state): # type: (CoverageCombin
if args.all: if args.all:
updated.add_arcs(dict((source[0], []) for source in sources)) updated.add_arcs(dict((source[0], []) for source in sources))
if not args.explain: updated.write() # always write files to make sure stale files do not exist
output_file = coverage_file + group + suffix
updated.write_file(output_file) # always write files to make sure stale files do not exist
if updated: if updated:
# only report files which are non-empty to prevent coverage from reporting errors # only report files which are non-empty to prevent coverage from reporting errors
output_files.append(output_file) output_files.append(output_file)
path_checker.report() path_checker.report()

@ -2,7 +2,9 @@
from __future__ import annotations from __future__ import annotations
import atexit import atexit
import dataclasses
import os import os
import sqlite3
import tempfile import tempfile
import typing as t import typing as t
@ -15,12 +17,16 @@ from .config import (
from .io import ( from .io import (
write_text_file, write_text_file,
make_dirs, make_dirs,
open_binary_file,
) )
from .util import ( from .util import (
ApplicationError,
InternalError,
COVERAGE_CONFIG_NAME, COVERAGE_CONFIG_NAME,
remove_tree, remove_tree,
sanitize_host_name, sanitize_host_name,
str_to_version,
) )
from .data import ( from .data import (
@ -41,6 +47,93 @@ from .host_configs import (
PythonConfig, PythonConfig,
) )
from .constants import (
SUPPORTED_PYTHON_VERSIONS,
CONTROLLER_PYTHON_VERSIONS,
)
@dataclasses.dataclass(frozen=True)
class CoverageVersion:
"""Details about a coverage version and its supported Python versions."""
coverage_version: str
schema_version: int
min_python: tuple[int, int]
max_python: tuple[int, int]
COVERAGE_VERSIONS = (
CoverageVersion('6.3.2', 7, (3, 7), (3, 11)),
CoverageVersion('4.5.4', 0, (2, 6), (3, 7)),
)
"""
This tuple specifies the coverage version to use for Python version ranges.
When versions overlap, the latest version of coverage (listed first) will be used.
"""
CONTROLLER_COVERAGE_VERSION = COVERAGE_VERSIONS[0]
"""The coverage version supported on the controller."""
class CoverageError(ApplicationError):
"""Exception caused while attempting to read a coverage file."""
def __init__(self, path: str, message: str) -> None:
self.path = path
self.message = message
super().__init__(f'Error reading coverage file "{os.path.relpath(path)}": {message}')
def get_coverage_version(version: str) -> CoverageVersion:
"""Return the coverage version to use with the specified Python version."""
python_version = str_to_version(version)
supported_versions = [entry for entry in COVERAGE_VERSIONS if entry.min_python <= python_version <= entry.max_python]
if not supported_versions:
raise InternalError(f'Python {version} has no matching entry in COVERAGE_VERSIONS.')
coverage_version = supported_versions[0]
return coverage_version
def get_coverage_file_schema_version(path: str) -> int:
"""
Return the schema version from the specified coverage file.
SQLite based files report schema version 1 or later.
JSON based files are reported as schema version 0.
An exception is raised if the file is not recognized or the schema version cannot be determined.
"""
with open_binary_file(path) as file_obj:
header = file_obj.read(16)
if header.startswith(b'!coverage.py: '):
return 0
if header.startswith(b'SQLite'):
return get_sqlite_schema_version(path)
raise CoverageError(path, f'Unknown header: {header!r}')
def get_sqlite_schema_version(path: str) -> int:
"""Return the schema version from a SQLite based coverage file."""
try:
with sqlite3.connect(path) as connection:
cursor = connection.cursor()
cursor.execute('select version from coverage_schema')
schema_version = cursor.fetchmany(1)[0][0]
except Exception as ex:
raise CoverageError(path, f'SQLite error: {ex}') from ex
if not isinstance(schema_version, int):
raise CoverageError(path, f'Schema version is {type(schema_version)} instead of {int}: {schema_version}')
if schema_version < 1:
raise CoverageError(path, f'Schema version is out-of-range: {schema_version}')
return schema_version
def cover_python( def cover_python(
args, # type: TestConfig args, # type: TestConfig
@ -196,3 +289,18 @@ include =
''' % data_context().content.root ''' % data_context().content.root
return coverage_config return coverage_config
def self_check() -> None:
"""Check for internal errors due to incorrect code changes."""
# Verify all supported Python versions have a coverage version.
for version in SUPPORTED_PYTHON_VERSIONS:
get_coverage_version(version)
# Verify all controller Python versions are mapped to the latest coverage version.
for version in CONTROLLER_PYTHON_VERSIONS:
if get_coverage_version(version) != CONTROLLER_COVERAGE_VERSION:
raise InternalError(f'Controller Python version {version} is not mapped to the latest coverage version.')
self_check()

@ -8,10 +8,6 @@ import os
import re import re
import typing as t import typing as t
from .constants import (
COVERAGE_REQUIRED_VERSION,
)
from .encoding import ( from .encoding import (
to_text, to_text,
to_bytes, to_bytes,
@ -59,6 +55,10 @@ from .connections import (
Connection, Connection,
) )
from .coverage_util import (
get_coverage_version,
)
QUIET_PIP_SCRIPT_PATH = os.path.join(ANSIBLE_TEST_TARGET_ROOT, 'setup', 'quiet_pip.py') QUIET_PIP_SCRIPT_PATH = os.path.join(ANSIBLE_TEST_TARGET_ROOT, 'setup', 'quiet_pip.py')
REQUIREMENTS_SCRIPT_PATH = os.path.join(ANSIBLE_TEST_TARGET_ROOT, 'setup', 'requirements.py') REQUIREMENTS_SCRIPT_PATH = os.path.join(ANSIBLE_TEST_TARGET_ROOT, 'setup', 'requirements.py')
@ -214,7 +214,7 @@ def collect_requirements(
commands.extend(collect_package_install(packages=['virtualenv==16.7.12'], constraints=False)) commands.extend(collect_package_install(packages=['virtualenv==16.7.12'], constraints=False))
if coverage: if coverage:
commands.extend(collect_package_install(packages=[f'coverage=={COVERAGE_REQUIRED_VERSION}'], constraints=False)) commands.extend(collect_package_install(packages=[f'coverage=={get_coverage_version(python.version).coverage_version}'], constraints=False))
if cryptography: if cryptography:
commands.extend(collect_package_install(packages=get_cryptography_requirements(python))) commands.extend(collect_package_install(packages=get_cryptography_requirements(python)))

@ -596,6 +596,12 @@ class Display:
fd.flush() fd.flush()
class InternalError(Exception):
"""An unhandled internal error indicating a bug in the code."""
def __init__(self, message: str) -> None:
super().__init__(f'An internal error has occurred in ansible-test: {message}')
class ApplicationError(Exception): class ApplicationError(Exception):
"""General application error.""" """General application error."""

@ -84,6 +84,7 @@ bootstrap_remote_freebsd()
{ {
packages=" packages="
python${python_package_version} python${python_package_version}
py${python_package_version}-sqlite3
bash bash
curl curl
gtar gtar

Loading…
Cancel
Save