@ -15,8 +15,7 @@
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from __future__ import ( absolute_import , division , print_function )
__metaclass__ = type
from __future__ import annotations
try :
import curses
@ -26,6 +25,7 @@ else:
# this will be set to False if curses.setupterm() fails
HAS_CURSES = True
import collections . abc as c
import codecs
import ctypes . util
import fcntl
@ -55,6 +55,10 @@ from ansible.utils.multiprocessing import context as multiprocessing_context
from ansible . utils . singleton import Singleton
from ansible . utils . unsafe_proxy import wrap_var
if t . TYPE_CHECKING :
# avoid circular import at runtime
from ansible . executor . task_queue_manager import FinalQueue
_LIBC = ctypes . cdll . LoadLibrary ( ctypes . util . find_library ( ' c ' ) )
# Set argtypes, to avoid segfault if the wrong type is provided,
# restype is assumed to be c_int
@ -67,7 +71,7 @@ MOVE_TO_BOL = b'\r'
CLEAR_TO_EOL = b ' \x1b [K '
def get_text_width ( text ):
def get_text_width ( text : str ) - > int :
""" Function that utilizes ``wcswidth`` or ``wcwidth`` to determine the
number of columns used to display a text string .
@ -192,7 +196,7 @@ b_COW_PATHS = (
)
def _synchronize_textiowrapper ( tio , l ock) :
def _synchronize_textiowrapper ( tio : t . TextIO , l ock: threading . RL ock) :
# Ensure that a background thread can't hold the internal buffer lock on a file object
# during a fork, which causes forked children to hang. We're using display's existing lock for
# convenience (and entering the lock before a fork).
@ -207,11 +211,11 @@ def _synchronize_textiowrapper(tio, lock):
buffer = tio . buffer
# monkeypatching the underlying file-like object isn't great, but likely safer than subclassing
buffer . write = _wrap_with_lock ( buffer . write , lock )
buffer . flush = _wrap_with_lock ( buffer . flush , lock )
buffer . write = _wrap_with_lock ( buffer . write , lock ) # type: ignore[method-assign]
buffer . flush = _wrap_with_lock ( buffer . flush , lock ) # type: ignore[method-assign]
def setraw ( fd , when = termios . TCSAFLUSH ) :
def setraw ( fd : int , when : int = termios . TCSAFLUSH ) - > None :
""" Put terminal into a raw mode.
Copied from ` ` tty ` ` from CPython 3.11 .0 , and modified to not remove OPOST from OFLAG
@ -232,13 +236,12 @@ def setraw(fd, when=termios.TCSAFLUSH):
termios . tcsetattr ( fd , when , mode )
def clear_line ( stdout ):
def clear_line ( stdout : t . BinaryIO ) - > None :
stdout . write ( b ' \x1b [ %s ' % MOVE_TO_BOL )
stdout . write ( b ' \x1b [ %s ' % CLEAR_TO_EOL )
def setup_prompt ( stdin_fd , stdout_fd , seconds , echo ) :
# type: (int, int, int, bool) -> None
def setup_prompt ( stdin_fd : int , stdout_fd : int , seconds : int , echo : bool ) - > None :
setraw ( stdin_fd )
# Only set stdout to raw mode if it is a TTY. This is needed when redirecting
@ -252,7 +255,7 @@ def setup_prompt(stdin_fd, stdout_fd, seconds, echo):
termios . tcsetattr ( stdin_fd , termios . TCSANOW , new_settings )
def setupterm ( ) :
def setupterm ( ) - > None :
# Nest the try except since curses.error is not available if curses did not import
try :
curses . setupterm ( )
@ -269,9 +272,9 @@ def setupterm():
class Display ( metaclass = Singleton ) :
def __init__ ( self , verbosity = 0 ) :
def __init__ ( self , verbosity : int = 0 ) - > None :
self . _final_q = None
self . _final_q : FinalQueue | None = None
# NB: this lock is used to both prevent intermingled output between threads and to block writes during forks.
# Do not change the type of this lock or upgrade to a shared lock (eg multiprocessing.RLock).
@ -281,11 +284,11 @@ class Display(metaclass=Singleton):
self . verbosity = verbosity
# list of all deprecation messages to prevent duplicate display
self . _deprecations = { }
self . _warns = { }
self . _errors = { }
self . _deprecations : dict [ str , int ] = { }
self . _warns : dict [ str , int ] = { }
self . _errors : dict [ str , int ] = { }
self . b_cowsay = None
self . b_cowsay : bytes | None = None
self . noncow = C . ANSIBLE_COW_SELECTION
self . set_cowsay_info ( )
@ -296,12 +299,12 @@ class Display(metaclass=Singleton):
( out , err ) = cmd . communicate ( )
if cmd . returncode :
raise Exception
self . cows_available = { to_text ( c ) for c in out . split ( ) } # set comprehension
self . cows_available : set [ str ] = { to_text ( c ) for c in out . split ( ) }
if C . ANSIBLE_COW_ACCEPTLIST and any ( C . ANSIBLE_COW_ACCEPTLIST ) :
self . cows_available = set ( C . ANSIBLE_COW_ACCEPTLIST ) . intersection ( self . cows_available )
except Exception :
# could not execute cowsay for some reason
self . b_cowsay = Fals e
self . b_cowsay = Non e
self . _set_column_width ( )
@ -321,7 +324,7 @@ class Display(metaclass=Singleton):
self . setup_curses = False
def _replacing_warning_handler ( self , exception ):
def _replacing_warning_handler ( self , exception : UnicodeError ) - > tuple [ str | bytes , int ] :
# TODO: This should probably be deferred until after the current display is completed
# this will require some amount of new functionality
self . deprecated (
@ -330,7 +333,7 @@ class Display(metaclass=Singleton):
)
return ' ? ' , exception . end
def set_queue ( self , queue ):
def set_queue ( self , queue : FinalQueue ) - > None :
""" Set the _final_q on Display, so that we know to proxy display over the queue
instead of directly writing to stdout / stderr from forks
@ -340,7 +343,7 @@ class Display(metaclass=Singleton):
raise RuntimeError ( ' queue cannot be set in parent process ' )
self . _final_q = queue
def set_cowsay_info ( self ) :
def set_cowsay_info ( self ) - > None :
if C . ANSIBLE_NOCOWS :
return
@ -352,7 +355,15 @@ class Display(metaclass=Singleton):
self . b_cowsay = b_cow_path
@proxy_display
def display ( self , msg , color = None , stderr = False , screen_only = False , log_only = False , newline = True ) :
def display (
self ,
msg : str ,
color : str | None = None ,
stderr : bool = False ,
screen_only : bool = False ,
log_only : bool = False ,
newline : bool = True ,
) - > None :
""" Display a message to the user
Note : msg * must * be a unicode string to prevent UnicodeError tracebacks .
@ -414,32 +425,32 @@ class Display(metaclass=Singleton):
# actually log
logger . log ( lvl , msg2 )
def v ( self , msg , host = None ) :
def v ( self , msg : str , host : str | None = None ) - > None :
return self . verbose ( msg , host = host , caplevel = 0 )
def vv ( self , msg , host = None ) :
def vv ( self , msg : str , host : str | None = None ) - > None :
return self . verbose ( msg , host = host , caplevel = 1 )
def vvv ( self , msg , host = None ) :
def vvv ( self , msg : str , host : str | None = None ) - > None :
return self . verbose ( msg , host = host , caplevel = 2 )
def vvvv ( self , msg , host = None ) :
def vvvv ( self , msg : str , host : str | None = None ) - > None :
return self . verbose ( msg , host = host , caplevel = 3 )
def vvvvv ( self , msg , host = None ) :
def vvvvv ( self , msg : str , host : str | None = None ) - > None :
return self . verbose ( msg , host = host , caplevel = 4 )
def vvvvvv ( self , msg , host = None ) :
def vvvvvv ( self , msg : str , host : str | None = None ) - > None :
return self . verbose ( msg , host = host , caplevel = 5 )
def debug ( self , msg , host = None ) :
def debug ( self , msg : str , host : str | None = None ) - > None :
if C . DEFAULT_DEBUG :
if host is None :
self . display ( " %6d %0.5f : %s " % ( os . getpid ( ) , time . time ( ) , msg ) , color = C . COLOR_DEBUG )
else :
self . display ( " %6d %0.5f [ %s ]: %s " % ( os . getpid ( ) , time . time ( ) , host , msg ) , color = C . COLOR_DEBUG )
def verbose ( self , msg , host = None , caplevel = 2 ) :
def verbose ( self , msg : str , host : str | None = None , caplevel : int = 2 ) - > None :
to_stderr = C . VERBOSE_TO_STDERR
if self . verbosity > caplevel :
@ -448,7 +459,14 @@ class Display(metaclass=Singleton):
else :
self . display ( " < %s > %s " % ( host , msg ) , color = C . COLOR_VERBOSE , stderr = to_stderr )
def get_deprecation_message ( self , msg , version = None , removed = False , date = None , collection_name = None ) :
def get_deprecation_message (
self ,
msg : str ,
version : str | None = None ,
removed : bool = False ,
date : str | None = None ,
collection_name : str | None = None ,
) - > str :
''' used to print out a deprecation message. '''
msg = msg . strip ( )
if msg and msg [ - 1 ] not in [ ' ! ' , ' ? ' , ' . ' ] :
@ -484,7 +502,14 @@ class Display(metaclass=Singleton):
return message_text
@proxy_display
def deprecated ( self , msg , version = None , removed = False , date = None , collection_name = None ) :
def deprecated (
self ,
msg : str ,
version : str | None = None ,
removed : bool = False ,
date : str | None = None ,
collection_name : str | None = None ,
) - > None :
if not removed and not C . DEPRECATION_WARNINGS :
return
@ -501,7 +526,7 @@ class Display(metaclass=Singleton):
self . _deprecations [ message_text ] = 1
@proxy_display
def warning ( self , msg , formatted = False ) :
def warning ( self , msg : str , formatted : bool = False ) - > None :
if not formatted :
new_msg = " [WARNING]: %s " % msg
@ -514,11 +539,11 @@ class Display(metaclass=Singleton):
self . display ( new_msg , color = C . COLOR_WARN , stderr = True )
self . _warns [ new_msg ] = 1
def system_warning ( self , msg ):
def system_warning ( self , msg : str ) - > None :
if C . SYSTEM_WARNINGS :
self . warning ( msg )
def banner ( self , msg , color = None , cows = True ) :
def banner ( self , msg : str , color : str | None = None , cows : bool = True ) - > None :
'''
Prints a header - looking line with cowsay or stars with length depending on terminal width ( 3 minimum )
'''
@ -541,7 +566,7 @@ class Display(metaclass=Singleton):
stars = u " * " * star_len
self . display ( u " \n %s %s " % ( msg , stars ) , color = color )
def banner_cowsay ( self , msg , color = None ) :
def banner_cowsay ( self , msg : str , color : str | None = None ) - > None :
if u " : [ " in msg :
msg = msg . replace ( u " [ " , u " " )
if msg . endswith ( u " ] " ) :
@ -558,7 +583,7 @@ class Display(metaclass=Singleton):
( out , err ) = cmd . communicate ( )
self . display ( u " %s \n " % to_text ( out ) , color = color )
def error ( self , msg , wrap_text = True ) :
def error ( self , msg : str , wrap_text : bool = True ) - > None :
if wrap_text :
new_msg = u " \n [ERROR]: %s " % msg
wrapped = textwrap . wrap ( new_msg , self . columns )
@ -570,14 +595,24 @@ class Display(metaclass=Singleton):
self . _errors [ new_msg ] = 1
@staticmethod
def prompt ( msg , private = False ) :
def prompt ( msg : str , private : bool = False ) - > str :
if private :
return getpass . getpass ( msg )
else :
return input ( msg )
def do_var_prompt ( self , varname , private = True , prompt = None , encrypt = None , confirm = False , salt_size = None , salt = None , default = None , unsafe = None ) :
def do_var_prompt (
self ,
varname : str ,
private : bool = True ,
prompt : str | None = None ,
encrypt : str | None = None ,
confirm : bool = False ,
salt_size : int | None = None ,
salt : str | None = None ,
default : str | None = None ,
unsafe : bool = False ,
) - > str :
result = None
if sys . __stdin__ . isatty ( ) :
@ -619,14 +654,21 @@ class Display(metaclass=Singleton):
result = wrap_var ( result )
return result
def _set_column_width ( self ) :
def _set_column_width ( self ) - > None :
if os . isatty ( 1 ) :
tty_size = unpack ( ' HHHH ' , fcntl . ioctl ( 1 , termios . TIOCGWINSZ , pack ( ' HHHH ' , 0 , 0 , 0 , 0 ) ) ) [ 1 ]
else :
tty_size = 0
self . columns = max ( 79 , tty_size - 1 )
def prompt_until ( self , msg , private = False , seconds = None , interrupt_input = None , complete_input = None ) :
def prompt_until (
self ,
msg : str ,
private : bool = False ,
seconds : int | None = None ,
interrupt_input : c . Container [ bytes ] | None = None ,
complete_input : c . Container [ bytes ] | None = None ,
) - > bytes :
if self . _final_q :
from ansible . executor . process . worker import current_worker
self . _final_q . send_prompt (
@ -678,12 +720,11 @@ class Display(metaclass=Singleton):
def _read_non_blocking_stdin (
self ,
echo = False , # type: bool
seconds = None , # type: int
interrupt_input = None , # type: t.Iterable[bytes]
complete_input = None , # type: t.Iterable[bytes]
) : # type: (...) -> bytes
echo : bool = False ,
seconds : int | None = None ,
interrupt_input : c . Container [ bytes ] | None = None ,
complete_input : c . Container [ bytes ] | None = None ,
) - > bytes :
if self . _final_q :
raise NotImplementedError
@ -732,7 +773,7 @@ class Display(metaclass=Singleton):
return result_string
@property
def _stdin ( self ) :
def _stdin ( self ) - > t . BinaryIO | None :
if self . _final_q :
raise NotImplementedError
try :
@ -741,20 +782,20 @@ class Display(metaclass=Singleton):
return None
@property
def _stdin_fd ( self ) :
def _stdin_fd ( self ) - > int | None :
try :
return self . _stdin . fileno ( )
except ( ValueError , AttributeError ) :
return None
@property
def _stdout ( self ) :
def _stdout ( self ) - > t . BinaryIO :
if self . _final_q :
raise NotImplementedError
return sys . stdout . buffer
@property
def _stdout_fd ( self ) :
def _stdout_fd ( self ) - > int | None :
try :
return self . _stdout . fileno ( )
except ( ValueError , AttributeError ) :