mirror of https://github.com/ansible/ansible.git
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1245 lines
46 KiB
Python
1245 lines
46 KiB
Python
# (c) 2014, Michael DeHaan <michael.dehaan@gmail.com>
|
|
#
|
|
# This file is part of Ansible
|
|
#
|
|
# Ansible is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# Ansible is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
from __future__ import annotations
|
|
|
|
import dataclasses
|
|
import datetime
|
|
|
|
try:
|
|
import curses
|
|
except ImportError:
|
|
HAS_CURSES = False
|
|
else:
|
|
# this will be set to False if curses.setupterm() fails
|
|
HAS_CURSES = True
|
|
|
|
import collections.abc as c
|
|
import codecs
|
|
import ctypes.util
|
|
import fcntl
|
|
import getpass
|
|
import io
|
|
import logging
|
|
import os
|
|
import secrets
|
|
import subprocess
|
|
import sys
|
|
import termios
|
|
import textwrap
|
|
import threading
|
|
import time
|
|
import tty
|
|
import typing as t
|
|
|
|
from functools import wraps
|
|
from struct import unpack, pack
|
|
|
|
from ansible import constants as C
|
|
from ansible.errors import AnsibleAssertionError, AnsiblePromptInterrupt, AnsiblePromptNoninteractive, AnsibleError
|
|
from ansible._internal._errors import _utils
|
|
from ansible.module_utils._internal import _ambient_context, _plugin_exec_context
|
|
from ansible.module_utils.common.text.converters import to_bytes, to_text
|
|
from ansible._internal._datatag._tags import TrustedAsTemplate
|
|
from ansible.module_utils.common.messages import ErrorSummary, WarningSummary, DeprecationSummary, Detail, SummaryBase, PluginInfo
|
|
from ansible.module_utils.six import text_type
|
|
from ansible.module_utils._internal import _traceback
|
|
from ansible.utils.color import stringc
|
|
from ansible.utils.multiprocessing import context as multiprocessing_context
|
|
from ansible.utils.singleton import Singleton
|
|
|
|
if t.TYPE_CHECKING:
|
|
# avoid circular import at runtime
|
|
from ansible.executor.task_queue_manager import FinalQueue
|
|
|
|
P = t.ParamSpec('P')
|
|
|
|
_LIBC = ctypes.cdll.LoadLibrary(ctypes.util.find_library('c'))
|
|
# Set argtypes, to avoid segfault if the wrong type is provided,
|
|
# restype is assumed to be c_int
|
|
_LIBC.wcwidth.argtypes = (ctypes.c_wchar,)
|
|
_LIBC.wcswidth.argtypes = (ctypes.c_wchar_p, ctypes.c_int)
|
|
# Max for c_int
|
|
_MAX_INT = 2 ** (ctypes.sizeof(ctypes.c_int) * 8 - 1) - 1
|
|
|
|
_UNSET = t.cast(t.Any, object())
|
|
|
|
MOVE_TO_BOL = b'\r'
|
|
CLEAR_TO_EOL = b'\x1b[K'
|
|
|
|
|
|
def _is_controller_traceback_enabled(event: _traceback.TracebackEvent) -> bool:
|
|
"""Controller utility function to determine if traceback collection is enabled for the specified event."""
|
|
flag_values: set[str] = set(value for value in C.config.get_config_value('DISPLAY_TRACEBACK'))
|
|
|
|
if 'always' in flag_values:
|
|
return True
|
|
|
|
if 'never' in flag_values:
|
|
return False
|
|
|
|
return event.name.lower() in flag_values
|
|
|
|
|
|
_traceback._is_traceback_enabled = _is_controller_traceback_enabled
|
|
|
|
|
|
def get_text_width(text: str) -> int:
|
|
"""Function that utilizes ``wcswidth`` or ``wcwidth`` to determine the
|
|
number of columns used to display a text string.
|
|
|
|
We try first with ``wcswidth``, and fallback to iterating each
|
|
character and using wcwidth individually, falling back to a value of 0
|
|
for non-printable wide characters.
|
|
"""
|
|
if not isinstance(text, text_type):
|
|
raise TypeError('get_text_width requires text, not %s' % type(text))
|
|
|
|
try:
|
|
width = _LIBC.wcswidth(text, _MAX_INT)
|
|
except ctypes.ArgumentError:
|
|
width = -1
|
|
if width != -1:
|
|
return width
|
|
|
|
width = 0
|
|
counter = 0
|
|
for c in text:
|
|
counter += 1
|
|
if c in (u'\x08', u'\x7f', u'\x94', u'\x1b'):
|
|
# A few characters result in a subtraction of length:
|
|
# BS, DEL, CCH, ESC
|
|
# ESC is slightly different in that it's part of an escape sequence, and
|
|
# while ESC is non printable, it's part of an escape sequence, which results
|
|
# in a single non printable length
|
|
width -= 1
|
|
counter -= 1
|
|
continue
|
|
|
|
try:
|
|
w = _LIBC.wcwidth(c)
|
|
except ctypes.ArgumentError:
|
|
w = -1
|
|
if w == -1:
|
|
# -1 signifies a non-printable character
|
|
# use 0 here as a best effort
|
|
w = 0
|
|
width += w
|
|
|
|
if width == 0 and counter:
|
|
raise EnvironmentError(
|
|
'get_text_width could not calculate text width of %r' % text
|
|
)
|
|
|
|
# It doesn't make sense to have a negative printable width
|
|
return width if width >= 0 else 0
|
|
|
|
|
|
class FilterBlackList(logging.Filter):
|
|
def __init__(self, blacklist):
|
|
self.blacklist = [logging.Filter(name) for name in blacklist]
|
|
|
|
def filter(self, record):
|
|
return not any(f.filter(record) for f in self.blacklist)
|
|
|
|
|
|
class FilterUserInjector(logging.Filter):
|
|
"""
|
|
This is a filter which injects the current user as the 'user' attribute on each record. We need to add this filter
|
|
to all logger handlers so that 3rd party libraries won't print an exception due to user not being defined.
|
|
"""
|
|
|
|
try:
|
|
username = getpass.getuser()
|
|
except KeyError:
|
|
# people like to make containers w/o actual valid passwd/shadow and use host uids
|
|
username = 'uid=%s' % os.getuid()
|
|
|
|
def filter(self, record):
|
|
record.user = FilterUserInjector.username
|
|
return True
|
|
|
|
|
|
logger = None
|
|
# TODO: make this a callback event instead
|
|
if getattr(C, 'DEFAULT_LOG_PATH'):
|
|
path = C.DEFAULT_LOG_PATH
|
|
if path and (os.path.exists(path) and os.access(path, os.W_OK)) or os.access(os.path.dirname(path), os.W_OK):
|
|
if not os.path.isdir(path):
|
|
# NOTE: level is kept at INFO to avoid security disclosures caused by certain libraries when using DEBUG
|
|
logging.basicConfig(filename=path, level=logging.INFO, # DO NOT set to logging.DEBUG
|
|
format='%(asctime)s p=%(process)d u=%(user)s n=%(name)s %(levelname)s| %(message)s')
|
|
|
|
logger = logging.getLogger('ansible')
|
|
for handler in logging.root.handlers:
|
|
handler.addFilter(FilterBlackList(getattr(C, 'DEFAULT_LOG_FILTER', [])))
|
|
handler.addFilter(FilterUserInjector())
|
|
else:
|
|
print(f"[WARNING]: DEFAULT_LOG_PATH can not be a directory '{path}', aborting", file=sys.stderr)
|
|
else:
|
|
print(f"[WARNING]: log file at '{path}' is not writeable and we cannot create it, aborting\n", file=sys.stderr)
|
|
|
|
# map color to log levels, in order of priority (low to high)
|
|
color_to_log_level = {C.COLOR_DEBUG: logging.DEBUG,
|
|
C.COLOR_VERBOSE: logging.INFO,
|
|
C.COLOR_OK: logging.INFO,
|
|
C.COLOR_INCLUDED: logging.INFO,
|
|
C.COLOR_CHANGED: logging.INFO,
|
|
C.COLOR_SKIP: logging.WARNING,
|
|
C.COLOR_DEPRECATE: logging.WARNING,
|
|
C.COLOR_WARN: logging.WARNING,
|
|
C.COLOR_UNREACHABLE: logging.ERROR,
|
|
C.COLOR_ERROR: logging.ERROR}
|
|
|
|
b_COW_PATHS = (
|
|
b"/usr/bin/cowsay",
|
|
b"/usr/games/cowsay",
|
|
b"/usr/local/bin/cowsay", # BSD path for cowsay
|
|
b"/opt/local/bin/cowsay", # MacPorts path for cowsay
|
|
)
|
|
|
|
|
|
def _synchronize_textiowrapper(tio: t.TextIO, lock: threading.RLock):
|
|
# Ensure that a background thread can't hold the internal buffer lock on a file object
|
|
# during a fork, which causes forked children to hang. We're using display's existing lock for
|
|
# convenience (and entering the lock before a fork).
|
|
def _wrap_with_lock(f, lock):
|
|
@wraps(f)
|
|
def locking_wrapper(*args, **kwargs):
|
|
with lock:
|
|
return f(*args, **kwargs)
|
|
|
|
return locking_wrapper
|
|
|
|
buffer = tio.buffer
|
|
|
|
# monkeypatching the underlying file-like object isn't great, but likely safer than subclassing
|
|
buffer.write = _wrap_with_lock(buffer.write, lock) # type: ignore[method-assign]
|
|
buffer.flush = _wrap_with_lock(buffer.flush, lock) # type: ignore[method-assign]
|
|
|
|
|
|
def setraw(fd: int, when: int = termios.TCSAFLUSH) -> None:
|
|
"""Put terminal into a raw mode.
|
|
|
|
Copied from ``tty`` from CPython 3.11.0, and modified to not remove OPOST from OFLAG
|
|
|
|
OPOST is kept to prevent an issue with multi line prompts from being corrupted now that display
|
|
is proxied via the queue from forks. The problem is a race condition, in that we proxy the display
|
|
over the fork, but before it can be displayed, this plugin will have continued executing, potentially
|
|
setting stdout and stdin to raw which remove output post processing that commonly converts NL to CRLF
|
|
"""
|
|
mode = termios.tcgetattr(fd)
|
|
mode[tty.IFLAG] = mode[tty.IFLAG] & ~(termios.BRKINT | termios.ICRNL | termios.INPCK | termios.ISTRIP | termios.IXON)
|
|
mode[tty.OFLAG] = mode[tty.OFLAG] & ~(termios.OPOST)
|
|
mode[tty.CFLAG] = mode[tty.CFLAG] & ~(termios.CSIZE | termios.PARENB)
|
|
mode[tty.CFLAG] = mode[tty.CFLAG] | termios.CS8
|
|
mode[tty.LFLAG] = mode[tty.LFLAG] & ~(termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG)
|
|
mode[tty.CC][termios.VMIN] = 1
|
|
mode[tty.CC][termios.VTIME] = 0
|
|
termios.tcsetattr(fd, when, mode)
|
|
|
|
|
|
def clear_line(stdout: t.BinaryIO) -> None:
|
|
stdout.write(b'\x1b[%s' % MOVE_TO_BOL)
|
|
stdout.write(b'\x1b[%s' % CLEAR_TO_EOL)
|
|
|
|
|
|
def setup_prompt(stdin_fd: int, stdout_fd: int, seconds: int, echo: bool) -> None:
|
|
setraw(stdin_fd)
|
|
|
|
# Only set stdout to raw mode if it is a TTY. This is needed when redirecting
|
|
# stdout to a file since a file cannot be set to raw mode.
|
|
if os.isatty(stdout_fd):
|
|
setraw(stdout_fd)
|
|
|
|
if echo:
|
|
new_settings = termios.tcgetattr(stdin_fd)
|
|
new_settings[3] = new_settings[3] | termios.ECHO
|
|
termios.tcsetattr(stdin_fd, termios.TCSANOW, new_settings)
|
|
|
|
|
|
def setupterm() -> None:
|
|
# Nest the try except since curses.error is not available if curses did not import
|
|
try:
|
|
curses.setupterm()
|
|
except (curses.error, TypeError, io.UnsupportedOperation):
|
|
global HAS_CURSES
|
|
HAS_CURSES = False
|
|
else:
|
|
global MOVE_TO_BOL
|
|
global CLEAR_TO_EOL
|
|
# curses.tigetstr() returns None in some circumstances
|
|
MOVE_TO_BOL = curses.tigetstr('cr') or MOVE_TO_BOL
|
|
CLEAR_TO_EOL = curses.tigetstr('el') or CLEAR_TO_EOL
|
|
|
|
|
|
class Display(metaclass=Singleton):
|
|
|
|
def __init__(self, verbosity: int = 0) -> None:
|
|
|
|
self._final_q: FinalQueue | None = None
|
|
|
|
# NB: this lock is used to both prevent intermingled output between threads and to block writes during forks.
|
|
# Do not change the type of this lock or upgrade to a shared lock (eg multiprocessing.RLock).
|
|
self._lock = threading.RLock()
|
|
|
|
self.columns = None
|
|
self.verbosity = verbosity
|
|
|
|
if C.LOG_VERBOSITY is None:
|
|
self.log_verbosity = verbosity
|
|
else:
|
|
self.log_verbosity = max(verbosity, C.LOG_VERBOSITY)
|
|
|
|
# list of all deprecation messages to prevent duplicate display
|
|
self._deprecations: set[str] = set()
|
|
self._warns: set[str] = set()
|
|
self._errors: set[str] = set()
|
|
|
|
self.b_cowsay: bytes | None = None
|
|
self.noncow = C.ANSIBLE_COW_SELECTION
|
|
|
|
self.set_cowsay_info()
|
|
self._wrap_stderr = C.WRAP_STDERR
|
|
|
|
if self.b_cowsay:
|
|
try:
|
|
cmd = subprocess.Popen([self.b_cowsay, "-l"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
(out, err) = cmd.communicate()
|
|
if cmd.returncode:
|
|
raise Exception
|
|
self.cows_available: set[str] = {to_text(c) for c in out.split()}
|
|
if C.ANSIBLE_COW_ACCEPTLIST and any(C.ANSIBLE_COW_ACCEPTLIST):
|
|
self.cows_available = set(C.ANSIBLE_COW_ACCEPTLIST).intersection(self.cows_available)
|
|
except Exception:
|
|
# could not execute cowsay for some reason
|
|
self.b_cowsay = None
|
|
|
|
self._set_column_width()
|
|
|
|
try:
|
|
# NB: we're relying on the display singleton behavior to ensure this only runs once
|
|
_synchronize_textiowrapper(sys.stdout, self._lock)
|
|
_synchronize_textiowrapper(sys.stderr, self._lock)
|
|
except Exception as ex:
|
|
self.warning(f"failed to patch stdout/stderr for fork-safety: {ex}")
|
|
|
|
codecs.register_error('_replacing_warning_handler', self._replacing_warning_handler)
|
|
try:
|
|
sys.stdout.reconfigure(errors='_replacing_warning_handler') # type: ignore[union-attr]
|
|
sys.stderr.reconfigure(errors='_replacing_warning_handler') # type: ignore[union-attr]
|
|
except Exception as ex:
|
|
self.warning(f"failed to reconfigure stdout/stderr with custom encoding error handler: {ex}")
|
|
|
|
self.setup_curses = False
|
|
|
|
def _replacing_warning_handler(self, exception: UnicodeError) -> tuple[str | bytes, int]:
|
|
# This can't be removed as long as we have the possibility of encountering un-renderable strings
|
|
# created with `surrogateescape`; the alternative of having display methods hard fail is untenable.
|
|
self.warning('Non UTF-8 encoded data replaced with "?" while displaying text to stdout/stderr.')
|
|
return '?', exception.end
|
|
|
|
def set_queue(self, queue: FinalQueue) -> None:
|
|
"""Set the _final_q on Display, so that we know to proxy display over the queue
|
|
instead of directly writing to stdout/stderr from forks
|
|
|
|
This is only needed in ansible.executor.process.worker:WorkerProcess._run
|
|
"""
|
|
if multiprocessing_context.parent_process() is None:
|
|
raise RuntimeError('queue cannot be set in parent process')
|
|
self._final_q = queue
|
|
|
|
def set_cowsay_info(self) -> None:
|
|
if C.ANSIBLE_NOCOWS:
|
|
return
|
|
|
|
if C.ANSIBLE_COW_PATH:
|
|
self.b_cowsay = C.ANSIBLE_COW_PATH
|
|
else:
|
|
for b_cow_path in b_COW_PATHS:
|
|
if os.path.exists(b_cow_path):
|
|
self.b_cowsay = b_cow_path
|
|
|
|
@staticmethod
|
|
def _proxy(
|
|
func: c.Callable[t.Concatenate[Display, P], None]
|
|
) -> c.Callable[..., None]:
|
|
@wraps(func)
|
|
def wrapper(self, *args: P.args, **kwargs: P.kwargs) -> None:
|
|
if self._final_q:
|
|
# If _final_q is set, that means we are in a WorkerProcess
|
|
# and instead of displaying messages directly from the fork
|
|
# we will proxy them through the queue
|
|
return self._final_q.send_display(func.__name__, *args, **kwargs)
|
|
return func(self, *args, **kwargs)
|
|
return wrapper
|
|
|
|
@staticmethod
|
|
def _meets_debug(
|
|
func: c.Callable[..., None]
|
|
) -> c.Callable[..., None]:
|
|
"""This method ensures that debug is enabled before delegating to the proxy
|
|
"""
|
|
@wraps(func)
|
|
def wrapper(self, msg: str, host: str | None = None) -> None:
|
|
if not C.DEFAULT_DEBUG:
|
|
return
|
|
return func(self, msg, host=host)
|
|
return wrapper
|
|
|
|
@staticmethod
|
|
def _meets_verbosity(
|
|
func: c.Callable[..., None]
|
|
) -> c.Callable[..., None]:
|
|
"""This method ensures the verbosity has been met before delegating to the proxy
|
|
|
|
Currently this method is unused, and the logic is handled directly in ``verbose``
|
|
"""
|
|
@wraps(func)
|
|
def wrapper(self, msg: str, host: str | None = None, caplevel: int = None) -> None:
|
|
if self.verbosity > caplevel:
|
|
return func(self, msg, host=host, caplevel=caplevel)
|
|
return
|
|
return wrapper
|
|
|
|
@_proxy
|
|
def display(
|
|
self,
|
|
msg: str,
|
|
color: str | None = None,
|
|
stderr: bool = False,
|
|
screen_only: bool = False,
|
|
log_only: bool = False,
|
|
newline: bool = True,
|
|
caplevel: int | None = None,
|
|
) -> None:
|
|
""" Display a message to the user
|
|
|
|
Note: msg *must* be a unicode string to prevent UnicodeError tracebacks.
|
|
"""
|
|
|
|
if not isinstance(msg, str):
|
|
raise TypeError(f'Display message must be str, not: {msg.__class__.__name__}')
|
|
|
|
nocolor = msg
|
|
|
|
if not log_only:
|
|
|
|
has_newline = msg.endswith(u'\n')
|
|
if has_newline:
|
|
msg2 = msg[:-1]
|
|
else:
|
|
msg2 = msg
|
|
|
|
if color:
|
|
msg2 = stringc(msg2, color)
|
|
|
|
if has_newline or newline:
|
|
msg2 = msg2 + u'\n'
|
|
|
|
# Note: After Display() class is refactored need to update the log capture
|
|
# code in 'cli/scripts/ansible_connection_cli_stub.py' (and other relevant places).
|
|
if not stderr:
|
|
fileobj = sys.stdout
|
|
else:
|
|
fileobj = sys.stderr
|
|
|
|
with self._lock:
|
|
fileobj.write(msg2)
|
|
|
|
# With locks, and the fact that we aren't printing from forks
|
|
# just write, and let the system flush. Everything should come out peachy
|
|
# I've left this code for historical purposes, or in case we need to add this
|
|
# back at a later date. For now ``TaskQueueManager.cleanup`` will perform a
|
|
# final flush at shutdown.
|
|
# try:
|
|
# fileobj.flush()
|
|
# except IOError as e:
|
|
# # Ignore EPIPE in case fileobj has been prematurely closed, eg.
|
|
# # when piping to "head -n1"
|
|
# if e.errno != errno.EPIPE:
|
|
# raise
|
|
|
|
if logger and not screen_only:
|
|
self._log(nocolor, color, caplevel)
|
|
|
|
def _log(self, msg: str, color: str | None = None, caplevel: int | None = None):
|
|
|
|
if logger and (caplevel is None or self.log_verbosity > caplevel):
|
|
msg2 = msg.lstrip('\n')
|
|
|
|
if caplevel is None or caplevel > 0:
|
|
lvl = logging.INFO
|
|
elif caplevel == -1:
|
|
lvl = logging.ERROR
|
|
elif caplevel == -2:
|
|
lvl = logging.WARNING
|
|
elif caplevel == -3:
|
|
lvl = logging.DEBUG
|
|
elif color:
|
|
# set logger level based on color (not great)
|
|
# but last resort and backwards compatible
|
|
try:
|
|
lvl = color_to_log_level[color]
|
|
except KeyError:
|
|
# this should not happen if mapping is updated with new color configs, but JIC
|
|
raise AnsibleAssertionError('Invalid color supplied to display: %s' % color)
|
|
|
|
# actually log
|
|
logger.log(lvl, msg2)
|
|
|
|
def v(self, msg: str, host: str | None = None) -> None:
|
|
return self.verbose(msg, host=host, caplevel=0)
|
|
|
|
def vv(self, msg: str, host: str | None = None) -> None:
|
|
return self.verbose(msg, host=host, caplevel=1)
|
|
|
|
def vvv(self, msg: str, host: str | None = None) -> None:
|
|
return self.verbose(msg, host=host, caplevel=2)
|
|
|
|
def vvvv(self, msg: str, host: str | None = None) -> None:
|
|
return self.verbose(msg, host=host, caplevel=3)
|
|
|
|
def vvvvv(self, msg: str, host: str | None = None) -> None:
|
|
return self.verbose(msg, host=host, caplevel=4)
|
|
|
|
def vvvvvv(self, msg: str, host: str | None = None) -> None:
|
|
return self.verbose(msg, host=host, caplevel=5)
|
|
|
|
def verbose(self, msg: str, host: str | None = None, caplevel: int = 2) -> None:
|
|
if self.verbosity > caplevel:
|
|
self._verbose_display(msg, host=host, caplevel=caplevel)
|
|
|
|
if self.log_verbosity > self.verbosity and self.log_verbosity > caplevel:
|
|
self._verbose_log(msg, host=host, caplevel=caplevel)
|
|
|
|
@_proxy
|
|
def _verbose_display(self, msg: str, host: str | None = None, caplevel: int = 2) -> None:
|
|
to_stderr = C.VERBOSE_TO_STDERR
|
|
if host is None:
|
|
self.display(msg, color=C.COLOR_VERBOSE, stderr=to_stderr)
|
|
else:
|
|
self.display("<%s> %s" % (host, msg), color=C.COLOR_VERBOSE, stderr=to_stderr)
|
|
|
|
@_proxy
|
|
def _verbose_log(self, msg: str, host: str | None = None, caplevel: int = 2) -> None:
|
|
# we send to log if log was configured with higher verbosity
|
|
if host is not None:
|
|
msg = "<%s> %s" % (host, msg)
|
|
self._log(msg, C.COLOR_VERBOSE, caplevel)
|
|
|
|
@_meets_debug
|
|
@_proxy
|
|
def debug(self, msg: str, host: str | None = None) -> None:
|
|
prefix = "%6d %0.5f" % (os.getpid(), time.time())
|
|
if host is not None:
|
|
prefix += f" [{host}]"
|
|
self.display(f"{prefix}: {msg}", color=C.COLOR_DEBUG, caplevel=-3)
|
|
|
|
def get_deprecation_message(
|
|
self,
|
|
msg: str,
|
|
version: str | None = None,
|
|
removed: bool = False,
|
|
date: str | datetime.date | None = None,
|
|
collection_name: str | None = None,
|
|
) -> str:
|
|
"""Return a deprecation message and help text for non-display purposes (e.g., exception messages)."""
|
|
self.deprecated(
|
|
msg="The `get_deprecation_message` method is deprecated.",
|
|
help_text="Use the `deprecated` method instead.",
|
|
version="2.23",
|
|
)
|
|
|
|
msg = self._get_deprecation_message_with_plugin_info(
|
|
msg=msg,
|
|
version=version,
|
|
removed=removed,
|
|
date=date,
|
|
plugin=_plugin_exec_context.PluginExecContext.get_current_plugin_info(),
|
|
)
|
|
|
|
if removed:
|
|
msg = f'[DEPRECATED]: {msg}'
|
|
else:
|
|
msg = f'[DEPRECATION WARNING]: {msg}'
|
|
|
|
return msg
|
|
|
|
def _get_deprecation_message_with_plugin_info(
|
|
self,
|
|
msg: str,
|
|
version: str | None = None,
|
|
removed: bool = False,
|
|
date: str | datetime.date | None = None,
|
|
plugin: PluginInfo | None = None,
|
|
) -> str:
|
|
"""Internal use only. Return a deprecation message and help text for display."""
|
|
msg = msg.strip()
|
|
|
|
if msg and msg[-1] not in ['!', '?', '.']:
|
|
msg += '.'
|
|
|
|
if removed:
|
|
removal_fragment = 'This feature was removed'
|
|
help_text = 'Please update your playbooks.'
|
|
else:
|
|
removal_fragment = 'This feature will be removed'
|
|
help_text = ''
|
|
|
|
if plugin:
|
|
from_fragment = f'from the {self._describe_plugin_info(plugin)}'
|
|
else:
|
|
from_fragment = ''
|
|
|
|
if date:
|
|
when = 'in a release after {0}.'.format(date)
|
|
elif version:
|
|
when = 'in version {0}.'.format(version)
|
|
else:
|
|
when = 'in a future release.'
|
|
|
|
message_text = ' '.join(f for f in [msg, removal_fragment, from_fragment, when, help_text] if f)
|
|
|
|
return message_text
|
|
|
|
@staticmethod
|
|
def _describe_plugin_info(plugin_info: PluginInfo) -> str:
|
|
"""Return a brief description of the plugin info, including name(s) and type."""
|
|
name = repr(plugin_info.resolved_name)
|
|
clarification = f' (requested as {plugin_info.requested_name!r})' if plugin_info.requested_name != plugin_info.resolved_name else ''
|
|
|
|
if plugin_info.type in ("module", "modules"):
|
|
# DTFIX-RELEASE: pluginloader or AnsiblePlugin needs a "type desc" property that doesn't suffer from legacy "inconsistencies" like this
|
|
plugin_type = "module"
|
|
elif plugin_info.type == "collection":
|
|
# not a real plugin type, but used for tombstone errors generated by plugin loader
|
|
plugin_type = plugin_info.type
|
|
else:
|
|
plugin_type = f'{plugin_info.type} plugin'
|
|
|
|
return f'{name} {plugin_type}{clarification}'
|
|
|
|
def _wrap_message(self, msg: str, wrap_text: bool) -> str:
|
|
if wrap_text and self._wrap_stderr:
|
|
wrapped = textwrap.wrap(msg, self.columns, drop_whitespace=False)
|
|
msg = "\n".join(wrapped) + "\n"
|
|
|
|
return msg
|
|
|
|
@staticmethod
|
|
def _deduplicate(msg: str, messages: set[str]) -> bool:
|
|
"""
|
|
Return True if the given message was previously seen, otherwise record the message as seen and return False.
|
|
This is done very late (at display-time) to avoid loss of attribution of messages to individual tasks.
|
|
Duplicates included in task results will always be visible to registered variables and callbacks.
|
|
"""
|
|
|
|
if msg in messages:
|
|
return True
|
|
|
|
messages.add(msg)
|
|
|
|
return False
|
|
|
|
def deprecated(
|
|
self,
|
|
msg: str,
|
|
version: str | None = None,
|
|
removed: bool = False,
|
|
date: str | datetime.date | None = None,
|
|
collection_name: str | None = _UNSET,
|
|
*,
|
|
help_text: str | None = None,
|
|
obj: t.Any = None,
|
|
) -> None:
|
|
"""Display a deprecation warning message, if enabled."""
|
|
# deprecated: description='enable the deprecation message for collection_name' core_version='2.23'
|
|
# if collection_name is not _UNSET:
|
|
# self.deprecated('The `collection_name` argument to `deprecated` is deprecated.', version='2.27')
|
|
|
|
# DTFIX-RELEASE: are there any deprecation calls where the feature is switching from enabled to disabled, rather than being removed entirely?
|
|
# DTFIX-RELEASE: are there deprecated features which should going through deferred deprecation instead?
|
|
|
|
self._deprecated_with_plugin_info(
|
|
msg=msg,
|
|
version=version,
|
|
removed=removed,
|
|
date=date,
|
|
help_text=help_text,
|
|
obj=obj,
|
|
plugin=_plugin_exec_context.PluginExecContext.get_current_plugin_info(),
|
|
)
|
|
|
|
def _deprecated_with_plugin_info(
|
|
self,
|
|
msg: str,
|
|
version: str | None = None,
|
|
removed: bool = False,
|
|
date: str | datetime.date | None = None,
|
|
*,
|
|
help_text: str | None = None,
|
|
obj: t.Any = None,
|
|
plugin: PluginInfo | None = None,
|
|
) -> None:
|
|
"""
|
|
This is the internal pre-proxy half of the `deprecated` implementation.
|
|
Any logic that must occur on workers needs to be implemented here.
|
|
"""
|
|
if removed:
|
|
raise AnsibleError(self._get_deprecation_message_with_plugin_info(
|
|
msg=msg,
|
|
version=version,
|
|
removed=removed,
|
|
date=date,
|
|
plugin=plugin,
|
|
))
|
|
|
|
if source_context := _utils.SourceContext.from_value(obj):
|
|
formatted_source_context = str(source_context)
|
|
else:
|
|
formatted_source_context = None
|
|
|
|
deprecation = DeprecationSummary(
|
|
details=(
|
|
Detail(
|
|
msg=msg,
|
|
formatted_source_context=formatted_source_context,
|
|
help_text=help_text,
|
|
),
|
|
),
|
|
version=version,
|
|
date=str(date) if isinstance(date, datetime.date) else date,
|
|
plugin=plugin,
|
|
formatted_traceback=_traceback.maybe_capture_traceback(_traceback.TracebackEvent.DEPRECATED),
|
|
)
|
|
|
|
if warning_ctx := _DeferredWarningContext.current(optional=True):
|
|
warning_ctx.capture(deprecation)
|
|
return
|
|
|
|
self._deprecated(deprecation)
|
|
|
|
@_proxy
|
|
def _deprecated(self, warning: DeprecationSummary) -> None:
|
|
"""Internal implementation detail, use `deprecated` instead."""
|
|
|
|
# This is the post-proxy half of the `deprecated` implementation.
|
|
# Any logic that must occur in the primary controller process needs to be implemented here.
|
|
|
|
if not _DeferredWarningContext.deprecation_warnings_enabled():
|
|
return
|
|
|
|
self.warning('Deprecation warnings can be disabled by setting `deprecation_warnings=False` in ansible.cfg.')
|
|
|
|
msg = format_message(warning)
|
|
msg = f'[DEPRECATION WARNING]: {msg}'
|
|
|
|
# DTFIX-RELEASE: what should we do with wrap_message?
|
|
msg = self._wrap_message(msg=msg, wrap_text=True)
|
|
|
|
if self._deduplicate(msg, self._deprecations):
|
|
return
|
|
|
|
self.display(msg, color=C.config.get_config_value('COLOR_DEPRECATE'), stderr=True)
|
|
|
|
def warning(
|
|
self,
|
|
msg: str,
|
|
formatted: bool = False,
|
|
*,
|
|
help_text: str | None = None,
|
|
obj: t.Any = None
|
|
) -> None:
|
|
"""Display a warning message."""
|
|
|
|
# This is the pre-proxy half of the `warning` implementation.
|
|
# Any logic that must occur on workers needs to be implemented here.
|
|
|
|
if source_context := _utils.SourceContext.from_value(obj):
|
|
formatted_source_context = str(source_context)
|
|
else:
|
|
formatted_source_context = None
|
|
|
|
warning = WarningSummary(
|
|
details=(
|
|
Detail(
|
|
msg=msg,
|
|
help_text=help_text,
|
|
formatted_source_context=formatted_source_context,
|
|
),
|
|
),
|
|
formatted_traceback=_traceback.maybe_capture_traceback(_traceback.TracebackEvent.WARNING),
|
|
)
|
|
|
|
if warning_ctx := _DeferredWarningContext.current(optional=True):
|
|
warning_ctx.capture(warning)
|
|
# DTFIX-RELEASE: what to do about propagating wrap_text?
|
|
return
|
|
|
|
self._warning(warning, wrap_text=not formatted)
|
|
|
|
@_proxy
|
|
def _warning(self, warning: WarningSummary, wrap_text: bool) -> None:
|
|
"""Internal implementation detail, use `warning` instead."""
|
|
|
|
# This is the post-proxy half of the `warning` implementation.
|
|
# Any logic that must occur in the primary controller process needs to be implemented here.
|
|
|
|
msg = format_message(warning)
|
|
msg = f"[WARNING]: {msg}"
|
|
|
|
if self._deduplicate(msg, self._warns):
|
|
return
|
|
|
|
# DTFIX-RELEASE: what should we do with wrap_message?
|
|
msg = self._wrap_message(msg=msg, wrap_text=wrap_text)
|
|
|
|
self.display(msg, color=C.config.get_config_value('COLOR_WARN'), stderr=True, caplevel=-2)
|
|
|
|
@_proxy
|
|
def system_warning(self, msg: str) -> None:
|
|
if C.SYSTEM_WARNINGS:
|
|
self.warning(msg)
|
|
|
|
@_proxy
|
|
def banner(self, msg: str, color: str | None = None, cows: bool = True) -> None:
|
|
"""
|
|
Prints a header-looking line with cowsay or stars with length depending on terminal width (3 minimum)
|
|
"""
|
|
msg = to_text(msg)
|
|
|
|
if self.b_cowsay and cows:
|
|
try:
|
|
self.banner_cowsay(msg)
|
|
return
|
|
except OSError:
|
|
self.warning("somebody cleverly deleted cowsay or something during the PB run. heh.")
|
|
|
|
msg = msg.strip()
|
|
try:
|
|
star_len = self.columns - get_text_width(msg)
|
|
except EnvironmentError:
|
|
star_len = self.columns - len(msg)
|
|
if star_len <= 3:
|
|
star_len = 3
|
|
stars = u"*" * star_len
|
|
self.display(u"\n%s %s" % (msg, stars), color=color)
|
|
|
|
@_proxy
|
|
def banner_cowsay(self, msg: str, color: str | None = None) -> None:
|
|
if u": [" in msg:
|
|
msg = msg.replace(u"[", u"")
|
|
if msg.endswith(u"]"):
|
|
msg = msg[:-1]
|
|
runcmd = [self.b_cowsay, b"-W", b"60"]
|
|
if self.noncow:
|
|
thecow = self.noncow
|
|
if thecow == 'random':
|
|
thecow = secrets.choice(list(self.cows_available))
|
|
runcmd.append(b'-f')
|
|
runcmd.append(to_bytes(thecow))
|
|
runcmd.append(to_bytes(msg))
|
|
cmd = subprocess.Popen(runcmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
(out, err) = cmd.communicate()
|
|
self.display(u"%s\n" % to_text(out), color=color)
|
|
|
|
def error_as_warning(self, msg: str | None, exception: BaseException) -> None:
|
|
"""Display an exception as a warning."""
|
|
|
|
error = _utils._create_error_summary(exception, _traceback.TracebackEvent.WARNING)
|
|
|
|
if msg:
|
|
error = dataclasses.replace(error, details=(Detail(msg=msg),) + error.details)
|
|
|
|
warning = WarningSummary(
|
|
details=error.details,
|
|
formatted_traceback=error.formatted_traceback,
|
|
)
|
|
|
|
if warning_ctx := _DeferredWarningContext.current(optional=True):
|
|
warning_ctx.capture(warning)
|
|
return
|
|
|
|
self._warning(warning, wrap_text=False)
|
|
|
|
def error(self, msg: str | BaseException, wrap_text: bool = True, stderr: bool = True) -> None:
|
|
"""Display an error message."""
|
|
|
|
# This is the pre-proxy half of the `error` implementation.
|
|
# Any logic that must occur on workers needs to be implemented here.
|
|
|
|
if isinstance(msg, BaseException):
|
|
error = _utils._create_error_summary(msg, _traceback.TracebackEvent.ERROR)
|
|
wrap_text = False
|
|
else:
|
|
error = ErrorSummary(details=(Detail(msg=msg),), formatted_traceback=_traceback.maybe_capture_traceback(_traceback.TracebackEvent.ERROR))
|
|
|
|
self._error(error, wrap_text=wrap_text, stderr=stderr)
|
|
|
|
@_proxy
|
|
def _error(self, error: ErrorSummary, wrap_text: bool, stderr: bool) -> None:
|
|
"""Internal implementation detail, use `error` instead."""
|
|
|
|
# This is the post-proxy half of the `error` implementation.
|
|
# Any logic that must occur in the primary controller process needs to be implemented here.
|
|
|
|
msg = format_message(error)
|
|
msg = f'[ERROR]: {msg}'
|
|
|
|
if self._deduplicate(msg, self._errors):
|
|
return
|
|
|
|
# DTFIX-RELEASE: what should we do with wrap_message?
|
|
msg = self._wrap_message(msg=msg, wrap_text=wrap_text)
|
|
|
|
self.display(msg, color=C.config.get_config_value('COLOR_ERROR'), stderr=stderr, caplevel=-1)
|
|
|
|
@staticmethod
|
|
def prompt(msg: str, private: bool = False) -> str:
|
|
if private:
|
|
return getpass.getpass(msg)
|
|
else:
|
|
return input(msg)
|
|
|
|
def do_var_prompt(
|
|
self,
|
|
varname: str,
|
|
private: bool = True,
|
|
prompt: str | None = None,
|
|
encrypt: str | None = None,
|
|
confirm: bool = False,
|
|
salt_size: int | None = None,
|
|
salt: str | None = None,
|
|
default: str | None = None,
|
|
unsafe: bool = False,
|
|
) -> str:
|
|
result = None
|
|
if sys.__stdin__.isatty():
|
|
|
|
do_prompt = self.prompt
|
|
|
|
if prompt and default is not None:
|
|
msg = "%s [%s]: " % (prompt, default)
|
|
elif prompt:
|
|
msg = "%s: " % prompt
|
|
else:
|
|
msg = 'input for %s: ' % varname
|
|
|
|
if confirm:
|
|
while True:
|
|
result = do_prompt(msg, private)
|
|
second = do_prompt("confirm " + msg, private)
|
|
if result == second:
|
|
break
|
|
self.display("***** VALUES ENTERED DO NOT MATCH ****")
|
|
else:
|
|
result = do_prompt(msg, private)
|
|
else:
|
|
result = None
|
|
self.warning("Not prompting as we are not in interactive mode")
|
|
|
|
# if result is false and default is not None
|
|
if not result and default is not None:
|
|
result = default
|
|
|
|
if encrypt:
|
|
# Circular import because encrypt needs a display class
|
|
from ansible.utils.encrypt import do_encrypt
|
|
result = do_encrypt(result, encrypt, salt_size=salt_size, salt=salt)
|
|
|
|
# handle utf-8 chars
|
|
result = to_text(result, errors='surrogate_or_strict')
|
|
|
|
if not unsafe:
|
|
# to maintain backward compatibility, assume these values are safe to template
|
|
result = TrustedAsTemplate().tag(result)
|
|
|
|
return result
|
|
|
|
def _set_column_width(self) -> None:
|
|
if os.isatty(1):
|
|
tty_size = unpack('HHHH', fcntl.ioctl(1, termios.TIOCGWINSZ, pack('HHHH', 0, 0, 0, 0)))[1]
|
|
else:
|
|
tty_size = 0
|
|
self.columns = max(79, tty_size - 1)
|
|
|
|
def prompt_until(
|
|
self,
|
|
msg: str,
|
|
private: bool = False,
|
|
seconds: int | None = None,
|
|
interrupt_input: c.Container[bytes] | None = None,
|
|
complete_input: c.Container[bytes] | None = None,
|
|
) -> bytes:
|
|
if self._final_q:
|
|
from ansible.executor.process.worker import current_worker
|
|
self._final_q.send_prompt(
|
|
worker_id=current_worker.worker_id, prompt=msg, private=private, seconds=seconds,
|
|
interrupt_input=interrupt_input, complete_input=complete_input
|
|
)
|
|
return current_worker.worker_queue.get()
|
|
|
|
if HAS_CURSES and not self.setup_curses:
|
|
setupterm()
|
|
self.setup_curses = True
|
|
|
|
if (
|
|
self._stdin_fd is None
|
|
or not os.isatty(self._stdin_fd)
|
|
# Compare the current process group to the process group associated
|
|
# with terminal of the given file descriptor to determine if the process
|
|
# is running in the background.
|
|
or os.getpgrp() != os.tcgetpgrp(self._stdin_fd)
|
|
):
|
|
raise AnsiblePromptNoninteractive('stdin is not interactive')
|
|
|
|
# When seconds/interrupt_input/complete_input are all None, this does mostly the same thing as input/getpass,
|
|
# but self.prompt may raise a KeyboardInterrupt, which must be caught in the main thread.
|
|
# If the main thread handled this, it would also need to send a newline to the tty of any hanging pids.
|
|
# if seconds is None and interrupt_input is None and complete_input is None:
|
|
# try:
|
|
# return self.prompt(msg, private=private)
|
|
# except KeyboardInterrupt:
|
|
# # can't catch in the results_thread_main daemon thread
|
|
# raise AnsiblePromptInterrupt('user interrupt')
|
|
|
|
self.display(msg)
|
|
result = b''
|
|
with self._lock:
|
|
original_stdin_settings = termios.tcgetattr(self._stdin_fd)
|
|
try:
|
|
setup_prompt(self._stdin_fd, self._stdout_fd, seconds, not private)
|
|
|
|
# flush the buffer to make sure no previous key presses
|
|
# are read in below
|
|
termios.tcflush(self._stdin, termios.TCIFLUSH)
|
|
|
|
# read input 1 char at a time until the optional timeout or complete/interrupt condition is met
|
|
return self._read_non_blocking_stdin(echo=not private, seconds=seconds, interrupt_input=interrupt_input, complete_input=complete_input)
|
|
finally:
|
|
# restore the old settings for the duped stdin stdin_fd
|
|
termios.tcsetattr(self._stdin_fd, termios.TCSADRAIN, original_stdin_settings)
|
|
|
|
def _read_non_blocking_stdin(
|
|
self,
|
|
echo: bool = False,
|
|
seconds: int | None = None,
|
|
interrupt_input: c.Container[bytes] | None = None,
|
|
complete_input: c.Container[bytes] | None = None,
|
|
) -> bytes:
|
|
if self._final_q:
|
|
raise NotImplementedError
|
|
|
|
if seconds is not None:
|
|
start = time.time()
|
|
if interrupt_input is None:
|
|
try:
|
|
interrupt = termios.tcgetattr(sys.stdin.buffer.fileno())[6][termios.VINTR]
|
|
except Exception:
|
|
interrupt = b'\x03' # value for Ctrl+C
|
|
|
|
try:
|
|
backspace_sequences = [termios.tcgetattr(self._stdin_fd)[6][termios.VERASE]]
|
|
except Exception:
|
|
# unsupported/not present, use default
|
|
backspace_sequences = [b'\x7f', b'\x08']
|
|
|
|
result_string = b''
|
|
while seconds is None or (time.time() - start < seconds):
|
|
key_pressed = None
|
|
try:
|
|
os.set_blocking(self._stdin_fd, False)
|
|
while key_pressed is None and (seconds is None or (time.time() - start < seconds)):
|
|
key_pressed = self._stdin.read(1)
|
|
# throttle to prevent excess CPU consumption
|
|
time.sleep(C.DEFAULT_INTERNAL_POLL_INTERVAL)
|
|
finally:
|
|
os.set_blocking(self._stdin_fd, True)
|
|
if key_pressed is None:
|
|
key_pressed = b''
|
|
|
|
if (interrupt_input is None and key_pressed == interrupt) or (interrupt_input is not None and key_pressed.lower() in interrupt_input):
|
|
clear_line(self._stdout)
|
|
raise AnsiblePromptInterrupt('user interrupt')
|
|
if (complete_input is None and key_pressed in (b'\r', b'\n')) or (complete_input is not None and key_pressed.lower() in complete_input):
|
|
clear_line(self._stdout)
|
|
break
|
|
elif key_pressed in backspace_sequences:
|
|
clear_line(self._stdout)
|
|
result_string = result_string[:-1]
|
|
if echo:
|
|
self._stdout.write(result_string)
|
|
self._stdout.flush()
|
|
else:
|
|
result_string += key_pressed
|
|
return result_string
|
|
|
|
@property
|
|
def _stdin(self) -> t.BinaryIO | None:
|
|
if self._final_q:
|
|
raise NotImplementedError
|
|
try:
|
|
return sys.stdin.buffer
|
|
except AttributeError:
|
|
return None
|
|
|
|
@property
|
|
def _stdin_fd(self) -> int | None:
|
|
try:
|
|
return self._stdin.fileno()
|
|
except (ValueError, AttributeError):
|
|
return None
|
|
|
|
@property
|
|
def _stdout(self) -> t.BinaryIO:
|
|
if self._final_q:
|
|
raise NotImplementedError
|
|
return sys.stdout.buffer
|
|
|
|
@property
|
|
def _stdout_fd(self) -> int | None:
|
|
try:
|
|
return self._stdout.fileno()
|
|
except (ValueError, AttributeError):
|
|
return None
|
|
|
|
|
|
_display = Display()
|
|
|
|
|
|
class _DeferredWarningContext(_ambient_context.AmbientContextBase):
|
|
"""
|
|
Calls to `Display.warning()` and `Display.deprecated()` within this context will cause the resulting warnings to be captured and not displayed.
|
|
The intended use is for task-initiated warnings to be recorded with the task result, which makes them visible to registered results, callbacks, etc.
|
|
The active display callback is responsible for communicating any warnings to the user.
|
|
"""
|
|
|
|
# DTFIX-FUTURE: once we start implementing nested scoped contexts for our own bookkeeping, this should be an interface facade that forwards to the nearest
|
|
# context that actually implements the warnings collection capability
|
|
|
|
def __init__(self, *, variables: dict[str, object]) -> None:
|
|
self._variables = variables # DTFIX-FUTURE: move this to an AmbientContext-derived TaskContext (once it exists)
|
|
self._deprecation_warnings: list[DeprecationSummary] = []
|
|
self._warnings: list[WarningSummary] = []
|
|
self._seen: set[WarningSummary] = set()
|
|
|
|
@classmethod
|
|
def deprecation_warnings_enabled(cls) -> bool:
|
|
"""Return True if deprecation warnings are enabled for the current calling context, otherwise False."""
|
|
# DTFIX-FUTURE: move this capability into config using an AmbientContext-derived TaskContext (once it exists)
|
|
if warning_ctx := cls.current(optional=True):
|
|
variables = warning_ctx._variables
|
|
else:
|
|
variables = None
|
|
|
|
return C.config.get_config_value('DEPRECATION_WARNINGS', variables=variables)
|
|
|
|
def capture(self, warning: WarningSummary) -> None:
|
|
"""Add the warning/deprecation to the context if it has not already been seen by this context."""
|
|
if warning in self._seen:
|
|
return
|
|
|
|
self._seen.add(warning)
|
|
|
|
if isinstance(warning, DeprecationSummary):
|
|
self._deprecation_warnings.append(warning)
|
|
else:
|
|
self._warnings.append(warning)
|
|
|
|
def get_warnings(self) -> list[WarningSummary]:
|
|
"""Return a list of the captured non-deprecation warnings."""
|
|
# DTFIX-FUTURE: return a read-only list proxy instead
|
|
return self._warnings
|
|
|
|
def get_deprecation_warnings(self) -> list[DeprecationSummary]:
|
|
"""Return a list of the captured deprecation warnings."""
|
|
# DTFIX-FUTURE: return a read-only list proxy instead
|
|
return self._deprecation_warnings
|
|
|
|
|
|
def _format_error_details(details: t.Sequence[Detail], formatted_tb: str | None = None) -> str:
|
|
details = _utils._collapse_error_details(details)
|
|
|
|
message_lines: list[str] = []
|
|
|
|
if len(details) > 1:
|
|
message_lines.append(_utils._dedupe_and_concat_message_chain([md.msg for md in details]))
|
|
message_lines.append('')
|
|
|
|
for idx, edc in enumerate(details):
|
|
if idx:
|
|
message_lines.extend((
|
|
'',
|
|
'<<< caused by >>>',
|
|
'',
|
|
))
|
|
|
|
message_lines.extend(_get_message_lines(edc.msg, edc.help_text, edc.formatted_source_context))
|
|
|
|
message_lines = [f'{line}\n' for line in message_lines]
|
|
|
|
if formatted_tb:
|
|
message_lines.append('\n')
|
|
message_lines.append(formatted_tb)
|
|
|
|
msg = "".join(message_lines).strip()
|
|
|
|
if '\n' in msg:
|
|
msg += '\n\n'
|
|
else:
|
|
msg += '\n'
|
|
|
|
return msg
|
|
|
|
|
|
def _get_message_lines(message: str, help_text: str | None, formatted_source_context: str | None) -> list[str]:
|
|
"""Return a list of error/warning message lines constructed from the given message, help text and source context."""
|
|
|
|
if help_text and not formatted_source_context and '\n' not in message and '\n' not in help_text:
|
|
return [f'{message} {help_text}'] # prefer a single-line message with help text when there is no source context
|
|
|
|
message_lines = [message]
|
|
|
|
if formatted_source_context:
|
|
message_lines.append(formatted_source_context)
|
|
|
|
if help_text:
|
|
message_lines.append('')
|
|
message_lines.append(help_text)
|
|
|
|
return message_lines
|
|
|
|
|
|
def format_message(summary: SummaryBase) -> str:
|
|
details: t.Sequence[Detail]
|
|
|
|
if isinstance(summary, DeprecationSummary):
|
|
details = [detail if idx else dataclasses.replace(
|
|
detail,
|
|
msg=_display._get_deprecation_message_with_plugin_info(
|
|
msg=detail.msg,
|
|
version=summary.version,
|
|
date=summary.date,
|
|
plugin=summary.plugin,
|
|
),
|
|
) for idx, detail in enumerate(summary.details)]
|
|
else:
|
|
details = summary.details
|
|
|
|
return _format_error_details(details, summary.formatted_traceback)
|