Code cleanup and refactoring in ansible-test. (#67063)

* Code cleanup in ansible-test.
* Split out encoding functions.
* Consolidate loading of JSON files.
* Split out disk IO functions.
* Simplify file access.
* Add functions for opening files.
* Replace open calls with appropriate functions.
* Expose more types from typing module.
* Support writing compact JSON.
* Add verbosity argument to display.warning.
* Add changelog entry.
* Update files overlooked during rebase.
* Use `io.open` instead of `open`.
* Fix file opening for imp.load_module.
* Remove use of `r+` mode to access files.
* Add missing import.
* Fix httptester on Python 2.x.
* Clarify changelog fragment.
* Consolidate imports. Remove extra newlines.
* Fix indirect imports.
pull/67089/head
Matt Clay authored 6 years ago; committed by GitHub
parent 994a6b0c5a
commit f4a80bb600

@@ -0,0 +1,5 @@
+minor_changes:
+    - "ansible-test - Support writing compact JSON files instead of formatting and indenting the output."
+    - "ansible-test - Add a verbosity option for displaying warnings."
+    - "ansible-test - Refactor code to consolidate filesystem access and improve handling of encoding."
+    - "ansible-test - General code cleanup."

@@ -733,6 +733,7 @@ class PathMapper:
         if path.startswith('test/lib/ansible_test/config/'):
             if name.startswith('cloud-config-'):
+                # noinspection PyTypeChecker
                 cloud_target = 'cloud/%s/' % name.split('-')[2].split('.')[0]

                 if cloud_target in self.integration_targets_by_alias:

@@ -822,6 +822,7 @@ def complete_target(prefix, parsed_args, **_):
     return find_target_completion(parsed_args.targets, prefix)


+# noinspection PyUnusedLocal
 def complete_remote(prefix, parsed_args, **_):
     """
     :type prefix: unicode
@@ -835,6 +836,7 @@ def complete_remote(prefix, parsed_args, **_):
     return [i for i in images if i.startswith(prefix)]


+# noinspection PyUnusedLocal
 def complete_remote_shell(prefix, parsed_args, **_):
     """
     :type prefix: unicode
@@ -852,6 +854,7 @@ def complete_remote_shell(prefix, parsed_args, **_):
     return [i for i in images if i.startswith(prefix)]


+# noinspection PyUnusedLocal
 def complete_docker(prefix, parsed_args, **_):
     """
     :type prefix: unicode
@@ -911,6 +914,7 @@ def complete_network_testcase(prefix, parsed_args, **_):
     return testcases


+# noinspection PyUnusedLocal
 def complete_sanity_test(prefix, parsed_args, **_):
     """
     :type prefix: unicode

@@ -14,6 +14,14 @@ import tempfile

 from .. import types as t

+from ..encoding import (
+    to_bytes,
+)
+
+from ..io import (
+    read_text_file,
+)
+
 from ..util import (
     ApplicationError,
     display,
@@ -21,7 +29,6 @@ from ..util import (
     import_plugins,
     load_plugins,
     ABC,
-    to_bytes,
     ANSIBLE_TEST_CONFIG_ROOT,
 )

@@ -365,11 +372,10 @@ class CloudProvider(CloudBase):
         """
         :rtype: str
         """
-        with open(self.config_template_path, 'r') as template_fd:
-            lines = template_fd.read().splitlines()
-            lines = [l for l in lines if not l.startswith('#')]
-            config = '\n'.join(lines).strip() + '\n'
-
-            return config
+        lines = read_text_file(self.config_template_path).splitlines()
+        lines = [line for line in lines if not line.startswith('#')]
+        config = '\n'.join(lines).strip() + '\n'
+
+        return config

     @staticmethod
     def _populate_config_template(template, values):

@@ -4,6 +4,10 @@ __metaclass__ = type

 import os

+from ..io import (
+    read_text_file,
+)
+
 from ..util import (
     ApplicationError,
     display,
@@ -86,8 +90,7 @@ class AzureCloudProvider(CloudProvider):
         response = {}

         if os.path.isfile(self.SHERLOCK_CONFIG_PATH):
-            with open(self.SHERLOCK_CONFIG_PATH, 'r') as sherlock_fd:
-                sherlock_uri = sherlock_fd.readline().strip() + '&rgcount=2'
+            sherlock_uri = read_text_file(self.SHERLOCK_CONFIG_PATH).splitlines()[0].strip() + '&rgcount=2'

             parts = urlparse(sherlock_uri)
             query_string = parse_qs(parts.query)

@@ -7,7 +7,7 @@
 from __future__ import (absolute_import, division, print_function)
 __metaclass__ = type

-from os.path import isfile
+import os

 from . import (
     CloudProvider,
@@ -34,7 +34,7 @@ class CloudscaleCloudProvider(CloudProvider):
         :type targets: tuple[TestTarget]
         :type exclude: list[str]
         """
-        if isfile(self.config_static_path):
+        if os.path.isfile(self.config_static_path):
             return

         super(CloudscaleCloudProvider, self).filter(targets, exclude)
@@ -43,7 +43,7 @@ class CloudscaleCloudProvider(CloudProvider):
         """Setup the cloud resource before delegation and register a cleanup callback."""
         super(CloudscaleCloudProvider, self).setup()

-        if isfile(self.config_static_path):
+        if os.path.isfile(self.config_static_path):
             display.info('Using existing %s cloud config: %s'
                          % (self.platform, self.config_static_path),
                          verbosity=1)

@@ -49,7 +49,7 @@ class ForemanProvider(CloudProvider):
         """
         super(ForemanProvider, self).__init__(args)

-        self.__container_from_env = os.getenv('ANSIBLE_FRMNSIM_CONTAINER')
+        self.__container_from_env = os.environ.get('ANSIBLE_FRMNSIM_CONTAINER')
         """Overrides target container, might be used for development.

         Use ANSIBLE_FRMNSIM_CONTAINER=whatever_you_want if you want

@@ -49,7 +49,7 @@ class NiosProvider(CloudProvider):
         """
         super(NiosProvider, self).__init__(args)

-        self.__container_from_env = os.getenv('ANSIBLE_NIOSSIM_CONTAINER')
+        self.__container_from_env = os.environ.get('ANSIBLE_NIOSSIM_CONTAINER')
         """Overrides target container, might be used for development.

         Use ANSIBLE_NIOSSIM_CONTAINER=whatever_you_want if you want

@@ -13,6 +13,10 @@ from . import (
     CloudEnvironmentConfig,
 )

+from ..io import (
+    read_text_file,
+)
+
 from ..util import (
     find_executable,
     ApplicationError,
@@ -106,8 +110,7 @@ class OpenShiftCloudProvider(CloudProvider):
     def _setup_static(self):
         """Configure OpenShift tests for use with static configuration."""
-        with open(self.config_static_path, 'r') as config_fd:
-            config = config_fd.read()
+        config = read_text_file(self.config_static_path)

         match = re.search(r'^ *server: (?P<server>.*)$', config, flags=re.MULTILINE)

@@ -18,18 +18,22 @@ from .http import (
     HttpError,
 )

+from .io import (
+    make_dirs,
+    read_text_file,
+    write_json_file,
+    write_text_file,
+)
+
 from .util import (
     ApplicationError,
-    make_dirs,
     display,
     is_shippable,
-    to_text,
     ANSIBLE_TEST_DATA_ROOT,
 )

 from .util_common import (
     run_command,
-    write_json_file,
     ResultType,
 )
@@ -233,8 +237,7 @@ class AnsibleCoreCI:
     def start_remote(self):
         """Start instance for remote development/testing."""
-        with open(self.ci_key, 'r') as key_fd:
-            auth_key = key_fd.read().strip()
+        auth_key = read_text_file(self.ci_key).strip()

         return self._start(dict(
             remote=dict(
@@ -367,8 +370,7 @@ class AnsibleCoreCI:
         display.info('Initializing new %s/%s instance %s.' % (self.platform, self.version, self.instance_id), verbosity=1)

         if self.platform == 'windows':
-            with open(os.path.join(ANSIBLE_TEST_DATA_ROOT, 'setup', 'ConfigureRemotingForAnsible.ps1'), 'rb') as winrm_config_fd:
-                winrm_config = to_text(winrm_config_fd.read())
+            winrm_config = read_text_file(os.path.join(ANSIBLE_TEST_DATA_ROOT, 'setup', 'ConfigureRemotingForAnsible.ps1'))
         else:
             winrm_config = None
@@ -470,8 +472,7 @@ class AnsibleCoreCI:
     def _load(self):
         """Load instance information."""
         try:
-            with open(self.path, 'r') as instance_fd:
-                data = instance_fd.read()
+            data = read_text_file(self.path)
         except IOError as ex:
             if ex.errno != errno.ENOENT:
                 raise
@@ -597,8 +598,7 @@ class SshKey:
         if args.explain:
             self.pub_contents = None
         else:
-            with open(self.pub, 'r') as pub_fd:
-                self.pub_contents = pub_fd.read().strip()
+            self.pub_contents = read_text_file(self.pub).strip()

     def get_in_tree_key_pair_paths(self):  # type: () -> t.Optional[t.Tuple[str, str]]
         """Return the ansible-test SSH key pair paths from the content tree."""
@@ -643,11 +643,10 @@ class SshKey:
             run_command(args, ['ssh-keygen', '-m', 'PEM', '-q', '-t', 'rsa', '-N', '', '-f', key])

             # newer ssh-keygen PEM output (such as on RHEL 8.1) is not recognized by paramiko
-            with open(key, 'r+') as key_fd:
-                key_contents = key_fd.read()
-                key_contents = re.sub(r'(BEGIN|END) PRIVATE KEY', r'\1 RSA PRIVATE KEY', key_contents)
-                key_fd.seek(0)
-                key_fd.write(key_contents)
+            key_contents = read_text_file(key)
+            key_contents = re.sub(r'(BEGIN|END) PRIVATE KEY', r'\1 RSA PRIVATE KEY', key_contents)
+
+            write_text_file(key, key_contents)

         return key, pub
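
The re.sub above converts the PKCS#8 PEM markers produced by newer ssh-keygen releases into the legacy RSA markers paramiko expects; for example (key body elided):

    import re

    pem = '-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n'
    re.sub(r'(BEGIN|END) PRIVATE KEY', r'\1 RSA PRIVATE KEY', pem)
    # '-----BEGIN RSA PRIVATE KEY-----\n...\n-----END RSA PRIVATE KEY-----\n'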

@@ -2,7 +2,6 @@
 from __future__ import (absolute_import, division, print_function)
 __metaclass__ = type

-import json
 import os
 import re

@@ -12,9 +11,13 @@ from ..target import (
     walk_powershell_targets,
 )

+from ..io import (
+    read_json_file,
+    read_text_file,
+)
+
 from ..util import (
     display,
-    to_text,
 )

 from ..util_common import (
@@ -191,8 +194,7 @@ def _command_coverage_combine_powershell(args):
             continue

         try:
-            with open(coverage_file, 'rb') as original_fd:
-                coverage_run = json.loads(to_text(original_fd.read(), errors='replace'))
+            coverage_run = read_json_file(coverage_file)
         except Exception as ex:  # pylint: disable=locally-disabled, broad-except
             display.error(u'%s' % ex)
             continue
@@ -275,8 +277,7 @@ def _get_coverage_targets(args, walk_func):
     for target in walk_func(include_symlinks=False):
         target_path = os.path.abspath(target.path)

-        with open(target_path, 'r') as target_fd:
-            target_lines = len(target_fd.read().splitlines())
+        target_lines = len(read_text_file(target_path).splitlines())

         sources.append((target_path, target_lines))
@@ -327,6 +328,7 @@ def get_coverage_group(args, coverage_file):
     """
     parts = os.path.basename(coverage_file).split('=', 4)

+    # noinspection PyTypeChecker
     if len(parts) != 5 or not parts[4].startswith('coverage.'):
         return None

@@ -4,9 +4,12 @@ __metaclass__ = type

 import os

+from ..io import (
+    make_dirs,
+)
+
 from ..util import (
     display,
-    make_dirs,
 )

 from ..util_common import (

@@ -2,12 +2,14 @@
 from __future__ import (absolute_import, division, print_function)
 __metaclass__ = type

-import json
 import os

+from ..io import (
+    read_json_file,
+)
+
 from ..util import (
     display,
-    to_text,
 )

 from ..data import (
@@ -57,8 +59,7 @@ def _generate_powershell_output_report(args, coverage_file):
     :type coverage_file: str
     :rtype: str
     """
-    with open(coverage_file, 'rb') as coverage_fd:
-        coverage_info = json.loads(to_text(coverage_fd.read()))
+    coverage_info = read_json_file(coverage_file)

     root_path = data_context().content.root + '/'

@@ -2,7 +2,6 @@
 from __future__ import (absolute_import, division, print_function)
 __metaclass__ = type

-import json
 import os
 import time

@@ -17,9 +16,9 @@ from xml.dom import (
     minidom,
 )

-from ..util import (
-    to_text,
+from ..io import (
     make_dirs,
+    read_json_file,
 )

 from ..util_common import (
@@ -72,8 +71,7 @@ def _generate_powershell_xml(coverage_file):
     :type coverage_file: str
     :rtype: Element
     """
-    with open(coverage_file, 'rb') as coverage_fd:
-        coverage_info = json.loads(to_text(coverage_fd.read()))
+    coverage_info = read_json_file(coverage_file)

     content_root = data_context().content.root
     is_ansible = data_context().content.is_ansible

@@ -12,15 +12,15 @@ from .config import (
     TestConfig,
 )

+from .io import (
+    write_text_file,
+)
+
 from .util import (
     COVERAGE_CONFIG_NAME,
     remove_tree,
 )

-from .util_common import (
-    write_text_file,
-)
-
 from .data import (
     data_context,
 )

@@ -5,6 +5,10 @@ __metaclass__ = type
 import os
 import re

+from .io import (
+    open_text_file,
+)
+
 from .util import (
     display,
 )
@@ -53,7 +57,7 @@ def get_csharp_module_utils_name(path):  # type: (str) -> str
     else:
         prefix = ''

-    name = prefix + os.path.splitext(os.path.relpath(path, base_path))[0].replace(os.sep, '.')
+    name = prefix + os.path.splitext(os.path.relpath(path, base_path))[0].replace(os.path.sep, '.')

     return name
@@ -80,7 +84,7 @@ def extract_csharp_module_utils_imports(path, module_utils, is_pure_csharp):
     else:
         pattern = re.compile(r'(?i)^#\s*ansiblerequires\s+-csharputil\s+((?:Ansible|ansible.collections)\..+)')

-    with open(path, 'r') as module_file:
+    with open_text_file(path) as module_file:
         for line_number, line in enumerate(module_file, 1):
             match = re.search(pattern, line)

@@ -44,14 +44,12 @@ from .manage_ci import (
 from .util import (
     ApplicationError,
     common_environment,
-    pass_vars,
     display,
     ANSIBLE_BIN_PATH,
     ANSIBLE_TEST_DATA_ROOT,
     ANSIBLE_LIB_ROOT,
     ANSIBLE_TEST_ROOT,
     tempdir,
-    make_dirs,
 )

 from .util_common import (
@@ -203,7 +201,7 @@ def delegate_venv(args,  # type: EnvironmentConfig
     os.symlink(ANSIBLE_TEST_ROOT, os.path.join(library_path, 'ansible_test'))

     env.update(
-        PATH=inject_path + os.pathsep + env['PATH'],
+        PATH=inject_path + os.path.pathsep + env['PATH'],
         PYTHONPATH=library_path,
     )

@@ -6,6 +6,11 @@ import json
 import os
 import time

+from .io import (
+    open_binary_file,
+    read_text_file,
+)
+
 from .util import (
     ApplicationError,
     common_environment,
@@ -41,8 +46,7 @@ def get_docker_container_id():
     if not os.path.exists(path):
         return None

-    with open(path) as cgroup_fd:
-        contents = cgroup_fd.read()
+    contents = read_text_file(path)

     paths = [line.split(':')[2] for line in contents.splitlines()]
     container_ids = set(path.split('/')[2] for path in paths if path.startswith('/docker/'))
@@ -110,7 +114,7 @@ def docker_put(args, container_id, src, dst):
     :type dst: str
     """
     # avoid 'docker cp' due to a bug which causes 'docker rm' to fail
-    with open(src, 'rb') as src_fd:
+    with open_binary_file(src) as src_fd:
         docker_exec(args, container_id, ['dd', 'of=%s' % dst, 'bs=%s' % BUFFER_SIZE],
                     options=['-i'], stdin=src_fd, capture=True)
@@ -123,7 +127,7 @@ def docker_get(args, container_id, src, dst):
     :type dst: str
     """
     # avoid 'docker cp' due to a bug which causes 'docker rm' to fail
-    with open(dst, 'wb') as dst_fd:
+    with open_binary_file(dst, 'wb') as dst_fd:
         docker_exec(args, container_id, ['dd', 'if=%s' % src, 'bs=%s' % BUFFER_SIZE],
                     options=['-i'], stdout=dst_fd, capture=True)

@@ -0,0 +1,41 @@
+"""Functions for encoding and decoding strings."""
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+from . import types as t
+
+ENCODING = 'utf-8'
+
+Text = type(u'')
+
+
+def to_optional_bytes(value, errors='strict'):  # type: (t.Optional[t.AnyStr], str) -> t.Optional[bytes]
+    """Return the given value as bytes encoded using UTF-8 if not already bytes, or None if the value is None."""
+    return None if value is None else to_bytes(value, errors)
+
+
+def to_optional_text(value, errors='strict'):  # type: (t.Optional[t.AnyStr], str) -> t.Optional[t.Text]
+    """Return the given value as text decoded using UTF-8 if not already text, or None if the value is None."""
+    return None if value is None else to_text(value, errors)
+
+
+def to_bytes(value, errors='strict'):  # type: (t.AnyStr, str) -> bytes
+    """Return the given value as bytes encoded using UTF-8 if not already bytes."""
+    if isinstance(value, bytes):
+        return value
+
+    if isinstance(value, Text):
+        return value.encode(ENCODING, errors)
+
+    raise Exception('value is not bytes or text: %s' % type(value))
+
+
+def to_text(value, errors='strict'):  # type: (t.AnyStr, str) -> t.Text
+    """Return the given value as text decoded using UTF-8 if not already text."""
+    if isinstance(value, bytes):
+        return value.decode(ENCODING, errors)
+
+    if isinstance(value, Text):
+        return value
+
+    raise Exception('value is not bytes or text: %s' % type(value))
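
For reference, the new helpers round-trip as follows (values are illustrative; behaviour follows the definitions above):

    from ansible_test._internal.encoding import to_bytes, to_optional_bytes, to_text

    to_bytes(u'café')        # b'caf\xc3\xa9'
    to_text(b'caf\xc3\xa9')  # u'café'
    to_optional_bytes(None)  # None, instead of raising
    to_bytes(42)             # raises Exception: value is not bytes or text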

@@ -3,7 +3,6 @@ from __future__ import (absolute_import, division, print_function)
 __metaclass__ = type

 import datetime
-import json
 import functools
 import os
 import platform
@@ -17,6 +16,11 @@ from .config import (
     TestConfig,
 )

+from .io import (
+    write_json_file,
+    read_json_file,
+)
+
 from .util import (
     display,
     find_executable,
@@ -28,7 +32,6 @@ from .util import (

 from .util_common import (
     write_json_test_results,
-    write_json_file,
     ResultType,
 )

@@ -164,9 +167,7 @@ def get_timeout():
     if not os.path.exists(TIMEOUT_PATH):
         return None

-    with open(TIMEOUT_PATH, 'r') as timeout_fd:
-        data = json.load(timeout_fd)
-
+    data = read_json_file(TIMEOUT_PATH)
     data['deadline'] = datetime.datetime.strptime(data['deadline'], '%Y-%m-%dT%H:%M:%SZ')

     return data

@@ -40,13 +40,20 @@ from .cloud import (
     CloudEnvironmentConfig,
 )

+from .io import (
+    make_dirs,
+    open_text_file,
+    read_binary_file,
+    read_text_file,
+    write_text_file,
+)
+
 from .util import (
     ApplicationWarning,
     ApplicationError,
     SubprocessError,
     display,
     remove_tree,
-    make_dirs,
     is_shippable,
     is_binary_file,
     find_executable,
@@ -71,7 +78,6 @@ from .util_common import (
     intercept_command,
     named_temporary_file,
     run_command,
-    write_text_file,
     write_json_test_results,
     ResultType,
     handle_layout_messages,
@@ -1200,12 +1206,12 @@ def inject_httptester(args):
     """
     comment = ' # ansible-test httptester\n'
     append_lines = ['127.0.0.1 %s%s' % (host, comment) for host in HTTPTESTER_HOSTS]
+    hosts_path = '/etc/hosts'

-    with open('/etc/hosts', 'r+') as hosts_fd:
-        original_lines = hosts_fd.readlines()
+    original_lines = read_text_file(hosts_path).splitlines(True)

-        if not any(line.endswith(comment) for line in original_lines):
-            hosts_fd.writelines(append_lines)
+    if not any(line.endswith(comment) for line in original_lines):
+        write_text_file(hosts_path, ''.join(original_lines + append_lines))

     # determine which forwarding mechanism to use
     pfctl = find_executable('pfctl', required=False)
@@ -1510,8 +1516,7 @@ def detect_changes(args):
     elif args.changed_from or args.changed_path:
         paths = args.changed_path or []
         if args.changed_from:
-            with open(args.changed_from, 'r') as changes_fd:
-                paths += changes_fd.read().splitlines()
+            paths += read_text_file(args.changed_from).splitlines()
     elif args.changed:
         paths = detect_changes_local(args)
     else:
@@ -1599,8 +1604,7 @@ def detect_changes_local(args):
             args.metadata.changes[path] = ((0, 0),)
             continue

-        with open(path, 'r') as source_fd:
-            line_count = len(source_fd.read().splitlines())
+        line_count = len(read_text_file(path).splitlines())

         args.metadata.changes[path] = ((1, line_count),)
@@ -2056,7 +2060,7 @@ class EnvironmentDescription:
         :type path: str
         :rtype: str
         """
-        with open(path) as script_fd:
+        with open_text_file(path) as script_fd:
             return script_fd.readline().strip()

     @staticmethod
@@ -2070,8 +2074,7 @@ class EnvironmentDescription:
         file_hash = hashlib.md5()

-        with open(path, 'rb') as file_fd:
-            file_hash.update(file_fd.read())
+        file_hash.update(read_binary_file(path))

         return file_hash.hexdigest()

@@ -7,6 +7,10 @@ import os

 from . import types as t

+from .io import (
+    read_text_file,
+)
+
 from .util import (
     display,
     ApplicationError,
@@ -130,7 +134,7 @@ def get_python_module_utils_name(path):  # type: (str) -> str
     if path.endswith('/__init__.py'):
         path = os.path.dirname(path)

-    name = prefix + os.path.splitext(os.path.relpath(path, base_path))[0].replace(os.sep, '.')
+    name = prefix + os.path.splitext(os.path.relpath(path, base_path))[0].replace(os.path.sep, '.')

     return name
@@ -161,20 +165,19 @@ def extract_python_module_utils_imports(path, module_utils):
     :type module_utils: set[str]
     :rtype: set[str]
     """
-    with open(path, 'r') as module_fd:
-        code = module_fd.read()
-
-        try:
-            tree = ast.parse(code)
-        except SyntaxError as ex:
-            # Treat this error as a warning so tests can be executed as best as possible.
-            # The compile test will detect and report this syntax error.
-            display.warning('%s:%s Syntax error extracting module_utils imports: %s' % (path, ex.lineno, ex.msg))
-            return set()
-
-        finder = ModuleUtilFinder(path, module_utils)
-        finder.visit(tree)
-        return finder.imports
+    code = read_text_file(path)
+
+    try:
+        tree = ast.parse(code)
+    except SyntaxError as ex:
+        # Treat this error as a warning so tests can be executed as best as possible.
+        # The compile test will detect and report this syntax error.
+        display.warning('%s:%s Syntax error extracting module_utils imports: %s' % (path, ex.lineno, ex.msg))
+        return set()
+
+    finder = ModuleUtilFinder(path, module_utils)
+    finder.visit(tree)
+    return finder.imports


 class ModuleUtilFinder(ast.NodeVisitor):

@@ -10,6 +10,10 @@ import tempfile

 from .. import types as t

+from ..encoding import (
+    to_bytes,
+)
+
 from ..target import (
     analyze_integration_target_dependencies,
     walk_integration_targets,
@@ -22,20 +26,23 @@ from ..config import (
     WindowsIntegrationConfig,
 )

+from ..io import (
+    make_dirs,
+    write_text_file,
+    read_text_file,
+)
+
 from ..util import (
     ApplicationError,
     display,
-    make_dirs,
     COVERAGE_CONFIG_NAME,
     MODE_DIRECTORY,
     MODE_DIRECTORY_WRITE,
     MODE_FILE,
-    to_bytes,
 )

 from ..util_common import (
     named_temporary_file,
-    write_text_file,
     ResultType,
 )

@@ -136,8 +143,7 @@ def check_inventory(args, inventory_path):  # type: (IntegrationConfig, str) -> None
     """Check the given inventory for issues."""
     if args.docker or args.remote:
         if os.path.exists(inventory_path):
-            with open(inventory_path) as inventory_file:
-                inventory = inventory_file.read()
+            inventory = read_text_file(inventory_path)

             if 'ansible_ssh_private_key_file' in inventory:
                 display.warning('Use of "ansible_ssh_private_key_file" in inventory with the --docker or --remote option is unsupported and will likely fail.')

@@ -0,0 +1,74 @@
+"""Functions for disk IO."""
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
+import errno
+import io
+import json
+import os
+
+from . import types as t
+
+from .encoding import (
+    ENCODING,
+    to_bytes,
+    to_text,
+)
+
+
+def read_json_file(path):  # type: (t.AnyStr) -> t.Any
+    """Parse and return the json content from the specified path."""
+    return json.loads(read_text_file(path))
+
+
+def read_text_file(path):  # type: (t.AnyStr) -> t.Text
+    """Return the contents of the specified path as text."""
+    return to_text(read_binary_file(path))
+
+
+def read_binary_file(path):  # type: (t.AnyStr) -> bytes
+    """Return the contents of the specified path as bytes."""
+    with open_binary_file(path) as file:
+        return file.read()
+
+
+def make_dirs(path):  # type: (str) -> None
+    """Create a directory at path, including any necessary parent directories."""
+    try:
+        os.makedirs(to_bytes(path))
+    except OSError as ex:
+        if ex.errno != errno.EEXIST:
+            raise
+
+
+def write_json_file(path, content, create_directories=False, formatted=True):  # type: (str, t.Union[t.List[t.Any], t.Dict[str, t.Any]], bool, bool) -> None
+    """Write the given json content to the specified path, optionally creating missing directories."""
+    text_content = json.dumps(content, sort_keys=formatted, indent=4 if formatted else None, separators=(', ', ': ') if formatted else (',', ':')) + '\n'
+
+    write_text_file(path, text_content, create_directories=create_directories)
+
+
+def write_text_file(path, content, create_directories=False):  # type: (str, str, bool) -> None
+    """Write the given text content to the specified path, optionally creating missing directories."""
+    if create_directories:
+        make_dirs(os.path.dirname(path))
+
+    with open_binary_file(path, 'wb') as file:
+        file.write(to_bytes(content))
+
+
+def open_text_file(path, mode='r'):  # type: (str, str) -> t.TextIO
+    """Open the given path for text access."""
+    if 'b' in mode:
+        raise Exception('mode cannot include "b" for text files: %s' % mode)
+
+    # noinspection PyTypeChecker
+    return io.open(to_bytes(path), mode, encoding=ENCODING)
+
+
+def open_binary_file(path, mode='rb'):  # type: (str, str) -> t.BinaryIO
+    """Open the given path for binary access."""
+    if 'b' not in mode:
+        raise Exception('mode must include "b" for binary files: %s' % mode)
+
+    # noinspection PyTypeChecker
+    return io.open(to_bytes(path), mode)
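
Note how the helpers layer: read_json_file wraps read_text_file, which wraps read_binary_file and open_binary_file, while write_json_file delegates to write_text_file. A short sketch (path and content are illustrative):

    from ansible_test._internal.io import read_json_file, write_json_file

    write_json_file('/tmp/example.json', {'a': 1}, create_directories=True)  # sorted keys, 4-space indent
    assert read_json_file('/tmp/example.json') == {'a': 1}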

@@ -2,8 +2,6 @@
 from __future__ import (absolute_import, division, print_function)
 __metaclass__ = type

-import json
-
 from . import types as t

 from .util import (
@@ -11,8 +9,9 @@ from .util import (
     is_shippable,
 )

-from .util_common import (
+from .io import (
     write_json_file,
+    read_json_file,
 )

 from .diff import (
@@ -84,9 +83,7 @@ class Metadata:
         :type path: str
         :rtype: Metadata
         """
-        with open(path, 'r') as data_fd:
-            data = json.load(data_fd)
-
+        data = read_json_file(path)
         return Metadata.from_dict(data)

     @staticmethod

@@ -5,6 +5,10 @@ __metaclass__ = type
 import os
 import re

+from .io import (
+    read_text_file,
+)
+
 from .util import (
     display,
 )
@@ -49,7 +53,7 @@ def get_powershell_module_utils_name(path):  # type: (str) -> str
     else:
         prefix = ''

-    name = prefix + os.path.splitext(os.path.relpath(path, base_path))[0].replace(os.sep, '.')
+    name = prefix + os.path.splitext(os.path.relpath(path, base_path))[0].replace(os.path.sep, '.')

     return name
@@ -71,27 +75,26 @@ def extract_powershell_module_utils_imports(path, module_utils):
     """
     imports = set()

-    with open(path, 'r') as module_fd:
-        code = module_fd.read()
+    code = read_text_file(path)

-        if '# POWERSHELL_COMMON' in code:
-            imports.add('Ansible.ModuleUtils.Legacy')
+    if '# POWERSHELL_COMMON' in code:
+        imports.add('Ansible.ModuleUtils.Legacy')

-        lines = code.splitlines()
-        line_number = 0
+    lines = code.splitlines()
+    line_number = 0

-        for line in lines:
-            line_number += 1
-            match = re.search(r'(?i)^#\s*(?:requires\s+-module(?:s?)|ansiblerequires\s+-powershell)\s*((?:Ansible|ansible_collections)\..+)', line)
+    for line in lines:
+        line_number += 1
+        match = re.search(r'(?i)^#\s*(?:requires\s+-module(?:s?)|ansiblerequires\s+-powershell)\s*((?:Ansible|ansible_collections)\..+)', line)

-            if not match:
-                continue
+        if not match:
+            continue

-            import_name = match.group(1)
+        import_name = match.group(1)

-            if import_name in module_utils:
-                imports.add(import_name)
-            else:
-                display.warning('%s:%d Invalid module_utils import: %s' % (path, line_number, import_name))
+        if import_name in module_utils:
+            imports.add(import_name)
+        else:
+            display.warning('%s:%d Invalid module_utils import: %s' % (path, line_number, import_name))

     return imports

@@ -44,7 +44,7 @@ class Layout:
         else:
             tree = self.__files_tree

-        parts = directory.rstrip(os.sep).split(os.sep)
+        parts = directory.rstrip(os.path.sep).split(os.path.sep)
         item = get_tree_item(tree, parts)

         if not item:
@@ -63,13 +63,13 @@ class Layout:
     def get_dirs(self, directory):  # type: (str) -> t.List[str]
         """Return a list directory paths found directly under the given directory."""
-        parts = directory.rstrip(os.sep).split(os.sep)
+        parts = directory.rstrip(os.path.sep).split(os.path.sep)
         item = get_tree_item(self.__files_tree, parts)
         return [os.path.join(directory, key) for key in item[0].keys()] if item else []

     def get_files(self, directory):  # type: (str) -> t.List[str]
         """Return a list of file paths found directly under the given directory."""
-        parts = directory.rstrip(os.sep).split(os.sep)
+        parts = directory.rstrip(os.path.sep).split(os.path.sep)
         item = get_tree_item(self.__files_tree, parts)
         return item[1] if item else []
@@ -205,7 +205,7 @@ def paths_to_tree(paths):  # type: (t.List[str]) -> t.Tuple(t.Dict[str, t.Any], t.List[str])
     tree = {}, []

     for path in paths:
-        parts = path.split(os.sep)
+        parts = path.split(os.path.sep)
         root = tree

         for part in parts[:-1]:

@@ -33,7 +33,7 @@ class CollectionLayout(LayoutProvider):
         collection_root = os.path.dirname(os.path.dirname(root))
         collection_dir = os.path.relpath(root, collection_root)
-        collection_namespace, collection_name = collection_dir.split(os.sep)
+        collection_namespace, collection_name = collection_dir.split(os.path.sep)

         collection_root = os.path.dirname(collection_root)

@@ -10,7 +10,7 @@ from ...git import (
     Git,
 )

-from ...util import (
+from ...encoding import (
     to_bytes,
 )

@@ -10,7 +10,7 @@ from ...constants import (
     TIMEOUT_PATH,
 )

-from ...util import (
+from ...encoding import (
     to_bytes,
 )

@@ -4,13 +4,16 @@ __metaclass__ = type

 import abc
 import glob
-import json
 import os
 import re
 import collections

 from .. import types as t

+from ..io import (
+    read_json_file,
+)
+
 from ..util import (
     ApplicationError,
     SubprocessError,
@@ -669,8 +672,7 @@ class SanityCodeSmellTest(SanityTest):
         self.config = None

         if self.config_path:
-            with open(self.config_path, 'r') as config_fd:
-                self.config = json.load(config_fd)
+            self.config = read_json_file(self.config_path)

         if self.config:
             self.enabled = not self.config.get('disabled')

@@ -12,7 +12,6 @@ from ..sanity import (
     SanitySingleVersion,
     SanityFailure,
     SanitySuccess,
-    SanityMessage,
 )

 from ..target import (

@@ -32,6 +32,10 @@ from ..cloud import (
     get_cloud_platforms,
 )

+from ..io import (
+    read_text_file,
+)
+
 from ..util import (
     display,
 )
@@ -108,8 +112,7 @@ class IntegrationAliasesTest(SanityVersionNeutral):
         :rtype: list[str]
         """
         if not self._shippable_yml_lines:
-            with open(self.SHIPPABLE_YML, 'r') as shippable_yml_fd:
-                self._shippable_yml_lines = shippable_yml_fd.read().splitlines()
+            self._shippable_yml_lines = read_text_file(self.SHIPPABLE_YML).splitlines()

         return self._shippable_yml_lines

@@ -10,13 +10,19 @@ import abc

 from . import types as t

+from .encoding import (
+    to_bytes,
+)
+
+from .io import (
+    read_text_file,
+)
+
 from .util import (
     ApplicationError,
     display,
     read_lines_without_comments,
     is_subdir,
-    to_text,
-    to_bytes,
 )

 from .data import (
@@ -291,8 +297,7 @@ def load_integration_prefixes():
     for file_path in file_paths:
         prefix = os.path.splitext(file_path)[1][1:]
-        with open(file_path, 'r') as prefix_fd:
-            prefixes.update(dict((k, prefix) for k in prefix_fd.read().splitlines()))
+        prefixes.update(dict((k, prefix) for k in read_text_file(file_path).splitlines()))

     return prefixes
@@ -398,12 +403,11 @@ def analyze_integration_target_dependencies(integration_targets):
         for meta_path in meta_paths:
             if os.path.exists(meta_path):
-                with open(meta_path, 'rb') as meta_fd:
-                    # try and decode the file as a utf-8 string, skip if it contains invalid chars (binary file)
-                    try:
-                        meta_lines = to_text(meta_fd.read()).splitlines()
-                    except UnicodeDecodeError:
-                        continue
+                # try and decode the file as a utf-8 string, skip if it contains invalid chars (binary file)
+                try:
+                    meta_lines = read_text_file(meta_path).splitlines()
+                except UnicodeDecodeError:
+                    continue

                 for meta_line in meta_lines:
                     if re.search(r'^ *#.*$', meta_line):

@@ -3,7 +3,6 @@ from __future__ import (absolute_import, division, print_function)
 __metaclass__ = type

 import datetime
-import os
 import re

 from . import types as t

@@ -2,20 +2,29 @@
 from __future__ import (absolute_import, division, print_function)
 __metaclass__ = type

+TYPE_CHECKING = False
+
 try:
     from typing import (
         Any,
         AnyStr,
+        BinaryIO,
         Callable,
         Dict,
         FrozenSet,
+        Generator,
+        IO,
         Iterable,
+        Iterator,
         List,
         Optional,
+        Pattern,
         Set,
         Text,
+        TextIO,
         Tuple,
         Type,
+        TYPE_CHECKING,
         TypeVar,
         Union,
     )
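
The module-level TYPE_CHECKING = False provides a safe fallback on interpreters without the typing module; when typing is importable, typing.TYPE_CHECKING (true only under static type checkers) takes its place. A guard then works either way (the guarded import is a hypothetical example):

    from ansible_test._internal import types as t

    if t.TYPE_CHECKING:  # never true at runtime, so the import costs nothing
        from ansible_test._internal.config import EnvironmentConfig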

@@ -45,6 +45,17 @@ except ImportError:

 from . import types as t

+from .encoding import (
+    to_bytes,
+    to_optional_bytes,
+    to_optional_text,
+)
+
+from .io import (
+    open_binary_file,
+    read_text_file,
+)
+
 try:
     C = t.TypeVar('C')
 except AttributeError:
@@ -95,10 +106,6 @@ MODE_FILE_WRITE = MODE_FILE | stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
 MODE_DIRECTORY = MODE_READ | stat.S_IWUSR | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
 MODE_DIRECTORY_WRITE = MODE_DIRECTORY | stat.S_IWGRP | stat.S_IWOTH

-ENCODING = 'utf-8'
-
-Text = type(u'')
-
 REMOTE_ONLY_PYTHON_VERSIONS = (
     '2.6',
 )
@@ -113,38 +120,6 @@ SUPPORTED_PYTHON_VERSIONS = (
 )


-def to_optional_bytes(value, errors='strict'):  # type: (t.Optional[t.AnyStr], str) -> t.Optional[bytes]
-    """Return the given value as bytes encoded using UTF-8 if not already bytes, or None if the value is None."""
-    return None if value is None else to_bytes(value, errors)
-
-
-def to_optional_text(value, errors='strict'):  # type: (t.Optional[t.AnyStr], str) -> t.Optional[t.Text]
-    """Return the given value as text decoded using UTF-8 if not already text, or None if the value is None."""
-    return None if value is None else to_text(value, errors)
-
-
-def to_bytes(value, errors='strict'):  # type: (t.AnyStr, str) -> bytes
-    """Return the given value as bytes encoded using UTF-8 if not already bytes."""
-    if isinstance(value, bytes):
-        return value
-
-    if isinstance(value, Text):
-        return value.encode(ENCODING, errors)
-
-    raise Exception('value is not bytes or text: %s' % type(value))
-
-
-def to_text(value, errors='strict'):  # type: (t.AnyStr, str) -> t.Text
-    """Return the given value as text decoded using UTF-8 if not already text."""
-    if isinstance(value, bytes):
-        return value.decode(ENCODING, errors)
-
-    if isinstance(value, Text):
-        return value
-
-    raise Exception('value is not bytes or text: %s' % type(value))
-
-
 def get_docker_completion():
     """
     :rtype: dict[str, dict[str, str]]
@@ -213,8 +188,7 @@ def read_lines_without_comments(path, remove_blank_lines=False, optional=False):
     if optional and not os.path.exists(path):
         return []

-    with open(path, 'r') as path_fd:
-        lines = path_fd.read().splitlines()
+    lines = read_text_file(path).splitlines()

     lines = [re.sub(r' *#.*$', '', line) for line in lines]
@@ -521,17 +495,6 @@ def remove_tree(path):
             raise


-def make_dirs(path):
-    """
-    :type path: str
-    """
-    try:
-        os.makedirs(to_bytes(path))
-    except OSError as ex:
-        if ex.errno != errno.EEXIST:
-            raise
-
-
 def is_binary_file(path):
     """
     :type path: str
@@ -587,7 +550,8 @@ def is_binary_file(path):
     if ext in assume_binary:
         return True

-    with open(path, 'rb') as path_fd:
+    with open_binary_file(path) as path_fd:
+        # noinspection PyTypeChecker
         return b'\0' in path_fd.read(1024)
@@ -658,11 +622,15 @@ class Display:
         for warning in self.warnings:
             self.__warning(warning)

-    def warning(self, message, unique=False):
+    def warning(self, message, unique=False, verbosity=0):
         """
         :type message: str
         :type unique: bool
+        :type verbosity: int
         """
+        if verbosity > self.verbosity:
+            return
+
         if unique:
             if message in self.warnings_unique:
                 return
@@ -839,11 +807,11 @@ def get_subclasses(class_type):  # type: (t.Type[C]) -> t.Set[t.Type[C]]

 def is_subdir(candidate_path, path):  # type: (str, str) -> bool
     """Returns true if candidate_path is path or a subdirectory of path."""
-    if not path.endswith(os.sep):
-        path += os.sep
+    if not path.endswith(os.path.sep):
+        path += os.path.sep

-    if not candidate_path.endswith(os.sep):
-        candidate_path += os.sep
+    if not candidate_path.endswith(os.path.sep):
+        candidate_path += os.path.sep

     return candidate_path.startswith(path)
@@ -874,10 +842,10 @@ def import_plugins(directory, root=None):  # type: (str, t.Optional[str]) -> None
     path = os.path.join(root, directory)
     package = __name__.rsplit('.', 1)[0]
-    prefix = '%s.%s.' % (package, directory.replace(os.sep, '.'))
+    prefix = '%s.%s.' % (package, directory.replace(os.path.sep, '.'))

     for (_module_loader, name, _ispkg) in pkgutil.iter_modules([path], prefix=prefix):
-        module_path = os.path.join(root, name[len(package) + 1:].replace('.', os.sep) + '.py')
+        module_path = os.path.join(root, name[len(package) + 1:].replace('.', os.path.sep) + '.py')
         load_module(module_path, name)
@@ -912,7 +880,8 @@ def load_module(path, name):  # type: (str, str) -> None
     # noinspection PyDeprecation
     import imp

-    with open(path, 'r') as module_file:
+    # load_source (and thus load_module) require a file opened with `open` in text mode
+    with open(to_bytes(path)) as module_file:
         # noinspection PyDeprecation
         imp.load_module(name, module_file, path, ('.py', 'r', imp.PY_SOURCE))
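
Because the verbosity gate sits ahead of the unique/record logic, a gated warning at low verbosity is dropped outright: it is neither printed nor retained for the end-of-run warning summary:

    # with the default display.verbosity of 0:
    display.warning('always shown')               # printed and recorded in the summary
    display.warning('debug detail', verbosity=2)  # dropped unless running with -vv or more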

@@ -4,7 +4,6 @@ __metaclass__ = type

 import atexit
 import contextlib
-import json
 import os
 import shutil
 import sys
@@ -13,23 +12,29 @@ import textwrap

 from . import types as t

+from .encoding import (
+    to_bytes,
+)
+
 from .util import (
     common_environment,
     COVERAGE_CONFIG_NAME,
     display,
     find_python,
-    is_shippable,
     remove_tree,
     MODE_DIRECTORY,
     MODE_FILE_EXECUTE,
     PYTHON_PATHS,
     raw_command,
-    to_bytes,
     ANSIBLE_TEST_DATA_ROOT,
-    make_dirs,
     ApplicationError,
 )

+from .io import (
+    write_text_file,
+    write_json_file,
+)
+
 from .data import (
     data_context,
 )
@@ -138,10 +143,10 @@ def named_temporary_file(args, prefix, suffix, directory, content):
             yield tempfile_fd.name


-def write_json_test_results(category, name, content):  # type: (ResultType, str, t.Union[t.List[t.Any], t.Dict[str, t.Any]]) -> None
+def write_json_test_results(category, name, content, formatted=True):  # type: (ResultType, str, t.Union[t.List[t.Any], t.Dict[str, t.Any]], bool) -> None
     """Write the given json content to the specified test results path, creating directories as needed."""
     path = os.path.join(category.path, name)
-    write_json_file(path, content, create_directories=True)
+    write_json_file(path, content, create_directories=True, formatted=formatted)


 def write_text_test_results(category, name, content):  # type: (ResultType, str, str) -> None
@@ -150,21 +155,6 @@ def write_text_test_results(category, name, content):  # type: (ResultType, str, str) -> None
     write_text_file(path, content, create_directories=True)


-def write_json_file(path, content, create_directories=False):  # type: (str, t.Union[t.List[t.Any], t.Dict[str, t.Any]], bool) -> None
-    """Write the given json content to the specified path, optionally creating missing directories."""
-    text_content = json.dumps(content, sort_keys=True, indent=4) + '\n'
-
-    write_text_file(path, text_content, create_directories=create_directories)
-
-
-def write_text_file(path, content, create_directories=False):  # type: (str, str, bool) -> None
-    """Write the given text content to the specified path, optionally creating missing directories."""
-    if create_directories:
-        make_dirs(os.path.dirname(path))
-
-    with open(to_bytes(path), 'wb') as file:
-        file.write(to_bytes(content))
-
-
 def get_python_path(args, interpreter):
     """
     :type args: TestConfig
