Brian Coca 2 weeks ago committed by GitHub
commit 545646d301
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -48,6 +48,7 @@ class AdHocCLI(CLI):
opt_help.add_module_options(self.parser)
opt_help.add_basedir_options(self.parser)
opt_help.add_tasknoplay_options(self.parser)
opt_help.add_live_options(self.parser)
# options unique to ansible ad-hoc
self.parser.add_argument('-a', '--args', dest='module_args',

@ -396,3 +396,8 @@ def add_vault_options(parser):
help='ask for vault password')
base_group.add_argument('--vault-password-file', '--vault-pass-file', default=[], dest='vault_password_files',
help="vault password file", type=unfrack_path(follow=False), action='append')
def add_live_options(parser):
    """Add the option for requesting 'live updates' to the given argument parser."""
    parser.add_argument(
        '--live',
        dest='live',
        action='store_true',
        default=False,
        help="Requests and displays 'live updates' from executing actions and connections that support it.",
    )

@ -115,6 +115,7 @@ class ConsoleCLI(CLI, cmd.Cmd):
opt_help.add_basedir_options(self.parser)
opt_help.add_runtask_options(self.parser)
opt_help.add_tasknoplay_options(self.parser)
opt_help.add_live_options(self.parser)
# options unique to shell
self.parser.add_argument('pattern', help='host pattern', metavar='pattern', default='all', nargs='?')
@ -382,8 +383,18 @@ class ConsoleCLI(CLI, cmd.Cmd):
else:
display.display("Please specify become value, e.g. `become yes`")
def help_become(self):
    """Print help for the `become` console command."""
    display.display("Toggle whether the tasks are run with become")

def do_live(self, arg):
    """Toggle whether tasks request 'live updates' from modules and connections that support them"""
    if arg:
        # accept any truthy/falsey spelling ("yes", "no", "true", ...)
        self.options.live = boolean(arg, strict=False)
        display.v("live changed to %s" % self.options.live)
    else:
        display.display("Please specify live value, e.g. `live yes`")
def do_remote_user(self, arg):
"""Given a username, set the remote user plays are run by"""

@ -1069,7 +1069,7 @@ def _add_module_to_zip(zf, date_time, remote_module_fqn, b_module_data):
def _find_module_utils(module_name, b_module_data, module_path, module_args, task_vars, templar, module_compression, async_timeout, become,
become_method, become_user, become_password, become_flags, environment, remote_is_local=False):
become_method, become_user, become_password, become_flags, environment, remote_is_local=False, live_updates=False):
"""
Given the source of the module, convert it to a Jinja2 template to insert
module code and return whether it's a new or old style module.
@ -1275,6 +1275,7 @@ def _find_module_utils(module_name, b_module_data, module_path, module_args, tas
date_time=date_time,
coverage=coverage,
rlimit=rlimit,
live_updates=live_updates,
)))
b_module_data = output.getvalue()
@ -1338,7 +1339,7 @@ def _extract_interpreter(b_module_data):
def modify_module(module_name, module_path, module_args, templar, task_vars=None, module_compression='ZIP_STORED', async_timeout=0, become=False,
become_method=None, become_user=None, become_password=None, become_flags=None, environment=None, remote_is_local=False):
become_method=None, become_user=None, become_password=None, become_flags=None, environment=None, remote_is_local=False, live_updates=False):
"""
Used to insert chunks of code into modules before transfer rather than
doing regular python imports. This allows for more efficient transfer in
@ -1370,7 +1371,7 @@ def modify_module(module_name, module_path, module_args, templar, task_vars=None
(b_module_data, module_style, shebang) = _find_module_utils(module_name, b_module_data, module_path, module_args, task_vars, templar, module_compression,
async_timeout=async_timeout, become=become, become_method=become_method,
become_user=become_user, become_password=become_password, become_flags=become_flags,
environment=environment, remote_is_local=remote_is_local)
environment=environment, remote_is_local=remote_is_local, live_updates=live_updates)
if module_style == 'binary':
return (b_module_data, module_style, to_text(shebang, nonstring='passthru'))

@ -39,6 +39,7 @@ handlers: "A section with tasks that are treated as handlers, these won't get ex
hosts: "A list of groups, hosts or host pattern that translates into a list of hosts that are the play's target."
ignore_errors: Boolean that allows you to ignore task failures and continue with play. It does not affect connection errors.
ignore_unreachable: Boolean that allows you to ignore task failures due to an unreachable host and continue with the play. This does not affect other task errors (see :term:`ignore_errors`) but is useful for groups of volatile/ephemeral hosts.
live: "Signals that a task should return 'live updates' if the module supports it"
loop: "Takes a list for the task to iterate over, saving each list element into the ``item`` variable (configurable via loop_control)"
loop_control: Several keys here allow you to modify/set loop behavior in a task. See :ref:`loop_control`.
max_fail_percentage: can be used to abort the run after a given percentage of hosts in the current batch has failed. This only works on linear or linear-derived strategies.

@ -371,6 +371,7 @@ class AnsibleModule(object):
and :ref:`developing_program_flow_modules` for more detailed explanation.
'''
self._updates = 0
self._name = os.path.basename(__file__) # initialize name until we can parse from options
self.argument_spec = argument_spec
self.supports_check_mode = supports_check_mode
@ -390,6 +391,7 @@ class AnsibleModule(object):
self._shell = None
self._syslog_facility = 'LOG_USER'
self._verbosity = 0
self._provide_updates = False
# May be used to set modifications to the environment for any
# run_command invocation
self.run_command_environ_update = {}
@ -1295,6 +1297,10 @@ class AnsibleModule(object):
else:
self._log_to_syslog(journal_msg)
# TODO: does 'return log data' need diff setting?
if self._debug and syslog_msg:
self.update_json({'log': syslog_msg})
def _log_invocation(self):
''' log that ansible ran the module '''
# TODO: generalize a separate log function and make log_invocation use it
@ -1402,15 +1408,10 @@ class AnsibleModule(object):
self.add_path_info(kwargs)
if 'invocation' not in kwargs:
kwargs['invocation'] = {'module_args': self.params}
if '_ansible_update' not in kwargs:
if 'warnings' in kwargs:
if isinstance(kwargs['warnings'], list):
for w in kwargs['warnings']:
self.warn(w)
else:
self.warn(kwargs['warnings'])
if 'invocation' not in kwargs:
kwargs['invocation'] = {'module_args': self.params}
warnings = get_warning_messages()
if warnings:
@ -1447,6 +1448,7 @@ class AnsibleModule(object):
kwargs.update(preserved)
print('\n%s' % self.jsonify(kwargs))
sys.stdout.flush()
def exit_json(self, **kwargs):
''' return from the module, without error '''
@ -1470,6 +1472,14 @@ class AnsibleModule(object):
self._return_formatted(kwargs)
sys.exit(1)
def update_json(self, data):
    # Emit an intermediate 'live update' result while the module is still
    # running: each payload is tagged with a monotonically increasing
    # counter under '_ansible_update' so the controller can tell updates
    # apart from the final exit_json/fail_json result.
    # NOTE(review): PASS_VARS maps 'live_updates' to the attribute 'live',
    # not '_provide_updates' — confirm this flag is ever set to True, or
    # updates will be silently dropped.
    if self._provide_updates:
        self._updates += 1
        data['_ansible_update'] = self._updates
        self._return_formatted(data)
        self.log('update #%s' % self._updates)
def fail_on_missing_params(self, required_params=None):
if not required_params:
return
@ -1717,6 +1727,14 @@ class AnsibleModule(object):
self.fail_json(msg='Could not write data to file (%s) from (%s): %s' % (dest, src, to_native(e)),
exception=traceback.format_exc())
def _read_from_pipes(self, rpipes, rfds, file_descriptor):
    # Read one chunk (up to 9000 bytes) from file_descriptor if select()
    # reported it ready (i.e. it appears in rfds). An empty read means the
    # pipe hit EOF, so it is removed from the watch list; note rpipes is
    # mutated in place as a side effect.
    data = b('')
    if file_descriptor in rfds:
        data = os.read(file_descriptor.fileno(), 9000)
        if data == b(''):
            rpipes.remove(file_descriptor)
    return data
def _clean_args(self, args):
if not self._clean:
@ -1925,6 +1943,10 @@ class AnsibleModule(object):
try:
if self._debug:
self.log('Executing: ' + self._clean_args(args))
if self._provide_updates:
kwargs['bufsize'] = 0
cmd = subprocess.Popen(args, **kwargs)
if before_communicate_callback:
before_communicate_callback(cmd)

@ -89,6 +89,7 @@ PASS_VARS = {
'diff': ('_diff', False),
'keep_remote_files': ('_keep_remote_files', False),
'ignore_unknown_opts': ('_ignore_unknown_opts', False),
'live_updates': ('live', None),
'module_name': ('_name', None),
'no_log': ('no_log', False),
'remote_tmp': ('_remote_tmp', None),
@ -103,7 +104,7 @@ PASS_VARS = {
'version': ('ansible_version', '0.0'),
}
PASS_BOOLS = ('check_mode', 'debug', 'diff', 'keep_remote_files', 'ignore_unknown_opts', 'no_log')
PASS_BOOLS = ('check_mode', 'debug', 'diff', 'keep_remote_files', 'ignore_unknown_opts', 'live', 'no_log')
DEFAULT_TYPE_VALIDATORS = {
'str': check_type_str,

@ -31,7 +31,7 @@ import json # pylint: disable=unused-import
# NB: a copy of this function exists in ../../modules/core/async_wrapper.py. Ensure any
# changes are propagated there.
def _filter_non_json_lines(data, objects_only=False):
def _consume_json(data, objects_only=False):
'''
Used to filter unrelated output around module JSON output, like messages from
tcagetattr, or where dropbear spews MOTD on every single command (which is nuts).
@ -64,15 +64,34 @@ def _filter_non_json_lines(data, objects_only=False):
else:
raise ValueError('No end of json char found')
trailing_junk = ''
if reverse_end_offset > 0:
# Trailing junk is uncommon and can point to things the user might
# want to change. So print a warning if we find any
trailing_junk = lines[len(lines) - reverse_end_offset:]
for line in trailing_junk:
if line.strip():
warnings.append('Module invocation had junk after the JSON data: %s' % '\n'.join(trailing_junk))
break
lines = lines[:(len(lines) - reverse_end_offset)]
return ('\n'.join(lines), trailing_junk)
return ('\n'.join(lines), warnings)
# NB: a copy of this function exists in ../../modules/core/async_wrapper.py.
# Ensure any changes are propagated there.
def _filter_non_json_lines(data):
    '''
    Strip unrelated text surrounding module JSON output, such as messages
    from tcagetattr or MOTD that dropbear spews on every single command.

    Drops leading lines before the first line-starting occurrence of '{' or
    '[', and all trailing lines after the matching close character (working
    from the bottom of output). Returns the JSON text together with any
    warnings about non-whitespace trailing junk.
    '''
    lines, trailing_junk = _consume_json(data)
    warnings = []
    # Trailing junk is uncommon and can point to things the user might want
    # to change, so surface a single warning when any non-blank line exists.
    if any(junk.strip() for junk in trailing_junk):
        warnings.append('Module invocation had junk after the JSON data: %s' % '\n'.join(trailing_junk))
    return (lines, warnings)

@ -722,6 +722,7 @@ class Base(FieldAttributeBase):
any_errors_fatal = FieldAttribute(isa='bool', default=C.ANY_ERRORS_FATAL)
throttle = FieldAttribute(isa='int', default=0)
timeout = FieldAttribute(isa='int', default=C.TASK_TIMEOUT)
live = FieldAttribute(isa='bool')
# explicitly invoke a debugger on tasks
debugger = FieldAttribute(isa='string')

@ -77,6 +77,23 @@ class AnsiblePlugin(ABC):
raise KeyError(to_native(e))
return option_value, origin
@staticmethod
def warn(msg):
# FIXME: move from display to use callback queue
display.warning(msg)
@staticmethod
def deprecated(msg, version=None, removed=False):
display.deprecated(msg, version, removed)
@staticmethod
def debug(msg):
display.debug(msg)
@staticmethod
def info(msg, host=None, level=1):
display.verbose(msg, host=host, caplevel=level)
def get_option(self, option, hostvars=None):
if option not in self._options:

@ -23,7 +23,7 @@ from ansible.executor.module_common import modify_module
from ansible.executor.interpreter_discovery import discover_interpreter, InterpreterDiscoveryRequiredError
from ansible.module_utils.common.arg_spec import ArgumentSpecValidator
from ansible.module_utils.errors import UnsupportedError
from ansible.module_utils.json_utils import _filter_non_json_lines
from ansible.module_utils.json_utils import _consume_json
from ansible.module_utils.six import binary_type, string_types, text_type
from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text
from ansible.parsing.utils.jsonify import jsonify
@ -300,6 +300,7 @@ class ActionBase(ABC):
async_timeout=self._task.async_val,
environment=final_environment,
remote_is_local=bool(getattr(self._connection, '_remote_is_local', False)),
live_updates=bool(self._task.live),
**become_kwargs)
break
except InterpreterDiscoveryRequiredError as idre:
@ -944,6 +945,7 @@ class ActionBase(ABC):
if not self._supports_check_mode:
raise AnsibleError("check mode is not supported for this operation")
module_args['_ansible_check_mode'] = True
else:
module_args['_ansible_check_mode'] = False
@ -995,6 +997,9 @@ class ActionBase(ABC):
# make sure the remote_tmp value is sent through in case modules needs to create their own
module_args['_ansible_remote_tmp'] = self.get_shell_option('remote_tmp', default='~/.ansible/tmp')
# enable live updates
module_args['_ansible_live_updates'] = self._task.live
# tells the module to ignore options that are not in its argspec.
module_args['_ansible_ignore_unknown_opts'] = ignore_unknown_opts
@ -1153,7 +1158,7 @@ class ActionBase(ABC):
self._fixup_perms2(remote_files, self._get_remote_user())
# actually execute
res = self._low_level_execute_command(cmd, sudoable=sudoable, in_data=in_data)
res = self._low_level_execute_command(cmd, sudoable=sudoable, in_data=in_data, live=bool(self._task.live))
# parse the main result
data = self._parse_returned_data(res)
@ -1216,7 +1221,7 @@ class ActionBase(ABC):
def _parse_returned_data(self, res):
try:
filtered_output, warnings = _filter_non_json_lines(res.get('stdout', u''), objects_only=True)
filtered_output, warnings = _consume_json(res.get('stdout', u''), objects_only=True)
for w in warnings:
display.warning(w)
@ -1265,7 +1270,7 @@ class ActionBase(ABC):
return data
# FIXME: move to connection base
def _low_level_execute_command(self, cmd, sudoable=True, in_data=None, executable=None, encoding_errors='surrogate_then_replace', chdir=None):
def _low_level_execute_command(self, cmd, sudoable=True, in_data=None, executable=None, encoding_errors='surrogate_then_replace', chdir=None, live=False):
'''
This is the function which executes the low level shell command, which
may be commands to create/remove directories for temporary files, or to

@ -7,6 +7,8 @@ from __future__ import annotations
import collections.abc as c
import fcntl
import io
import gettext
import json
import os
import shlex
import typing as t
@ -17,6 +19,9 @@ from functools import wraps
from ansible import constants as C
from ansible.module_utils.common.text.converters import to_bytes, to_text
from ansible.playbook.play_context import PlayContext
from ansible.errors import AnsibleError
from ansible.module_utils.json_utils import _filter_non_json_lines
from ansible.module_utils.six import string_types
from ansible.plugins import AnsiblePlugin
from ansible.plugins.become import BecomeBase
from ansible.plugins.shell import ShellBase
@ -243,6 +248,25 @@ class ConnectionBase(AnsiblePlugin):
fcntl.lockf(f, fcntl.LOCK_UN)
display.vvvv('CONNECTION: pid %d released lock on %d' % (os.getpid(), f), host=self._play_context.remote_addr)
def _handle_updates(self, data):
    # Scan raw connection output for an in-flight module 'update' payload
    # (recognised by the '_ansible_update' marker key) and display it
    # immediately; returns (is_update, remaining_bytes) so the caller can
    # drop the update JSON from the accumulated stdout.
    is_update = False
    rest_data = ''
    if b'_ansible_update' in data:
        # consume update data
        try:
            # NOTE(review): this fragment imports _filter_non_json_lines,
            # not _consume_json — confirm the right helper is imported or
            # this line raises NameError at runtime.
            # NOTE(review): _consume_json's second return value may be a
            # list of lines, not a string — verify to_bytes(rest_data)
            # produces the intended remainder.
            updates, rest_data = _consume_json(to_text(data))
        except ValueError:
            # improper or incomplete json — ignore and try again on the
            # next chunk of output
            updates = None
        if updates and updates.strip():
            is_update = True
            # TODO: once callbacks are expanded use those instead of restricting
            display.display('[U]: <%s> %s' % (self._play_context.remote_addr, updates), color=C.COLOR_DEPRECATE, screen_only=True)
    return is_update, to_bytes(rest_data)
def reset(self) -> None:
display.warning("Reset is not implemented for this connection")

@ -65,7 +65,7 @@ class Connection(ConnectionBase):
self._connected = True
return self
def exec_command(self, cmd: str, in_data: bytes | None = None, sudoable: bool = True) -> tuple[int, bytes, bytes]:
def exec_command(self, cmd: str, in_data: bytes | None = None, sudoable: bool = True, live: bool = False) -> tuple[int, bytes, bytes]:
''' run a command on the local host '''
super(Connection, self).exec_command(cmd, in_data=in_data, sudoable=sudoable)
@ -113,13 +113,13 @@ class Connection(ConnectionBase):
os.close(stdin)
display.debug("done running command with Popen()")
selector = selectors.DefaultSelector()
selector.register(p.stdout, selectors.EVENT_READ)
selector.register(p.stderr, selectors.EVENT_READ)
if self.become and self.become.expect_prompt() and sudoable:
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fcntl.fcntl(p.stdout, fcntl.F_GETFL) | os.O_NONBLOCK)
fcntl.fcntl(p.stderr, fcntl.F_SETFL, fcntl.fcntl(p.stderr, fcntl.F_GETFL) | os.O_NONBLOCK)
selector = selectors.DefaultSelector()
selector.register(p.stdout, selectors.EVENT_READ)
selector.register(p.stderr, selectors.EVENT_READ)
become_output = b''
try:
@ -139,8 +139,9 @@ class Connection(ConnectionBase):
stdout, stderr = p.communicate()
raise AnsibleError('privilege output closed while waiting for password prompt:\n' + to_native(become_output))
become_output += chunk
finally:
except:
selector.close()
raise
if not self.become.check_success(become_output):
become_pass = self.become.get_option('become_pass', playcontext=self._play_context)
@ -152,8 +153,39 @@ class Connection(ConnectionBase):
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fcntl.fcntl(p.stdout, fcntl.F_GETFL) & ~os.O_NONBLOCK)
fcntl.fcntl(p.stderr, fcntl.F_SETFL, fcntl.fcntl(p.stderr, fcntl.F_GETFL) & ~os.O_NONBLOCK)
display.debug("getting output with communicate()")
stdout, stderr = p.communicate(in_data)
display.debug("starting update loop")
stdout = b''
stdout_done = False
stderr = b''
stderr_done = False
while True:
events = selector.select(1)
for key, event in events:
output = ''
if key.fileobj == p.stdout:
output = os.read(p.stdout.fileno(), 9000)
if output == b'':
stdout_done = True
selector.unregister(p.stdout)
stdout += output
elif key.fileobj == p.stderr:
output = os.read(p.stderr.fileno(), 9000)
if output == b'':
stderr_done = True
selector.unregister(p.stderr)
stderr += output
is_update, rest = self._handle_updates(output)
if is_update:
stdout = rest or b''
# Exit if the process has closed both fds and the process has
# finished
if stdout_done and stderr_done and p.poll() is not None:
break
display.debug("done communicating")
# finally, close the other half of the pty, if it was created

@ -906,7 +906,7 @@ class Connection(ConnectionBase):
return b''.join(output), remainder
def _bare_run(self, cmd: list[bytes], in_data: bytes | None, sudoable: bool = True, checkrc: bool = True) -> tuple[int, bytes, bytes]:
def _bare_run(self, cmd: list[bytes], in_data: bytes | None, sudoable: bool = True, checkrc: bool = True, live: bool = False) -> tuple[int, bytes, bytes]:
'''
Starts the command and communicates with it until it ends.
'''
@ -929,15 +929,20 @@ class Connection(ConnectionBase):
conn_password = self.get_option('password') or self._play_context.password
if live:
bufsize = 0
else:
bufsize = -1
if not in_data:
try:
# Make sure stdin is a proper pty to avoid tcgetattr errors
master, slave = pty.openpty()
if PY3 and conn_password:
# pylint: disable=unexpected-keyword-arg
p = subprocess.Popen(cmd, stdin=slave, stdout=subprocess.PIPE, stderr=subprocess.PIPE, pass_fds=self.sshpass_pipe)
p = subprocess.Popen(cmd, bufsize=bufsize, stdin=slave, stdout=subprocess.PIPE, stderr=subprocess.PIPE, pass_fds=self.sshpass_pipe)
else:
p = subprocess.Popen(cmd, stdin=slave, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p = subprocess.Popen(cmd, bufsize=bufsize, stdin=slave, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdin = os.fdopen(master, 'wb', 0)
os.close(slave)
except (OSError, IOError):
@ -1076,10 +1081,14 @@ class Connection(ConnectionBase):
b_tmp_stderr += b_chunk
display.debug("stderr chunk (state=%s):\n>>>%s<<<\n" % (state, to_text(b_chunk)))
# check if this was an update message, return remainder if it is
is_update, rest = self._handle_updates(b_tmp_stdout)
if is_update:
b_tmp_stdout = rest or b''
# We examine the output line-by-line until we have negotiated any
# privilege escalation prompt and subsequent success/error message.
# Afterwards, we can accumulate output without looking at it.
if state < states.index('ready_to_send'):
if b_tmp_stdout:
b_output, b_unprocessed = self._examine_output('stdout', states[state], b_tmp_stdout, sudoable)
@ -1202,10 +1211,10 @@ class Connection(ConnectionBase):
return (p.returncode, b_stdout, b_stderr)
@_ssh_retry
def _run(self, cmd: list[bytes], in_data: bytes | None, sudoable: bool = True, checkrc: bool = True) -> tuple[int, bytes, bytes]:
def _run(self, cmd: list[bytes], in_data: bytes | None, sudoable: bool = True, checkrc: bool = True, live: bool = False) -> tuple[int, bytes, bytes]:
"""Wrapper around _bare_run that retries the connection
"""
return self._bare_run(cmd, in_data, sudoable=sudoable, checkrc=checkrc)
return self._bare_run(cmd, in_data, sudoable=sudoable, checkrc=checkrc, live=live)
@_ssh_retry
def _file_transport_command(self, in_path: str, out_path: str, sftp_action: str) -> tuple[int, bytes, bytes]:
@ -1291,7 +1300,7 @@ class Connection(ConnectionBase):
#
# Main public methods
#
def exec_command(self, cmd: str, in_data: bytes | None = None, sudoable: bool = True) -> tuple[int, bytes, bytes]:
def exec_command(self, cmd: str, in_data: bytes | None = None, sudoable: bool = True, live: bool = False) -> tuple[int, bytes, bytes]:
''' run a command on the remote host '''
super(Connection, self).exec_command(cmd, in_data=in_data, sudoable=sudoable)
@ -1332,7 +1341,7 @@ class Connection(ConnectionBase):
args = (self.host, cmd)
cmd = self._build_command(ssh_executable, 'ssh', *args)
(returncode, stdout, stderr) = self._run(cmd, in_data, sudoable=sudoable)
(returncode, stdout, stderr) = self._run(cmd, in_data, sudoable=sudoable, live=live)
# When running on Windows, stderr may contain CLIXML encoded output
if getattr(self._shell, "_IS_WINDOWS", False) and stderr.startswith(b"#< CLIXML"):

@ -662,6 +662,7 @@ class TestActionBase(unittest.TestCase):
mock_task.diff = False
mock_task.check_mode = False
mock_task.no_log = False
mock_task.live = False
# create a mock connection, so we don't actually try and connect to things
def get_option(option):

Loading…
Cancel
Save