@ -1,12 +1,15 @@
""" Miscellaneous utility functions and classes. """
from __future__ import annotations
import abc
import errno
import fcntl
import importlib . util
import inspect
import json
import keyword
import os
import platform
import pkgutil
import random
import re
@ -41,6 +44,7 @@ from .io import (
from . thread import (
mutex ,
WrappedThread ,
)
from . constants import (
@ -96,6 +100,18 @@ MODE_DIRECTORY = MODE_READ | stat.S_IWUSR | stat.S_IXUSR | stat.S_IXGRP | stat.S
MODE_DIRECTORY_WRITE = MODE_DIRECTORY | stat . S_IWGRP | stat . S_IWOTH
class Architecture :
"""
Normalized architecture names .
These are the architectures supported by ansible - test , such as when provisioning remote instances .
"""
X86_64 = ' x86_64 '
AARCH64 = ' aarch64 '
REMOTE_ARCHITECTURES = list ( value for key , value in Architecture . __dict__ . items ( ) if not key . startswith ( ' __ ' ) )
def is_valid_identifier ( value : str ) - > bool :
""" Return True if the given value is a valid non-keyword Python identifier, otherwise return False. """
return value . isidentifier ( ) and not keyword . iskeyword ( value )
@ -119,6 +135,58 @@ def cache(func): # type: (t.Callable[[], TValue]) -> t.Callable[[], TValue]
return wrapper
@mutex
def detect_architecture ( python : str ) - > t . Optional [ str ] :
""" Detect the architecture of the specified Python and return a normalized version, or None if it cannot be determined. """
results : t . Dict [ str , t . Optional [ str ] ]
try :
results = detect_architecture . results # type: ignore[attr-defined]
except AttributeError :
results = detect_architecture . results = { } # type: ignore[attr-defined]
if python in results :
return results [ python ]
if python == sys . executable or os . path . realpath ( python ) == os . path . realpath ( sys . executable ) :
uname = platform . uname ( )
else :
data = raw_command ( [ python , ' -c ' , ' import json, platform; print(json.dumps(platform.uname())); ' ] , capture = True ) [ 0 ]
uname = json . loads ( data )
translation = {
' x86_64 ' : Architecture . X86_64 , # Linux, macOS
' amd64 ' : Architecture . X86_64 , # FreeBSD
' aarch64 ' : Architecture . AARCH64 , # Linux, FreeBSD
' arm64 ' : Architecture . AARCH64 , # FreeBSD
}
candidates = [ ]
if len ( uname ) > = 5 :
candidates . append ( uname [ 4 ] )
if len ( uname ) > = 6 :
candidates . append ( uname [ 5 ] )
candidates = sorted ( set ( candidates ) )
architectures = sorted ( set ( arch for arch in [ translation . get ( candidate ) for candidate in candidates ] if arch ) )
architecture : t . Optional [ str ] = None
if not architectures :
display . warning ( f ' Unable to determine architecture for Python interpreter " { python } " from: { candidates } ' )
elif len ( architectures ) == 1 :
architecture = architectures [ 0 ]
display . info ( f ' Detected architecture { architecture } for Python interpreter: { python } ' , verbosity = 1 )
else :
display . warning ( f ' Conflicting architectures detected ( { architectures } ) for Python interpreter " { python } " from: { candidates } ' )
results [ python ] = architecture
return architecture
def filter_args ( args , filters ) : # type: (t.List[str], t.Dict[str, int]) -> t.List[str]
""" Return a filtered version of the given command line arguments. """
remaining = 0
@ -254,18 +322,44 @@ def get_available_python_versions(): # type: () -> t.Dict[str, str]
def raw_command (
cmd , # type: t.Iterable[str]
capture = False , # type: bool
capture , # type: bool
env = None , # type: t.Optional[t.Dict[str, str]]
data = None , # type: t.Optional[str]
cwd = None , # type: t.Optional[str]
explain = False , # type: bool
stdin = None , # type: t.Optional[t.Union[t.IO[bytes], int]]
stdout = None , # type: t.Optional[t.Union[t.IO[bytes], int]]
interactive = False , # type: bool
force_stdout = False , # type: bool
cmd_verbosity = 1 , # type: int
str_errors = ' strict ' , # type: str
error_callback = None , # type: t.Optional[t.Callable[[SubprocessError], None]]
) : # type: (...) -> t.Tuple[t.Optional[str], t.Optional[str]]
""" Run the specified command and return stdout and stderr as a tuple. """
if capture and interactive :
raise InternalError ( ' Cannot combine capture=True with interactive=True. ' )
if data and interactive :
raise InternalError ( ' Cannot combine data with interactive=True. ' )
if stdin and interactive :
raise InternalError ( ' Cannot combine stdin with interactive=True. ' )
if stdout and interactive :
raise InternalError ( ' Cannot combine stdout with interactive=True. ' )
if stdin and data :
raise InternalError ( ' Cannot combine stdin with data. ' )
if stdout and not capture :
raise InternalError ( ' Redirection of stdout requires capture=True to avoid redirection of stderr to stdout. ' )
if force_stdout and capture :
raise InternalError ( ' Cannot combine force_stdout=True with capture=True. ' )
if force_stdout and interactive :
raise InternalError ( ' Cannot combine force_stdout=True with interactive=True. ' )
if not cwd :
cwd = os . getcwd ( )
@ -276,7 +370,30 @@ def raw_command(
escaped_cmd = ' ' . join ( shlex . quote ( c ) for c in cmd )
display . info ( ' Run command: %s ' % escaped_cmd , verbosity = cmd_verbosity , truncate = True )
if capture :
description = ' Run '
elif interactive :
description = ' Interactive '
else :
description = ' Stream '
description + = ' command '
with_types = [ ]
if data :
with_types . append ( ' data ' )
if stdin :
with_types . append ( ' stdin ' )
if stdout :
with_types . append ( ' stdout ' )
if with_types :
description + = f ' with { " / " . join ( with_types ) } '
display . info ( f ' { description } : { escaped_cmd } ' , verbosity = cmd_verbosity , truncate = True )
display . info ( ' Working directory: %s ' % cwd , verbosity = 2 )
program = find_executable ( cmd [ 0 ] , cwd = cwd , path = env [ ' PATH ' ] , required = ' warning ' )
@ -294,17 +411,23 @@ def raw_command(
if stdin is not None :
data = None
communicate = True
elif data is not None :
stdin = subprocess . PIPE
communicate = True
if stdout :
communicate = True
if capture :
elif interactive :
pass # allow the subprocess access to our stdin
else :
stdin = subprocess . DEVNULL
if not interactive :
# When not running interactively, send subprocess stdout/stderr through a pipe.
# This isolates the stdout/stderr of the subprocess from the current process, and also hides the current TTY from it, if any.
# This prevents subprocesses from sharing stdout/stderr with the current process or each other.
# Doing so allows subprocesses to safely make changes to their file handles, such as making them non-blocking (ssh does this).
# This also maintains consistency between local testing and CI systems, which typically do not provide a TTY.
# To maintain output ordering, a single pipe is used for both stdout/stderr when not capturing output.
stdout = stdout or subprocess . PIPE
stderr = subprocess . PIPE
stderr = subprocess . PIPE if capture else subprocess . STDOUT
communicate = True
else :
stderr = None
@ -324,7 +447,8 @@ def raw_command(
if communicate :
data_bytes = to_optional_bytes ( data )
stdout_bytes , stderr_bytes = process . communicate ( data_bytes )
stdout_bytes , stderr_bytes = communicate_with_process ( process , data_bytes , stdout == subprocess . PIPE , stderr == subprocess . PIPE , capture = capture ,
force_stdout = force_stdout )
stdout_text = to_optional_text ( stdout_bytes , str_errors ) or u ' '
stderr_text = to_optional_text ( stderr_bytes , str_errors ) or u ' '
else :
@ -347,6 +471,122 @@ def raw_command(
raise SubprocessError ( cmd , status , stdout_text , stderr_text , runtime , error_callback )
def communicate_with_process (
process : subprocess . Popen ,
stdin : t . Optional [ bytes ] ,
stdout : bool ,
stderr : bool ,
capture : bool ,
force_stdout : bool
) - > t . Tuple [ bytes , bytes ] :
""" Communicate with the specified process, handling stdin/stdout/stderr as requested. """
threads : t . List [ WrappedThread ] = [ ]
reader : t . Type [ ReaderThread ]
if capture :
reader = CaptureThread
else :
reader = OutputThread
if stdin is not None :
threads . append ( WriterThread ( process . stdin , stdin ) )
if stdout :
stdout_reader = reader ( process . stdout , force_stdout )
threads . append ( stdout_reader )
else :
stdout_reader = None
if stderr :
stderr_reader = reader ( process . stderr , force_stdout )
threads . append ( stderr_reader )
else :
stderr_reader = None
for thread in threads :
thread . start ( )
for thread in threads :
try :
thread . wait_for_result ( )
except Exception as ex : # pylint: disable=broad-except
display . error ( str ( ex ) )
if isinstance ( stdout_reader , ReaderThread ) :
stdout_bytes = b ' ' . join ( stdout_reader . lines )
else :
stdout_bytes = b ' '
if isinstance ( stderr_reader , ReaderThread ) :
stderr_bytes = b ' ' . join ( stderr_reader . lines )
else :
stderr_bytes = b ' '
process . wait ( )
return stdout_bytes , stderr_bytes
class WriterThread ( WrappedThread ) :
""" Thread to write data to stdin of a subprocess. """
def __init__ ( self , handle : t . IO [ bytes ] , data : bytes ) - > None :
super ( ) . __init__ ( self . _run )
self . handle = handle
self . data = data
def _run ( self ) - > None :
""" Workload to run on a thread. """
try :
self . handle . write ( self . data )
self . handle . flush ( )
finally :
self . handle . close ( )
class ReaderThread ( WrappedThread , metaclass = abc . ABCMeta ) :
""" Thread to read stdout from a subprocess. """
def __init__ ( self , handle : t . IO [ bytes ] , force_stdout : bool ) - > None :
super ( ) . __init__ ( self . _run )
self . handle = handle
self . force_stdout = force_stdout
self . lines = [ ] # type: t.List[bytes]
@abc.abstractmethod
def _run ( self ) - > None :
""" Workload to run on a thread. """
class CaptureThread ( ReaderThread ) :
""" Thread to capture stdout from a subprocess into a buffer. """
def _run ( self ) - > None :
""" Workload to run on a thread. """
src = self . handle
dst = self . lines
try :
for line in src :
dst . append ( line )
finally :
src . close ( )
class OutputThread ( ReaderThread ) :
""" Thread to pass stdout from a subprocess to stdout. """
def _run ( self ) - > None :
""" Workload to run on a thread. """
src = self . handle
dst = sys . stdout . buffer if self . force_stdout else display . fd . buffer
try :
for line in src :
dst . write ( line )
dst . flush ( )
finally :
src . close ( )
def common_environment ( ) :
""" Common environment used for executing all programs. """
env = dict (
@ -516,7 +756,7 @@ class Display:
self . color = sys . stdout . isatty ( )
self . warnings = [ ]
self . warnings_unique = set ( )
self . info_stderr = False
self . fd = sys . stderr # default to stderr until config is initialized to avoid early messages going to stdout
self . rows = 0
self . columns = 0
self . truncate = 0
@ -528,7 +768,7 @@ class Display:
def __warning ( self , message ) : # type: (str) -> None
""" Internal implementation for displaying a warning message. """
self . print_message ( ' WARNING: %s ' % message , color = self . purple , fd = sys . stderr )
self . print_message ( ' WARNING: %s ' % message , color = self . purple )
def review_warnings ( self ) : # type: () -> None
""" Review all warnings which previously occurred. """
@ -556,23 +796,27 @@ class Display:
def notice ( self , message ) : # type: (str) -> None
""" Display a notice level message. """
self . print_message ( ' NOTICE: %s ' % message , color = self . purple , fd = sys . stderr )
self . print_message ( ' NOTICE: %s ' % message , color = self . purple )
def error ( self , message ) : # type: (str) -> None
""" Display an error level message. """
self . print_message ( ' ERROR: %s ' % message , color = self . red , fd = sys . stderr )
self . print_message ( ' ERROR: %s ' % message , color = self . red )
def fatal ( self , message ) : # type: (str) -> None
""" Display a fatal level message. """
self . print_message ( ' FATAL: %s ' % message , color = self . red , stderr = True )
def info ( self , message , verbosity = 0 , truncate = False ) : # type: (str, int, bool) -> None
""" Display an info level message. """
if self . verbosity > = verbosity :
color = self . verbosity_colors . get ( verbosity , self . yellow )
self . print_message ( message , color = color , fd= sys . stderr if self . info_stderr else sys . stdout , truncate= truncate )
self . print_message ( message , color = color , truncate= truncate )
def print_message ( # pylint: disable=locally-disabled, invalid-name
self ,
message , # type: str
color = None , # type: t.Optional[str]
fd= sys . stdout , # type: t.IO[str]
stderr= False , # type: bool
truncate = False , # type: bool
) : # type: (...) -> None
""" Display a message. """
@ -592,10 +836,18 @@ class Display:
message = message . replace ( self . clear , color )
message = ' %s %s %s ' % ( color , message , self . clear )
fd = sys . stderr if stderr else self . fd
print ( message , file = fd )
fd . flush ( )
class InternalError ( Exception ) :
""" An unhandled internal error indicating a bug in the code. """
def __init__ ( self , message : str ) - > None :
super ( ) . __init__ ( f ' An internal error has occurred in ansible-test: { message } ' )
class ApplicationError ( Exception ) :
""" General application error. """
@ -648,12 +900,15 @@ class MissingEnvironmentVariable(ApplicationError):
self . name = name
def retry ( func , ex_type = SubprocessError , sleep = 10 , attempts = 10 ):
def retry ( func , ex_type = SubprocessError , sleep = 10 , attempts = 10 , warn = True ):
""" Retry the specified function on failure. """
for dummy in range ( 1 , attempts ) :
try :
return func ( )
except ex_type :
except ex_type as ex :
if warn :
display . warning ( str ( ex ) )
time . sleep ( sleep )
return func ( )