ansible-test - Fix subprocess management. (#77641)

* Run code-smell sanity tests in UTF-8 Mode.
* Update subprocess use in sanity test programs.
* Use raw_command instead of run_command with always=True set.
* Add more capture=True usage.
* Don't expose stdin to subprocesses.
* Capture more output. Warn on retry.
* Add more captures.
* Capture coverage CLI output.
* Capture Windows and network host checks.
* Be explicit about interactive usage.
* Use a shell for non-captured, non-interactive subprocesses.
* Add integration test to assert no TTY.
* Add unit test to assert no TTY.
* Require blocking stdin/stdout/stderr.
* Use subprocess.run in ansible-core sanity tests.
* Remove unused arg.
* Be explicit with subprocess.run check=False.
* Add changelog.
* Use a Python subprocess instead of a shell.
* Use InternalError instead of Exception.
* Require capture argument.
* Check for invalid raw_command arguments.
* Remove pointless communicate=True usage.
* Relocate the stdout-without-capture check.
* Use threads instead of a subprocess for IO.
Matt Clay, 2 years ago, committed by GitHub (commit 5c2d830dea, parent f6fb402d1f)

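The diff below repeatedly applies one subprocess pattern: no inherited stdin, captured output, and explicit return-code handling. A minimal generic sketch of that pattern (the command shown is only an illustration, not taken from the diff):

import subprocess

# Sketch only: any external command works here; the point is the keyword arguments.
process = subprocess.run(
    ['git', 'status', '--porcelain'],
    stdin=subprocess.DEVNULL,    # the subprocess never touches ansible-test's stdin
    capture_output=True,         # keep its stdout/stderr away from our own handles
    text=True,
    check=False,                 # inspect the return code instead of raising
)
if process.returncode != 0:
    raise RuntimeError(process.stderr)
print(process.stdout, end='')
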
@@ -0,0 +1,10 @@
bugfixes:
- ansible-test - Subprocesses are now isolated from the stdin, stdout and stderr of ansible-test.
This avoids issues with subprocesses tampering with the file descriptors, such as SSH making them non-blocking.
As a result of this change, subprocess output from unit and integration tests that was written to stderr now goes to stdout.
- ansible-test - Subprocesses no longer have access to the TTY ansible-test is connected to, if any.
This maintains consistent behavior between local testing and CI systems, which typically do not provide a TTY.
Tests which require a TTY should use pexpect or another mechanism to create a PTY.
minor_changes:
- ansible-test - Blocking mode is now enforced for stdin, stdout and stderr.
If any of these are non-blocking then ansible-test will exit during startup with an error.

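For tests that genuinely need a TTY, the changelog points at pexpect, which allocates its own PTY for the child process. A minimal sketch, assuming pexpect is installed (pexpect is not part of this commit):

import sys

import pexpect

# The child sees the PTY created by pexpect, even though ansible-test itself
# no longer exposes a TTY to its subprocesses.
child = pexpect.spawn(sys.executable, ['-c', 'import sys; print(sys.stdin.isatty())'], encoding='utf-8')
child.expect('True')
child.expect(pexpect.EOF)
child.close()
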
@@ -0,0 +1,2 @@
context/controller
shippable/posix/group1

@@ -0,0 +1,7 @@
#!/usr/bin/env python
import sys
assert not sys.stdin.isatty()
assert not sys.stdout.isatty()
assert not sys.stderr.isatty()

@@ -0,0 +1,5 @@
#!/usr/bin/env bash
set -eux
./runme.py

@@ -22,11 +22,11 @@ from .util import (
ANSIBLE_SOURCE_ROOT,
ANSIBLE_TEST_TOOLS_ROOT,
get_ansible_version,
raw_command,
)
from .util_common import (
create_temp_dir,
run_command,
ResultType,
intercept_python,
get_injector_path,
@@ -258,12 +258,12 @@ class CollectionDetailError(ApplicationError):
self.reason = reason
def get_collection_detail(args, python): # type: (EnvironmentConfig, PythonConfig) -> CollectionDetail
def get_collection_detail(python): # type: (PythonConfig) -> CollectionDetail
"""Return collection detail."""
collection = data_context().content.collection
directory = os.path.join(collection.root, collection.directory)
stdout = run_command(args, [python.path, os.path.join(ANSIBLE_TEST_TOOLS_ROOT, 'collection_detail.py'), directory], capture=True, always=True)[0]
stdout = raw_command([python.path, os.path.join(ANSIBLE_TEST_TOOLS_ROOT, 'collection_detail.py'), directory], capture=True)[0]
result = json.loads(stdout)
error = result.get('error')
@@ -282,15 +282,15 @@ def run_playbook(
args, # type: EnvironmentConfig
inventory_path, # type: str
playbook, # type: str
run_playbook_vars=None, # type: t.Optional[t.Dict[str, t.Any]]
capture=False, # type: bool
capture, # type: bool
variables=None, # type: t.Optional[t.Dict[str, t.Any]]
): # type: (...) -> None
"""Run the specified playbook using the given inventory file and playbook variables."""
playbook_path = os.path.join(ANSIBLE_TEST_DATA_ROOT, 'playbooks', playbook)
cmd = ['ansible-playbook', '-i', inventory_path, playbook_path]
if run_playbook_vars:
cmd.extend(['-e', json.dumps(run_playbook_vars)])
if variables:
cmd.extend(['-e', json.dumps(variables)])
if args.verbosity:
cmd.append('-%s' % ('v' * args.verbosity))

@@ -100,7 +100,16 @@ def run_coverage(args, host_state, output_file, command, cmd): # type: (Coverag
cmd = ['python', '-m', 'coverage.__main__', command, '--rcfile', COVERAGE_CONFIG_PATH] + cmd
intercept_python(args, host_state.controller_profile.python, cmd, env)
stdout, stderr = intercept_python(args, host_state.controller_profile.python, cmd, env, capture=True)
stdout = (stdout or '').strip()
stderr = (stderr or '').strip()
if stdout:
display.info(stdout)
if stderr:
display.warning(stderr)
def get_all_coverage_files(): # type: () -> t.List[str]

@@ -18,11 +18,11 @@ from ...util import (
ANSIBLE_TEST_TOOLS_ROOT,
display,
ApplicationError,
raw_command,
)
from ...util_common import (
ResultType,
run_command,
write_json_file,
write_json_test_results,
)
@@ -196,7 +196,7 @@ def _command_coverage_combine_powershell(args): # type: (CoverageCombineConfig)
cmd = ['pwsh', os.path.join(ANSIBLE_TEST_TOOLS_ROOT, 'coverage_stub.ps1')]
cmd.extend(source_paths)
stubs = json.loads(run_command(args, cmd, capture=True, always=True)[0])
stubs = json.loads(raw_command(cmd, capture=True)[0])
return dict((d['Path'], dict((line, 0) for line in d['Lines'])) for d in stubs)

@@ -619,7 +619,7 @@ def command_integration_script(
cmd += ['-e', '@%s' % config_path]
env.update(coverage_manager.get_environment(target.name, target.aliases))
cover_python(args, host_state.controller_profile.python, cmd, target.name, env, cwd=cwd)
cover_python(args, host_state.controller_profile.python, cmd, target.name, env, cwd=cwd, capture=False)
def command_integration_role(
@@ -738,7 +738,7 @@ def command_integration_role(
env['ANSIBLE_ROLES_PATH'] = test_env.targets_dir
env.update(coverage_manager.get_environment(target.name, target.aliases))
cover_python(args, host_state.controller_profile.python, cmd, target.name, env, cwd=cwd)
cover_python(args, host_state.controller_profile.python, cmd, target.name, env, cwd=cwd, capture=False)
def run_setup_targets(

@@ -106,7 +106,7 @@ class CsCloudProvider(CloudProvider):
# apply work-around for OverlayFS issue
# https://github.com/docker/for-linux/issues/72#issuecomment-319904698
docker_exec(self.args, self.DOCKER_SIMULATOR_NAME, ['find', '/var/lib/mysql', '-type', 'f', '-exec', 'touch', '{}', ';'])
docker_exec(self.args, self.DOCKER_SIMULATOR_NAME, ['find', '/var/lib/mysql', '-type', 'f', '-exec', 'touch', '{}', ';'], capture=True)
if self.args.explain:
values = dict(

@@ -118,7 +118,7 @@ class CoverageHandler(t.Generic[THostConfig], metaclass=abc.ABCMeta):
def run_playbook(self, playbook, variables): # type: (str, t.Dict[str, str]) -> None
"""Run the specified playbook using the current inventory."""
self.create_inventory()
run_playbook(self.args, self.inventory_path, playbook, variables)
run_playbook(self.args, self.inventory_path, playbook, capture=False, variables=variables)
class PosixCoverageHandler(CoverageHandler[PosixConfig]):

@@ -952,6 +952,7 @@ class SanityCodeSmellTest(SanitySingleVersion):
cmd = [python.path, self.path]
env = ansible_environment(args, color=False)
env.update(PYTHONUTF8='1') # force all code-smell sanity tests to run with Python UTF-8 Mode enabled
pattern = None
data = None

@@ -141,7 +141,7 @@ class PylintTest(SanitySingleVersion):
if data_context().content.collection:
try:
collection_detail = get_collection_detail(args, python)
collection_detail = get_collection_detail(python)
if not collection_detail.version:
display.warning('Skipping pylint collection version checks since no collection version was found.')

@@ -121,7 +121,7 @@ class ValidateModulesTest(SanitySingleVersion):
cmd.extend(['--collection', data_context().content.collection.directory])
try:
collection_detail = get_collection_detail(args, python)
collection_detail = get_collection_detail(python)
if collection_detail.version:
cmd.extend(['--collection-version', collection_detail.version])

@@ -2,6 +2,7 @@
from __future__ import annotations
import os
import sys
import typing as t
from ...util import (
@@ -44,6 +45,9 @@ def command_shell(args): # type: (ShellConfig) -> None
if args.raw and isinstance(args.targets[0], ControllerConfig):
raise ApplicationError('The --raw option has no effect on the controller.')
if not sys.stdin.isatty():
raise ApplicationError('Standard input must be a TTY to launch a shell.')
host_state = prepare_profiles(args, skip_setup=args.raw) # shell
if args.delegate:
@@ -87,4 +91,4 @@ def command_shell(args): # type: (ShellConfig) -> None
else:
cmd = []
con.run(cmd)
con.run(cmd, capture=False, interactive=True)

@@ -275,7 +275,7 @@ def command_units(args): # type: (UnitsConfig) -> None
display.info('Unit test %s with Python %s' % (test_context, python.version))
try:
cover_python(args, python, cmd, test_context, env)
cover_python(args, python, cmd, test_context, env, capture=False)
except SubprocessError as ex:
# pytest exits with status code 5 when all tests are skipped, which isn't an error for our use case
if ex.status != 5:

@@ -238,6 +238,7 @@ class ShellConfig(EnvironmentConfig):
super().__init__(args, 'shell')
self.raw = args.raw # type: bool
self.interactive = True
class SanityConfig(TestConfig):

@@ -3,7 +3,6 @@ from __future__ import annotations
import abc
import shlex
import sys
import tempfile
import typing as t
@@ -46,7 +45,8 @@ class Connection(metaclass=abc.ABCMeta):
@abc.abstractmethod
def run(self,
command, # type: t.List[str]
capture=False, # type: bool
capture, # type: bool
interactive=False, # type: bool
data=None, # type: t.Optional[str]
stdin=None, # type: t.Optional[t.IO[bytes]]
stdout=None, # type: t.Optional[t.IO[bytes]]
@@ -60,7 +60,7 @@ class Connection(metaclass=abc.ABCMeta):
"""Extract the given archive file stream in the specified directory."""
tar_cmd = ['tar', 'oxzf', '-', '-C', chdir]
retry(lambda: self.run(tar_cmd, stdin=src))
retry(lambda: self.run(tar_cmd, stdin=src, capture=True))
def create_archive(self,
chdir, # type: str
@@ -82,7 +82,7 @@ class Connection(metaclass=abc.ABCMeta):
sh_cmd = ['sh', '-c', ' | '.join(' '.join(shlex.quote(cmd) for cmd in command) for command in commands)]
retry(lambda: self.run(sh_cmd, stdout=dst))
retry(lambda: self.run(sh_cmd, stdout=dst, capture=True))
class LocalConnection(Connection):
@@ -92,7 +92,8 @@ class LocalConnection(Connection):
def run(self,
command, # type: t.List[str]
capture=False, # type: bool
capture, # type: bool
interactive=False, # type: bool
data=None, # type: t.Optional[str]
stdin=None, # type: t.Optional[t.IO[bytes]]
stdout=None, # type: t.Optional[t.IO[bytes]]
@@ -105,6 +106,7 @@ class LocalConnection(Connection):
data=data,
stdin=stdin,
stdout=stdout,
interactive=interactive,
)
@@ -130,7 +132,8 @@ class SshConnection(Connection):
def run(self,
command, # type: t.List[str]
capture=False, # type: bool
capture, # type: bool
interactive=False, # type: bool
data=None, # type: t.Optional[str]
stdin=None, # type: t.Optional[t.IO[bytes]]
stdout=None, # type: t.Optional[t.IO[bytes]]
@@ -143,7 +146,7 @@ class SshConnection(Connection):
options.append('-q')
if not data and not stdin and not stdout and sys.stdin.isatty():
if interactive:
options.append('-tt')
with tempfile.NamedTemporaryFile(prefix='ansible-test-ssh-debug-', suffix='.log') as ssh_logfile:
@@ -166,6 +169,7 @@ class SshConnection(Connection):
data=data,
stdin=stdin,
stdout=stdout,
interactive=interactive,
error_callback=error_callback,
)
@@ -208,7 +212,8 @@ class DockerConnection(Connection):
def run(self,
command, # type: t.List[str]
capture=False, # type: bool
capture, # type: bool
interactive=False, # type: bool
data=None, # type: t.Optional[str]
stdin=None, # type: t.Optional[t.IO[bytes]]
stdout=None, # type: t.Optional[t.IO[bytes]]
@@ -219,7 +224,7 @@ class DockerConnection(Connection):
if self.user:
options.extend(['--user', self.user])
if not data and not stdin and not stdout and sys.stdin.isatty():
if interactive:
options.append('-it')
return docker_exec(
@@ -231,6 +236,7 @@ class DockerConnection(Connection):
data=data,
stdin=stdin,
stdout=stdout,
interactive=interactive,
)
def inspect(self): # type: () -> DockerInspect

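Of note in the connection classes above: requesting a PTY ('-tt' for ssh, '-it' for docker exec) is no longer inferred from sys.stdin.isatty(); callers must opt in with interactive=True. A simplified, hypothetical helper illustrating the docker exec flag policy (not code from this commit):

def docker_exec_options(interactive, needs_stdin):  # hypothetical helper, for illustration only
    """Return the docker exec options implied by the explicit PTY policy."""
    options = []
    if interactive:
        options.append('-it')  # allocate a PTY and keep stdin open (e.g. `ansible-test shell`)
    elif needs_stdin:
        options.append('-i')   # keep stdin open for piped data, but no PTY
    return options

assert docker_exec_options(interactive=True, needs_stdin=False) == ['-it']
assert docker_exec_options(interactive=False, needs_stdin=True) == ['-i']
assert docker_exec_options(interactive=False, needs_stdin=False) == []
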
@@ -794,7 +794,7 @@ def forward_ssh_ports(
inventory = generate_ssh_inventory(ssh_connections)
with named_temporary_file(args, 'ssh-inventory-', '.json', None, inventory) as inventory_path: # type: str
run_playbook(args, inventory_path, playbook, dict(hosts_entries=hosts_entries))
run_playbook(args, inventory_path, playbook, capture=False, variables=dict(hosts_entries=hosts_entries))
ssh_processes = [] # type: t.List[SshProcess]
@@ -827,7 +827,7 @@ def cleanup_ssh_ports(
inventory = generate_ssh_inventory(ssh_connections)
with named_temporary_file(args, 'ssh-inventory-', '.json', None, inventory) as inventory_path: # type: str
run_playbook(args, inventory_path, playbook, dict(hosts_entries=hosts_entries))
run_playbook(args, inventory_path, playbook, capture=False, variables=dict(hosts_entries=hosts_entries))
if ssh_processes:
for process in ssh_processes:

@@ -469,7 +469,7 @@ class SshKey:
make_dirs(os.path.dirname(key))
if not os.path.isfile(key) or not os.path.isfile(pub):
run_command(args, ['ssh-keygen', '-m', 'PEM', '-q', '-t', self.KEY_TYPE, '-N', '', '-f', key])
run_command(args, ['ssh-keygen', '-m', 'PEM', '-q', '-t', self.KEY_TYPE, '-N', '', '-f', key], capture=True)
if args.explain:
return key, pub

@@ -141,7 +141,7 @@ def cover_python(
cmd, # type: t.List[str]
target_name, # type: str
env, # type: t.Dict[str, str]
capture=False, # type: bool
capture, # type: bool
data=None, # type: t.Optional[str]
cwd=None, # type: t.Optional[str]
): # type: (...) -> t.Tuple[t.Optional[str], t.Optional[str]]

@@ -160,12 +160,13 @@ def delegate_command(args, host_state, exclude, require): # type: (EnvironmentC
os.path.join(content_root, ResultType.COVERAGE.relative_path),
]
con.run(['mkdir', '-p'] + writable_dirs)
con.run(['chmod', '777'] + writable_dirs)
con.run(['chmod', '755', working_directory])
con.run(['chmod', '644', os.path.join(content_root, args.metadata_path)])
con.run(['useradd', pytest_user, '--create-home'])
con.run(insert_options(command, options + ['--requirements-mode', 'only']))
con.run(['mkdir', '-p'] + writable_dirs, capture=True)
con.run(['chmod', '777'] + writable_dirs, capture=True)
con.run(['chmod', '755', working_directory], capture=True)
con.run(['chmod', '644', os.path.join(content_root, args.metadata_path)], capture=True)
con.run(['useradd', pytest_user, '--create-home'], capture=True)
con.run(insert_options(command, options + ['--requirements-mode', 'only']), capture=False)
container = con.inspect()
networks = container.get_network_names()
@@ -191,7 +192,7 @@ def delegate_command(args, host_state, exclude, require): # type: (EnvironmentC
success = False
try:
con.run(insert_options(command, options))
con.run(insert_options(command, options), capture=False, interactive=args.interactive)
success = True
finally:
if host_delegation:

@@ -268,7 +268,7 @@ def docker_pull(args, image): # type: (EnvironmentConfig, str) -> None
for _iteration in range(1, 10):
try:
docker_command(args, ['pull', image])
docker_command(args, ['pull', image], capture=False)
return
except SubprocessError:
display.warning('Failed to pull docker image "%s". Waiting a few seconds before trying again.' % image)
@@ -279,7 +279,7 @@ def docker_pull(args, image): # type: (EnvironmentConfig, str) -> None
def docker_cp_to(args, container_id, src, dst): # type: (EnvironmentConfig, str, str, str) -> None
"""Copy a file to the specified container."""
docker_command(args, ['cp', src, '%s:%s' % (container_id, dst)])
docker_command(args, ['cp', src, '%s:%s' % (container_id, dst)], capture=True)
def docker_run(
@@ -510,10 +510,11 @@ def docker_exec(
args, # type: EnvironmentConfig
container_id, # type: str
cmd, # type: t.List[str]
capture, # type: bool
options=None, # type: t.Optional[t.List[str]]
capture=False, # type: bool
stdin=None, # type: t.Optional[t.IO[bytes]]
stdout=None, # type: t.Optional[t.IO[bytes]]
interactive=False, # type: bool
data=None, # type: t.Optional[str]
): # type: (...) -> t.Tuple[t.Optional[str], t.Optional[str]]
"""Execute the given command in the specified container."""
@@ -523,7 +524,7 @@ def docker_exec(
if data or stdin or stdout:
options.append('-i')
return docker_command(args, ['exec'] + options + [container_id] + cmd, capture=capture, stdin=stdin, stdout=stdout, data=data)
return docker_command(args, ['exec'] + options + [container_id] + cmd, capture=capture, stdin=stdin, stdout=stdout, interactive=interactive, data=data)
def docker_info(args): # type: (CommonConfig) -> t.Dict[str, t.Any]
@@ -541,9 +542,10 @@ def docker_version(args): # type: (CommonConfig) -> t.Dict[str, t.Any]
def docker_command(
args, # type: CommonConfig
cmd, # type: t.List[str]
capture=False, # type: bool
capture, # type: bool
stdin=None, # type: t.Optional[t.IO[bytes]]
stdout=None, # type: t.Optional[t.IO[bytes]]
interactive=False, # type: bool
always=False, # type: bool
data=None, # type: t.Optional[str]
): # type: (...) -> t.Tuple[t.Optional[str], t.Optional[str]]
@@ -552,7 +554,7 @@ def docker_command(
command = [require_docker().command]
if command[0] == 'podman' and _get_podman_remote():
command.append('--remote')
return run_command(args, command + cmd, env=env, capture=capture, stdin=stdin, stdout=stdout, always=always, data=data)
return run_command(args, command + cmd, env=env, capture=capture, stdin=stdin, stdout=stdout, interactive=interactive, always=always, data=data)
def docker_environment(): # type: () -> t.Dict[str, str]

@@ -362,7 +362,7 @@ class DockerProfile(ControllerHostProfile[DockerConfig], SshTargetHostProfile[Do
setup_sh = bootstrapper.get_script()
shell = setup_sh.splitlines()[0][2:]
docker_exec(self.args, self.container_name, [shell], data=setup_sh)
docker_exec(self.args, self.container_name, [shell], data=setup_sh, capture=False)
def deprovision(self): # type: () -> None
"""Deprovision the host after delegation has completed."""
@@ -484,8 +484,9 @@ class NetworkRemoteProfile(RemoteProfile[NetworkRemoteConfig]):
for dummy in range(1, 90):
try:
intercept_python(self.args, self.args.controller_python, cmd, env)
except SubprocessError:
intercept_python(self.args, self.args.controller_python, cmd, env, capture=True)
except SubprocessError as ex:
display.warning(str(ex))
time.sleep(10)
else:
return
@@ -547,7 +548,7 @@ class PosixRemoteProfile(ControllerHostProfile[PosixRemoteConfig], RemoteProfile
shell = setup_sh.splitlines()[0][2:]
ssh = self.get_origin_controller_connection()
ssh.run([shell], data=setup_sh)
ssh.run([shell], data=setup_sh, capture=False)
def get_ssh_connection(self): # type: () -> SshConnection
"""Return an SSH connection for accessing the host."""
@@ -717,8 +718,9 @@ class WindowsRemoteProfile(RemoteProfile[WindowsRemoteConfig]):
for dummy in range(1, 120):
try:
intercept_python(self.args, self.args.controller_python, cmd, env)
except SubprocessError:
intercept_python(self.args, self.args.controller_python, cmd, env, capture=True)
except SubprocessError as ex:
display.warning(str(ex))
time.sleep(10)
else:
return

@@ -126,7 +126,8 @@ def configure_target_pypi_proxy(args, profile, pypi_endpoint, pypi_hostname): #
force = 'yes' if profile.config.is_managed else 'no'
run_playbook(args, inventory_path, 'pypi_proxy_prepare.yml', dict(pypi_endpoint=pypi_endpoint, pypi_hostname=pypi_hostname, force=force), capture=True)
run_playbook(args, inventory_path, 'pypi_proxy_prepare.yml', capture=True, variables=dict(
pypi_endpoint=pypi_endpoint, pypi_hostname=pypi_hostname, force=force))
atexit.register(cleanup_pypi_proxy)

@@ -261,7 +261,7 @@ def run_pip(
if not args.explain:
try:
connection.run([python.path], data=script)
connection.run([python.path], data=script, capture=False)
except SubprocessError:
script = prepare_pip_script([PipVersion()])

@@ -1,6 +1,7 @@
"""Miscellaneous utility functions and classes."""
from __future__ import annotations
import abc
import errno
import fcntl
import importlib.util
@@ -41,6 +42,7 @@ from .io import (
from .thread import (
mutex,
WrappedThread,
)
from .constants import (
@@ -254,18 +256,37 @@ def get_available_python_versions(): # type: () -> t.Dict[str, str]
def raw_command(
cmd, # type: t.Iterable[str]
capture=False, # type: bool
capture, # type: bool
env=None, # type: t.Optional[t.Dict[str, str]]
data=None, # type: t.Optional[str]
cwd=None, # type: t.Optional[str]
explain=False, # type: bool
stdin=None, # type: t.Optional[t.Union[t.IO[bytes], int]]
stdout=None, # type: t.Optional[t.Union[t.IO[bytes], int]]
interactive=False, # type: bool
cmd_verbosity=1, # type: int
str_errors='strict', # type: str
error_callback=None, # type: t.Optional[t.Callable[[SubprocessError], None]]
): # type: (...) -> t.Tuple[t.Optional[str], t.Optional[str]]
"""Run the specified command and return stdout and stderr as a tuple."""
if capture and interactive:
raise InternalError('Cannot combine capture=True with interactive=True.')
if data and interactive:
raise InternalError('Cannot combine data with interactive=True.')
if stdin and interactive:
raise InternalError('Cannot combine stdin with interactive=True.')
if stdout and interactive:
raise InternalError('Cannot combine stdout with interactive=True.')
if stdin and data:
raise InternalError('Cannot combine stdin with data.')
if stdout and not capture:
raise InternalError('Redirection of stdout requires capture=True to avoid redirection of stderr to stdout.')
if not cwd:
cwd = os.getcwd()
@@ -276,7 +297,30 @@ def raw_command(
escaped_cmd = ' '.join(shlex.quote(c) for c in cmd)
display.info('Run command: %s' % escaped_cmd, verbosity=cmd_verbosity, truncate=True)
if capture:
description = 'Run'
elif interactive:
description = 'Interactive'
else:
description = 'Stream'
description += ' command'
with_types = []
if data:
with_types.append('data')
if stdin:
with_types.append('stdin')
if stdout:
with_types.append('stdout')
if with_types:
description += f' with {"/".join(with_types)}'
display.info(f'{description}: {escaped_cmd}', verbosity=cmd_verbosity, truncate=True)
display.info('Working directory: %s' % cwd, verbosity=2)
program = find_executable(cmd[0], cwd=cwd, path=env['PATH'], required='warning')
@@ -294,17 +338,23 @@ def raw_command(
if stdin is not None:
data = None
communicate = True
elif data is not None:
stdin = subprocess.PIPE
communicate = True
if stdout:
communicate = True
if capture:
elif interactive:
pass # allow the subprocess access to our stdin
else:
stdin = subprocess.DEVNULL
if not interactive:
# When not running interactively, send subprocess stdout/stderr through a pipe.
# This isolates the stdout/stderr of the subprocess from the current process, and also hides the current TTY from it, if any.
# This prevents subprocesses from sharing stdout/stderr with the current process or each other.
# Doing so allows subprocesses to safely make changes to their file handles, such as making them non-blocking (ssh does this).
# This also maintains consistency between local testing and CI systems, which typically do not provide a TTY.
# To maintain output ordering, a single pipe is used for both stdout/stderr when not capturing output.
stdout = stdout or subprocess.PIPE
stderr = subprocess.PIPE
stderr = subprocess.PIPE if capture else subprocess.STDOUT
communicate = True
else:
stderr = None
@@ -324,7 +374,7 @@ def raw_command(
if communicate:
data_bytes = to_optional_bytes(data)
stdout_bytes, stderr_bytes = process.communicate(data_bytes)
stdout_bytes, stderr_bytes = communicate_with_process(process, data_bytes, stdout == subprocess.PIPE, stderr == subprocess.PIPE, capture=capture)
stdout_text = to_optional_text(stdout_bytes, str_errors) or u''
stderr_text = to_optional_text(stderr_bytes, str_errors) or u''
else:
@@ -347,6 +397,114 @@ def raw_command(
raise SubprocessError(cmd, status, stdout_text, stderr_text, runtime, error_callback)
def communicate_with_process(process: subprocess.Popen, stdin: t.Optional[bytes], stdout: bool, stderr: bool, capture: bool) -> t.Tuple[bytes, bytes]:
"""Communicate with the specified process, handling stdin/stdout/stderr as requested."""
threads: t.List[WrappedThread] = []
reader: t.Type[ReaderThread]
if capture:
reader = CaptureThread
else:
reader = OutputThread
if stdin is not None:
threads.append(WriterThread(process.stdin, stdin))
if stdout:
stdout_reader = reader(process.stdout)
threads.append(stdout_reader)
else:
stdout_reader = None
if stderr:
stderr_reader = reader(process.stderr)
threads.append(stderr_reader)
else:
stderr_reader = None
for thread in threads:
thread.start()
for thread in threads:
try:
thread.wait_for_result()
except Exception as ex: # pylint: disable=broad-except
display.error(str(ex))
if isinstance(stdout_reader, ReaderThread):
stdout_bytes = b''.join(stdout_reader.lines)
else:
stdout_bytes = b''
if isinstance(stderr_reader, ReaderThread):
stderr_bytes = b''.join(stderr_reader.lines)
else:
stderr_bytes = b''
process.wait()
return stdout_bytes, stderr_bytes
class WriterThread(WrappedThread):
"""Thread to write data to stdin of a subprocess."""
def __init__(self, handle: t.IO[bytes], data: bytes) -> None:
super().__init__(self._run)
self.handle = handle
self.data = data
def _run(self) -> None:
"""Workload to run on a thread."""
try:
self.handle.write(self.data)
self.handle.flush()
finally:
self.handle.close()
class ReaderThread(WrappedThread, metaclass=abc.ABCMeta):
"""Thread to read stdout from a subprocess."""
def __init__(self, handle: t.IO[bytes]) -> None:
super().__init__(self._run)
self.handle = handle
self.lines = [] # type: t.List[bytes]
@abc.abstractmethod
def _run(self) -> None:
"""Workload to run on a thread."""
class CaptureThread(ReaderThread):
"""Thread to capture stdout from a subprocess into a buffer."""
def _run(self) -> None:
"""Workload to run on a thread."""
src = self.handle
dst = self.lines
try:
for line in src:
dst.append(line)
finally:
src.close()
class OutputThread(ReaderThread):
"""Thread to pass stdout from a subprocess to stdout."""
def _run(self) -> None:
"""Workload to run on a thread."""
src = self.handle
dst = sys.stdout.buffer
try:
for line in src:
dst.write(line)
dst.flush()
finally:
src.close()
def common_environment():
"""Common environment used for executing all programs."""
env = dict(
@@ -654,12 +812,15 @@ class MissingEnvironmentVariable(ApplicationError):
self.name = name
def retry(func, ex_type=SubprocessError, sleep=10, attempts=10):
def retry(func, ex_type=SubprocessError, sleep=10, attempts=10, warn=True):
"""Retry the specified function on failure."""
for dummy in range(1, attempts):
try:
return func()
except ex_type:
except ex_type as ex:
if warn:
display.warning(str(ex))
time.sleep(sleep)
return func()
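
The new communicate_with_process drains each pipe on a dedicated thread, which avoids the deadlock that sequential reads can hit once a pipe buffer fills; when output is streamed rather than captured, the diff instead merges stderr into the stdout pipe (stderr=subprocess.STDOUT) so a single reader preserves ordering. A standalone sketch of the thread-per-pipe idea, using plain threading rather than ansible-test's WrappedThread:

import subprocess
import threading

# The child writes to both streams; reading them one after another from a single
# thread can deadlock once either pipe buffer fills, so each pipe gets a reader thread.
proc = subprocess.Popen(
    ['sh', '-c', 'echo out; echo err >&2'],
    stdin=subprocess.DEVNULL,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)

captured = {}

def drain(name, handle):
    captured[name] = handle.read()  # blocks until the child closes its end
    handle.close()

threads = [threading.Thread(target=drain, args=(name, handle))
           for name, handle in (('stdout', proc.stdout), ('stderr', proc.stderr))]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
proc.wait()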

@@ -126,6 +126,7 @@ class CommonConfig:
"""Configuration common to all commands."""
def __init__(self, args, command): # type: (t.Any, str) -> None
self.command = command
self.interactive = False
self.success = None # type: t.Optional[bool]
self.color = args.color # type: bool
@@ -369,7 +370,7 @@ def intercept_python(
python, # type: PythonConfig
cmd, # type: t.List[str]
env, # type: t.Dict[str, str]
capture=False, # type: bool
capture, # type: bool
data=None, # type: t.Optional[str]
cwd=None, # type: t.Optional[str]
always=False, # type: bool
@@ -399,20 +400,21 @@ def run_command(
def run_command(
args, # type: CommonConfig
cmd, # type: t.Iterable[str]
capture=False, # type: bool
capture, # type: bool
env=None, # type: t.Optional[t.Dict[str, str]]
data=None, # type: t.Optional[str]
cwd=None, # type: t.Optional[str]
always=False, # type: bool
stdin=None, # type: t.Optional[t.IO[bytes]]
stdout=None, # type: t.Optional[t.IO[bytes]]
interactive=False, # type: bool
cmd_verbosity=1, # type: int
str_errors='strict', # type: str
error_callback=None, # type: t.Optional[t.Callable[[SubprocessError], None]]
): # type: (...) -> t.Tuple[t.Optional[str], t.Optional[str]]
"""Run the specified command and return stdout and stderr as a tuple."""
explain = args.explain and not always
return raw_command(cmd, capture=capture, env=env, data=data, cwd=cwd, explain=explain, stdin=stdin, stdout=stdout,
return raw_command(cmd, capture=capture, env=env, data=data, cwd=cwd, explain=explain, stdin=stdin, stdout=stdout, interactive=interactive,
cmd_verbosity=cmd_verbosity, str_errors=str_errors, error_callback=error_callback)

@@ -20,6 +20,7 @@ from .util import (
remove_tree,
ApplicationError,
str_to_version,
raw_command,
)
from .util_common import (
@@ -92,7 +93,7 @@ def create_virtual_environment(args, # type: EnvironmentConfig
# creating a virtual environment using 'venv' when running in a virtual environment created by 'virtualenv' results
# in a copy of the original virtual environment instead of creation of a new one
# avoid this issue by only using "real" python interpreters to invoke 'venv'
for real_python in iterate_real_pythons(args, python.version):
for real_python in iterate_real_pythons(python.version):
if run_venv(args, real_python, system_site_packages, pip, path):
display.info('Created Python %s virtual environment using "venv": %s' % (python.version, path), verbosity=1)
return True
@@ -128,7 +129,7 @@ def create_virtual_environment(args, # type: EnvironmentConfig
return False
def iterate_real_pythons(args, version): # type: (EnvironmentConfig, str) -> t.Iterable[str]
def iterate_real_pythons(version): # type: (str) -> t.Iterable[str]
"""
Iterate through available real python interpreters of the requested version.
The current interpreter will be checked and then the path will be searched.
@@ -138,7 +139,7 @@ def iterate_real_pythons(args, version): # type: (EnvironmentConfig, str) -> t.
if version_info == sys.version_info[:len(version_info)]:
current_python = sys.executable
real_prefix = get_python_real_prefix(args, current_python)
real_prefix = get_python_real_prefix(current_python)
if real_prefix:
current_python = find_python(version, os.path.join(real_prefix, 'bin'))
@@ -159,7 +160,7 @@ def iterate_real_pythons(args, version): # type: (EnvironmentConfig, str) -> t.
if found_python == current_python:
return
real_prefix = get_python_real_prefix(args, found_python)
real_prefix = get_python_real_prefix(found_python)
if real_prefix:
found_python = find_python(version, os.path.join(real_prefix, 'bin'))
@@ -168,12 +169,12 @@ def iterate_real_pythons(args, version): # type: (EnvironmentConfig, str) -> t.
yield found_python
def get_python_real_prefix(args, python_path): # type: (EnvironmentConfig, str) -> t.Optional[str]
def get_python_real_prefix(python_path): # type: (str) -> t.Optional[str]
"""
Return the real prefix of the specified interpreter or None if the interpreter is not a virtual environment created by 'virtualenv'.
"""
cmd = [python_path, os.path.join(os.path.join(ANSIBLE_TEST_TARGET_TOOLS_ROOT, 'virtualenvcheck.py'))]
check_result = json.loads(run_command(args, cmd, capture=True, always=True)[0])
check_result = json.loads(raw_command(cmd, capture=True)[0])
real_prefix = check_result['real_prefix']
return real_prefix

@@ -47,7 +47,11 @@ def main():
env = os.environ.copy()
env.update(PYTHONPATH='%s:%s' % (os.path.join(os.path.dirname(__file__), 'changelog'), env['PYTHONPATH']))
subprocess.call(cmd, env=env) # ignore the return code, rely on the output instead
# ignore the return code, rely on the output instead
process = subprocess.run(cmd, stdin=subprocess.DEVNULL, capture_output=True, text=True, env=env, check=False)
sys.stdout.write(process.stdout)
sys.stderr.write(process.stderr)
if __name__ == '__main__':

@@ -437,14 +437,13 @@ class ModuleValidator(Validator):
base_path = self._get_base_branch_module_path()
command = ['git', 'show', '%s:%s' % (self.base_branch, base_path or self.path)]
p = subprocess.Popen(command, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
p = subprocess.run(command, stdin=subprocess.DEVNULL, capture_output=True, check=False)
if int(p.returncode) != 0:
return None
t = tempfile.NamedTemporaryFile(delete=False)
t.write(stdout)
t.write(p.stdout)
t.close()
return t.name
@@ -2457,11 +2456,12 @@ class GitCache:
@staticmethod
def _git(args):
cmd = ['git'] + args
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
p = subprocess.run(cmd, stdin=subprocess.DEVNULL, capture_output=True, text=True, check=False)
if p.returncode != 0:
raise GitError(stderr, p.returncode)
return stdout.decode('utf-8').splitlines()
raise GitError(p.stderr, p.returncode)
return p.stdout.splitlines()
class GitError(Exception):

@@ -122,14 +122,12 @@ def get_ps_argument_spec(filename, collection):
})
script_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'ps_argspec.ps1')
proc = subprocess.Popen(['pwsh', script_path, util_manifest], stdout=subprocess.PIPE, stderr=subprocess.PIPE,
shell=False)
stdout, stderr = proc.communicate()
proc = subprocess.run(['pwsh', script_path, util_manifest], stdin=subprocess.DEVNULL, capture_output=True, text=True, check=False)
if proc.returncode != 0:
raise AnsibleModuleImportError("STDOUT:\n%s\nSTDERR:\n%s" % (stdout.decode('utf-8'), stderr.decode('utf-8')))
raise AnsibleModuleImportError("STDOUT:\n%s\nSTDERR:\n%s" % (proc.stdout, proc.stderr))
kwargs = json.loads(stdout)
kwargs = json.loads(proc.stdout)
# the validate-modules code expects the options spec to be under the argument_spec key not options as set in PS
kwargs['argument_spec'] = kwargs.pop('options', {})

@@ -27,6 +27,9 @@ def main(args=None):
raise SystemExit('This version of ansible-test cannot be executed with Python version %s. Supported Python versions are: %s' % (
version_to_str(sys.version_info[:3]), ', '.join(CONTROLLER_PYTHON_VERSIONS)))
if any(not os.get_blocking(handle.fileno()) for handle in (sys.stdin, sys.stdout, sys.stderr)):
raise SystemExit('Standard input, output and error file handles must be blocking to run ansible-test.')
# noinspection PyProtectedMember
from ansible_test._internal import main as cli_main
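
The startup check above rejects non-blocking standard handles rather than silently repairing them. A wrapper that wants to recover from a stray non-blocking handle (for example one left behind by an earlier ssh process) could restore blocking mode before invoking ansible-test; a minimal sketch, not part of this commit:

import os
import sys

# Restore blocking mode on the standard handles before handing off to ansible-test.
for handle in (sys.stdin, sys.stdout, sys.stderr):
    if not os.get_blocking(handle.fileno()):
        os.set_blocking(handle.fileno(), True)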

@@ -29,13 +29,12 @@ def main():
try:
cmd = ['make', 'core_singlehtmldocs']
sphinx = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=docs_dir)
stdout, stderr = sphinx.communicate()
sphinx = subprocess.run(cmd, stdin=subprocess.DEVNULL, capture_output=True, cwd=docs_dir, check=False, text=True)
finally:
shutil.move(tmp, requirements_txt)
stdout = stdout.decode('utf-8')
stderr = stderr.decode('utf-8')
stdout = sphinx.stdout
stderr = sphinx.stderr
if sphinx.returncode != 0:
sys.stderr.write("Command '%s' failed with status code: %d\n" % (' '.join(cmd), sphinx.returncode))

@@ -172,14 +172,15 @@ def clean_repository(file_list):
def create_sdist(tmp_dir):
"""Create an sdist in the repository"""
create = subprocess.Popen(
create = subprocess.run(
['make', 'snapshot', 'SDIST_DIR=%s' % tmp_dir],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
stdin=subprocess.DEVNULL,
capture_output=True,
text=True,
check=False,
)
stderr = create.communicate()[1]
stderr = create.stderr
if create.returncode != 0:
raise Exception('make snapshot failed:\n%s' % stderr)
@@ -220,15 +221,16 @@ def extract_sdist(sdist_path, tmp_dir):
def install_sdist(tmp_dir, sdist_dir):
"""Install the extracted sdist into the temporary directory"""
install = subprocess.Popen(
install = subprocess.run(
['python', 'setup.py', 'install', '--root=%s' % tmp_dir],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
stdin=subprocess.DEVNULL,
capture_output=True,
text=True,
cwd=os.path.join(tmp_dir, sdist_dir),
check=False,
)
stdout, stderr = install.communicate()
stdout, stderr = install.stdout, install.stderr
if install.returncode != 0:
raise Exception('sdist install failed:\n%s' % stderr)

@@ -0,0 +1,7 @@
import sys
def test_no_tty():
assert not sys.stdin.isatty()
assert not sys.stdout.isatty()
assert not sys.stderr.isatty()