Fix local connection and become issues (#84700)

* Fixed various become-related issues in `local` connection plugin.
* Fixed various issues in `sudo` and `su` become plugins.
* Added unit and integration test coverage.

Co-authored-by: Matt Clay <matt@mystile.com>
Co-authored-by: Matt Davis <nitzmahone@redhat.com>
pull/84727/head
Matt Davis 10 months ago committed by GitHub
parent a4d4315d37
commit 5d7b8288f8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,22 @@
minor_changes:
- local connection plugin - A new ``become_success_timeout`` operation-wide timeout config (default 10s) was added for ``become``.
- local connection plugin - A new ``become_strip_preamble`` config option (default True) was added; disable to preserve diagnostic ``become`` output in task results.
- local connection plugin - When a ``become`` plugin's ``prompt`` value is a non-string after the ``check_password_prompt`` callback has completed, no prompt stripping will occur on stderr.
bugfixes:
- local connection plugin - Fixed silent ignore of ``become`` failures and loss of task output when data arrived concurrently on stdout and stderr during ``become`` operation validation.
- local connection plugin - Fixed hang or spurious failure when data arrived concurrently on stdout and stderr during a successful ``become`` operation validation.
- local connection plugin - Fixed task output header truncation when post-become data arrived before ``become`` operation validation had completed.
- local connection plugin - Ensure ``become`` success validation always occurs, even when an active plugin does not set ``prompt``.
- local connection plugin - Fixed cases where the internal ``BECOME-SUCCESS`` message appeared in task output.
- local connection plugin - Fixed long timeout/hang for ``become`` plugins that repeat their prompt on failure (e.g., ``sudo``, some ``su`` implementations).
- local connection plugin - Fixed hang when an active become plugin incorrectly signals lack of prompt.
- local connection plugin - Fixed hang when a become plugin expects a prompt but a password was not provided.
- local connection plugin - Fixed hang when an internal become read timeout expired before the password prompt was written.
- local connection plugin - Fixed hang when only one of stdout or stderr was closed by the ``become_exe`` subprocess.
- local connection plugin - Become timeout errors now include all received data. Previously, the most recently-received data was discarded.
- sudo become plugin - The `sudo_chdir` config option allows the current directory to be set to the specified value before executing sudo to avoid permission errors when dropping privileges.
- su become plugin - Ensure generated regex from ``prompt_l10n`` config values is properly escaped.
- su become plugin - Ensure that password prompts are correctly detected in the presence of leading output. Previously, this case resulted in a timeout or hang.
- su become plugin - Ensure that trailing colon is expected on all ``prompt_l10n`` config values.
- ansible-test - Managed macOS instances now use the ``sudo_chdir`` option for the ``sudo`` become plugin to avoid permission errors when dropping privileges.

@ -3,6 +3,7 @@
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import annotations
import re
import shlex
from abc import abstractmethod
@ -13,6 +14,7 @@ from gettext import dgettext
from ansible.errors import AnsibleError
from ansible.module_utils.common.text.converters import to_bytes
from ansible.plugins import AnsiblePlugin
from ansible.utils import display as _display
def _gen_id(length=32):
@ -53,11 +55,11 @@ class BecomeBase(AnsiblePlugin):
return getattr(playcontext, option, None)
def expect_prompt(self):
def expect_prompt(self) -> bool:
"""This function assists connection plugins in determining if they need to wait for
a prompt. Both a prompt and a password are required.
"""
return self.prompt and self.get_option('become_pass')
return bool(self.prompt and self.get_option('become_pass'))
def _build_success_command(self, cmd, shell, noexe=False):
if not all((cmd, shell, self.success)):
@ -65,9 +67,8 @@ class BecomeBase(AnsiblePlugin):
try:
cmd = shlex.quote('%s %s %s %s' % (shell.ECHO, self.success, shell.COMMAND_SEP, cmd))
except AttributeError:
# TODO: This should probably become some more robust functionality used to detect incompat
raise AnsibleError('The %s shell family is incompatible with the %s become plugin' % (shell.SHELL_FAMILY, self.name))
except AttributeError as ex:
raise AnsibleError(f'The {shell._load_name!r} shell plugin does not support become. It is missing the {ex.name!r} attribute.')
exe = getattr(shell, 'executable', None)
if exe and not noexe:
cmd = '%s -c %s' % (exe, cmd)
@ -78,6 +79,25 @@ class BecomeBase(AnsiblePlugin):
self._id = _gen_id()
self.success = 'BECOME-SUCCESS-%s' % self._id
def strip_become_prompt(self, data: bytes) -> bytes:
"""
Strips the first found configured become prompt from `data`, trailing whitespace and anything that precedes the prompt, then returns the result.
If no prompt is expected, or the prompt is not `str` or `bytes`, `data` will be returned as-is.
"""
if not self.prompt or not isinstance(self.prompt, (str, bytes)) or not self.expect_prompt():
return data
return self._strip_through_prefix(self.prompt, data)
def strip_become_success(self, data: bytes) -> bytes:
"""Strips the first found success marker from `data`, trailing whitespace and anything that precedes the success marker, then returns the result."""
return self._strip_through_prefix(self.success, data)
@staticmethod
def _strip_through_prefix(match: str | bytes, data: bytes) -> bytes:
"""Strips the first occurrence of `match` from `data`, trailing whitespace and anything that precedes `match`, then returns the result."""
return re.sub(br'^.*?' + re.escape(to_bytes(match)) + br'\s*', b'', data, count=1, flags=re.DOTALL)
def check_success(self, b_output):
b_success = to_bytes(self.success)
return any(b_success in l.rstrip() for l in b_output.splitlines(True))

@ -93,7 +93,7 @@ DOCUMENTATION = """
import re
import shlex
from ansible.module_utils.common.text.converters import to_bytes
from ansible.module_utils.common.text.converters import to_text
from ansible.plugins.become import BecomeBase
@ -139,15 +139,18 @@ class BecomeModule(BecomeBase):
'口令',
]
def check_password_prompt(self, b_output):
def check_password_prompt(self, b_output: bytes) -> bool:
""" checks if the expected password prompt exists in b_output """
prompts = self.get_option('prompt_l10n') or self.SU_PROMPT_LOCALIZATIONS
b_password_string = b"|".join((br'(\w+\'s )?' + to_bytes(p)) for p in prompts)
password_prompt_strings = "|".join(re.escape(p) for p in prompts)
# Colon or unicode fullwidth colon
b_password_string = b_password_string + to_bytes(u' ?(:|) ?')
b_su_prompt_localizations_re = re.compile(b_password_string, flags=re.IGNORECASE)
return bool(b_su_prompt_localizations_re.match(b_output))
prompt_pattern = rf"(?:{password_prompt_strings})\s*[:]"
match = re.search(prompt_pattern, to_text(b_output), flags=re.IGNORECASE)
if match:
self.prompt = match.group(0) # preserve the actual matched string so we can scrub the output
return bool(match)
def build_become_command(self, cmd, shell):
super(BecomeModule, self).build_become_command(cmd, shell)

@ -72,12 +72,25 @@ DOCUMENTATION = """
ini:
- section: sudo_become_plugin
key: password
sudo_chdir:
description: Directory to change to before invoking sudo; can avoid permission errors when dropping privileges.
type: string
required: False
version_added: '2.19'
vars:
- name: ansible_sudo_chdir
env:
- name: ANSIBLE_SUDO_CHDIR
ini:
- section: sudo_become_plugin
key: chdir
"""
import re
import shlex
from ansible.plugins.become import BecomeBase
from ansible.errors import AnsibleError
class BecomeModule(BecomeBase):
@ -117,4 +130,10 @@ class BecomeModule(BecomeBase):
if user:
user = '-u %s' % (user)
if chdir := self.get_option('sudo_chdir'):
try:
becomecmd = f'{shell.CD} {shlex.quote(chdir)} {shell._SHELL_AND} {becomecmd}'
except AttributeError as ex:
raise AnsibleError(f'The {shell._load_name!r} shell plugin does not support sudo chdir. It is missing the {ex.name!r} attribute.')
return ' '.join([becomecmd, flags, prompt, user, self._build_success_command(cmd, shell)])

@ -11,19 +11,38 @@ DOCUMENTATION = """
- This connection plugin allows ansible to execute tasks on the Ansible 'controller' instead of on a remote host.
author: ansible (@core)
version_added: historical
options:
become_success_timeout:
version_added: '2.19'
type: int
default: 10
description:
- Number of seconds to wait for become to succeed when enabled.
- The default will be used if the configured value is less than 1.
vars:
- name: ansible_local_become_success_timeout
become_strip_preamble:
version_added: '2.19'
type: bool
default: true
description:
- Strip internal become output preceding command execution. Disable for additional diagnostics.
vars:
- name: ansible_local_become_strip_preamble
extends_documentation_fragment:
- connection_pipelining
notes:
- The remote user is ignored, the user with which the ansible CLI was executed is used instead.
"""
import fcntl
import functools
import getpass
import os
import pty
import selectors
import shutil
import subprocess
import time
import typing as t
import ansible.constants as C
@ -86,7 +105,7 @@ class Connection(ConnectionBase):
else:
cmd = map(to_bytes, cmd)
master = None
pty_primary = None
stdin = subprocess.PIPE
if sudoable and self.become and self.become.expect_prompt() and not self.get_option('pipelining'):
# Create a pty if sudoable for privilege escalation that needs it.
@ -94,7 +113,7 @@ class Connection(ConnectionBase):
# cause the command to fail in certain situations where we are escalating
# privileges or the command otherwise needs a pty.
try:
master, stdin = pty.openpty()
pty_primary, stdin = pty.openpty()
except (IOError, OSError) as e:
display.debug("Unable to open pty: %s" % to_native(e))
@ -108,60 +127,134 @@ class Connection(ConnectionBase):
stderr=subprocess.PIPE,
)
# if we created a master, we can close the other half of the pty now, otherwise master is stdin
if master is not None:
# if we created a pty, we can close the other half of the pty now, otherwise primary is stdin
if pty_primary is not None:
os.close(stdin)
display.debug("done running command with Popen()")
if self.become and self.become.expect_prompt() and sudoable:
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fcntl.fcntl(p.stdout, fcntl.F_GETFL) | os.O_NONBLOCK)
fcntl.fcntl(p.stderr, fcntl.F_SETFL, fcntl.fcntl(p.stderr, fcntl.F_GETFL) | os.O_NONBLOCK)
selector = selectors.DefaultSelector()
selector.register(p.stdout, selectors.EVENT_READ)
selector.register(p.stderr, selectors.EVENT_READ)
become_output = b''
try:
while not self.become.check_success(become_output) and not self.become.check_password_prompt(become_output):
events = selector.select(self._play_context.timeout)
if not events:
stdout, stderr = p.communicate()
raise AnsibleError('timeout waiting for privilege escalation password prompt:\n' + to_native(become_output))
for key, event in events:
if key.fileobj == p.stdout:
chunk = p.stdout.read()
elif key.fileobj == p.stderr:
chunk = p.stderr.read()
if not chunk:
stdout, stderr = p.communicate()
raise AnsibleError('privilege output closed while waiting for password prompt:\n' + to_native(become_output))
become_output += chunk
finally:
selector.close()
if not self.become.check_success(become_output):
become_pass = self.become.get_option('become_pass', playcontext=self._play_context)
if master is None:
p.stdin.write(to_bytes(become_pass, errors='surrogate_or_strict') + b'\n')
else:
os.write(master, to_bytes(become_pass, errors='surrogate_or_strict') + b'\n')
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fcntl.fcntl(p.stdout, fcntl.F_GETFL) & ~os.O_NONBLOCK)
fcntl.fcntl(p.stderr, fcntl.F_SETFL, fcntl.fcntl(p.stderr, fcntl.F_GETFL) & ~os.O_NONBLOCK)
become_stdout_bytes, become_stderr_bytes = self._ensure_become_success(p, pty_primary, sudoable)
display.debug("getting output with communicate()")
stdout, stderr = p.communicate(in_data)
display.debug("done communicating")
# preserve output from privilege escalation stage as `bytes`; it may contain actual output (eg `raw`) or error messages
stdout = become_stdout_bytes + stdout
stderr = become_stderr_bytes + stderr
# finally, close the other half of the pty, if it was created
if master:
os.close(master)
if pty_primary:
os.close(pty_primary)
display.debug("done with local.exec_command()")
return (p.returncode, stdout, stderr)
return p.returncode, stdout, stderr
def _ensure_become_success(self, p: subprocess.Popen, pty_primary: int, sudoable: bool) -> tuple[bytes, bytes]:
"""
Ensure that become succeeds, returning a tuple containing stdout captured after success and all stderr.
Returns immediately if `self.become` or `sudoable` are False, or `build_become_command` was not called, without performing any additional checks.
"""
if not self.become or not sudoable or not self.become._id: # _id is set by build_become_command, if it was not called, assume no become
return b'', b''
start_seconds = time.monotonic()
become_stdout = bytearray()
become_stderr = bytearray()
last_stdout_prompt_offset = 0
last_stderr_prompt_offset = 0
# map the buffers to their associated stream for the selector reads
become_capture = {
p.stdout: become_stdout,
p.stderr: become_stderr,
}
expect_password_prompt = self.become.expect_prompt()
sent_password = False
def become_error(reason: str) -> t.NoReturn:
error_message = f'{reason} waiting for become success'
if expect_password_prompt and not sent_password:
error_message += ' or become password prompt'
error_message += '.'
if become_stdout:
error_message += f'\n>>> Standard Output\n{to_text(bytes(become_stdout))}'
if become_stderr:
error_message += f'\n>>> Standard Error\n{to_text(bytes(become_stderr))}'
raise AnsibleError(error_message)
os.set_blocking(p.stdout.fileno(), False)
os.set_blocking(p.stderr.fileno(), False)
with selectors.DefaultSelector() as selector:
selector.register(p.stdout, selectors.EVENT_READ, 'stdout')
selector.register(p.stderr, selectors.EVENT_READ, 'stderr')
while not self.become.check_success(become_stdout):
if not selector.get_map(): # we only reach end of stream after all descriptors are EOF
become_error('Premature end of stream')
if expect_password_prompt and (
self.become.check_password_prompt(become_stdout[last_stdout_prompt_offset:]) or
self.become.check_password_prompt(become_stderr[last_stderr_prompt_offset:])
):
if sent_password:
become_error('Duplicate become password prompt encountered')
last_stdout_prompt_offset = len(become_stdout)
last_stderr_prompt_offset = len(become_stderr)
password_to_send = to_bytes(self.become.get_option('become_pass') or '') + b'\n'
if pty_primary is None:
p.stdin.write(password_to_send)
p.stdin.flush()
else:
os.write(pty_primary, password_to_send)
sent_password = True
remaining_timeout_seconds = self._become_success_timeout - (time.monotonic() - start_seconds)
events = selector.select(remaining_timeout_seconds) if remaining_timeout_seconds > 0 else []
if not events:
# ignoring remaining output after timeout to prevent hanging
become_error('Timed out')
# read all content (non-blocking) from streams that signaled available input and append to the associated buffer
for key, event in events:
obj = t.cast(t.BinaryIO, key.fileobj)
if chunk := obj.read():
become_capture[obj] += chunk
else:
selector.unregister(obj) # EOF on this obj, stop polling it
os.set_blocking(p.stdout.fileno(), True)
os.set_blocking(p.stderr.fileno(), True)
become_stdout_bytes = bytes(become_stdout)
become_stderr_bytes = bytes(become_stderr)
if self.get_option('become_strip_preamble'):
become_stdout_bytes = self.become.strip_become_success(self.become.strip_become_prompt(become_stdout_bytes))
become_stderr_bytes = self.become.strip_become_prompt(become_stderr_bytes)
return become_stdout_bytes, become_stderr_bytes
@functools.cached_property
def _become_success_timeout(self) -> int:
"""Timeout value for become success in seconds."""
if (timeout := self.get_option('become_success_timeout')) < 1:
timeout = C.config.get_configuration_definitions('connection', 'local')['become_success_timeout']['default']
return timeout
def put_file(self, in_path: str, out_path: str) -> None:
""" transfer a file from local to local """

@ -29,6 +29,7 @@ class ShellModule(ShellBase):
# commonly used
ECHO = 'echo'
CD = 'cd'
COMMAND_SEP = ';'
# How to end lines in a python script one-liner

@ -1,3 +1,9 @@
destructive
shippable/posix/group3
context/controller
shippable/posix/group1
context/target
gather_facts/no
needs/target/setup_become_user_pair
needs/target/setup_test_user
setup/always/setup_passlib_controller # required for setup_test_user
skip/macos # requires a TTY
skip/freebsd # appears to require a TTY (ignores password input from stdin)

@ -0,0 +1,22 @@
#!/usr/bin/env bash
# A command wrapper that delegates to su after lowering privilege through an intermediate user (via sudo).
# This allows forcing an environment that always requires a password prompt for su.
set -eu
args=("${@}")
for i in "${!args[@]}"; do
case "${args[$i]}" in
"--intermediate-user")
intermediate_user_idx="${i}"
;;
esac
done
intermediate_user_name="${args[intermediate_user_idx+1]}"
unset "args[intermediate_user_idx]"
unset "args[intermediate_user_idx+1]"
exec sudo -n -u "${intermediate_user_name}" su "${args[@]}"

@ -1,6 +0,0 @@
#!/usr/bin/env bash
set -eux
# ensure we execute su with a pseudo terminal
[ "$(ansible -a whoami --become-method=su localhost --become)" != "su: requires a terminal to execute" ]

@ -0,0 +1,63 @@
- name: create unprivileged user pair
include_role:
name: setup_become_user_pair
public: true # this exports target_user_name, target_user_password, intermediate_user_name
vars:
intermediate_user_groups: "{{ 'staff,admin' if ansible_os_family == 'Darwin' else omit }}" # this works, but requires a TTY; disabled MacOS su testing in CI for now via aliases
- name: deploy su shim
copy:
src: sushim.sh
dest: /tmp/sushim.sh
mode: a+rx
- name: ensure su is setuid on Alpine
file:
path: /bin/su
mode: +s
when: ansible_os_family == 'Alpine'
- name: test su scenarios where a password prompt must be encountered
vars:
ansible_become: yes
ansible_become_method: su
ansible_become_exe: /tmp/sushim.sh
ansible_become_flags: --intermediate-user {{ intermediate_user_name | quote }} # the default plugin flags are empty
ansible_become_user: "{{ target_user_name }}"
ansible_become_password: "{{ target_user_password }}"
block:
- name: basic success check
raw: whoami
register: success
# NOTE: The ssh connection plugin does not properly strip noise from raw stdout, unlike the local connection plugin.
# Once that is fixed, this can be changed to a comparison against stdout, not stdout_lines[-1].
failed_when: success.stdout_lines[-1] != target_user_name
- name: validate that a password prompt is being used
vars:
ansible_become_password: BOGUSPASS
raw: exit 99
ignore_errors: yes
register: bogus_password
- assert:
that:
- | # account for different failure behavior between local and ssh
bogus_password.msg is contains "Incorrect su password" or
bogus_password.msg is contains "Premature end of stream waiting for become success." or
(bogus_password.stdout | default('')) is contains "Sorry"
- name: test wrong su prompt expected
raw: echo hi mom from $(whoami)
register: wrong_su_prompt
vars:
ansible_su_prompt_l10n: NOT_A_VALID_PROMPT
ansible_local_become_success_timeout: 3 # actual become success timeout
ansible_ssh_timeout: 3 # connection timeout, which results in an N+2 second select timeout
ignore_errors: yes
- assert:
that:
- wrong_su_prompt is failed
- ansible_connection != "local" or wrong_su_prompt.msg is contains "Timed out waiting for become success or become password prompt"
- ansible_connection != "ssh" or wrong_su_prompt.msg is contains "waiting for privilege escalation prompt"

@ -0,0 +1,7 @@
destructive
shippable/posix/group1
context/target
gather_facts/no
needs/target/setup_become_user_pair
needs/target/setup_test_user
setup/always/setup_passlib_controller # required for setup_test_user

@ -0,0 +1,22 @@
#!/usr/bin/env bash
# A command wrapper that delegates to sudo after lowering privilege through an intermediate user (via sudo).
# This allows forcing an environment that always requires a password prompt for sudo.
set -eu
args=("${@}")
for i in "${!args[@]}"; do
case "${args[$i]}" in
"--intermediate-user")
intermediate_user_idx="${i}"
;;
esac
done
intermediate_user_name="${args[intermediate_user_idx+1]}"
unset "args[intermediate_user_idx]"
unset "args[intermediate_user_idx+1]"
exec sudo -n -u "${intermediate_user_name}" sudo -k "${args[@]}"

@ -0,0 +1,84 @@
- name: create unprivileged become user pair
include_role:
name: setup_become_user_pair
public: true
- name: capture config values
set_fact:
# this needs to be looked up and stored before setting ansible_become_flags
default_become_flags: "{{ lookup('config', 'become_flags', plugin_type='become', plugin_name='sudo') }}"
intermediate_flags: --intermediate-user {{ intermediate_user_name | quote }}
- name: deploy sudo shim
copy:
src: sudoshim.sh
dest: /tmp/sudoshim.sh
mode: a+rx
- name: apply shared become vars to all tasks that use the sudo test shim
vars:
ansible_become: yes
ansible_become_method: sudo
ansible_become_user: '{{ target_user_name }}'
ansible_become_password: '{{ intermediate_user_password }}'
ansible_become_exe: /tmp/sudoshim.sh
ansible_become_flags: '{{ default_become_flags }} {{ intermediate_flags }}'
ansible_local_become_strip_preamble: true
block:
- name: basic success check
raw: whoami
register: success
# NOTE: The ssh connection plugin does not properly strip noise from raw stdout, unlike the local connection plugin.
# Once that is fixed, this can be changed to a comparison against stdout, not stdout_lines[-1].
failed_when: success.stdout_lines[-1] != target_user_name
- name: validate that a password prompt is being used and that the shim is invalidating the sudo timestamp
vars:
ansible_become_password: BOGUSPASS
raw: exit 99
ignore_errors: true
register: bogus_password
- assert:
that:
- bogus_password.msg is contains "Incorrect sudo password" or bogus_password.msg is contains "Duplicate become password prompt encountered"
- name: request sudo chdir to a nonexistent root dir; expected failure
raw: echo himom
vars:
ansible_sudo_chdir: /nonexistent_dir
ignore_errors: true
register: nonexistent_chdir
- assert:
that:
- nonexistent_chdir is failed
# deal with inconsistent failure behavior across different connection plugins
- (nonexistent_chdir.msg ~ (nonexistent_chdir.stdout | default('')) ~ (nonexistent_chdir.stderr | default(''))) is search "cd.*/nonexistent_dir"
- name: request sudo chdir to /; cwd should successfully be / before sudo runs
raw: echo "CWD IS <$(pwd)>"
vars:
ansible_sudo_chdir: /
register: chdir_root
- assert:
that:
- chdir_root.stdout is contains 'CWD IS </>'
- name: become with custom sudo `--` flags (similar to defaults)
vars:
ansible_become_flags: --set-home --stdin --non-interactive {{ intermediate_flags }}
raw: whoami
register: custom_flags
- name: become with no user
vars:
ansible_become_user: ""
raw: whoami
register: no_user
- assert:
that:
- custom_flags.stdout.strip() == test_user_name
- no_user.stdout.strip() == "root"

@ -1,2 +1,6 @@
shippable/posix/group5
needs/target/connection
needs/target/setup_become_user_pair
needs/target/setup_test_user
setup/once/setup_passlib_controller
destructive

@ -0,0 +1,88 @@
#!/usr/bin/env bash
# A wrapper around `sudo` that replaces the expected password prompt string (if given) with a bogus value.
# This allows testing situations where the expected password prompt is not found.
# This wrapper also supports becoming an intermediate user before executing sudo, to support testing as root.
set -eu
args=("${@}")
intermediate_user_idx=''
original_prompt=''
shell_executable=''
shell_command=''
original_prompt_idx=''
# some args show up after others, but we need them before processing args that came before them
for i in "${!args[@]}"; do
case "${args[$i]}" in
"-p")
original_prompt="${args[i+1]}"
original_prompt_idx="${i}"
;;
"-c")
shell_executable="${args[i-1]}"
shell_command="${args[i+1]}"
;;
esac
done
for i in "${!args[@]}"; do
case "${args[$i]}" in
"--inject-stdout-noise")
echo "stdout noise"
unset "args[i]"
;;
"--inject-stderr-noise")
echo >&2 "stderr noise"
unset "args[i]"
;;
"--bogus-prompt")
args[original_prompt_idx+1]="BOGUSPROMPT"
unset "args[i]"
;;
"--intermediate-user")
intermediate_user_idx="${i}"
;;
"--close-stderr")
>&2 echo "some injected stderr, EOF now"
exec 2>&- # close stderr, doesn't seem to work on Ubuntu 24.04 (either not closed or not seen in Python?)
unset "args[i]"
;;
"--sleep-before-sudo")
sleep 3
unset "args[i]"
;;
"--pretend-to-be-broken-passwordless-sudo")
echo '{"hello":"not a module response"}'
exit 0
;;
"--pretend-to-be-broken-sudo")
echo -n "${original_prompt}"
read -rs
echo
echo "success, but not invoking given command"
exit 0
;;
"--pretend-to-be-sudo")
echo -n "${original_prompt}"
read -rs
echo
echo "success, invoking given command"
"${shell_executable}" -c "${shell_command}"
exit 0
;;
esac
done
if [[ "${intermediate_user_idx}" ]]; then
# The current user can sudo without a password prompt, so delegate to an intermediate user first.
intermediate_user_name="${args[intermediate_user_idx+1]}"
unset "args[intermediate_user_idx]"
unset "args[intermediate_user_idx+1]"
exec sudo -n -u "${intermediate_user_name}" sudo -k "${args[@]}"
else
# The current user requires a password to sudo, so sudo can be used directly.
exec sudo -k "${args[@]}"
fi

@ -4,7 +4,7 @@ set -eux
group=local
cd ../connection
pushd ../connection
INVENTORY="../connection_${group}/test_connection.inventory" ./test.sh \
-e target_hosts="${group}" \
@ -19,3 +19,7 @@ ANSIBLE_CONNECTION_PLUGINS="../connection_${group}/connection_plugins" INVENTORY
-e local_tmp=/tmp/ansible-local \
-e remote_tmp=/tmp/ansible-remote \
"$@"
popd
ANSIBLE_ROLES_PATH=../ ansible-playbook -i ../../inventory test_become_password_handling.yml "$@"

@ -0,0 +1,199 @@
- hosts: testhost
gather_facts: no
tasks:
- name: skip this test for non-root users
block:
- debug:
msg: Skipping sudo test for non-root user.
- meta: end_play
when: lookup('pipe', 'whoami') != 'root'
- name: attempt root-to-root sudo become (no-op, should succeed)
vars:
ansible_become: yes
ansible_become_user: root
ansible_become_method: sudo
raw: whoami
register: root_noop_sudo
- assert:
that:
- root_noop_sudo.stdout is contains "root"
- name: create an unprivileged become user pair
include_role:
name: setup_become_user_pair
public: true
- name: deploy sudo shim
copy:
src: sudoshim.sh
dest: /tmp/sudoshim.sh
mode: u+x
- set_fact:
default_sudo_flags: "{{ lookup('config', 'become_flags', plugin_type='become', plugin_name='sudo') }}"
- name: apply shared become vars to all tasks that use the sudo test shim
vars:
ansible_become: yes
ansible_become_method: sudo
ansible_become_user: '{{ target_user_name }}'
ansible_become_password: '{{ target_user_password }}'
ansible_become_exe: /tmp/sudoshim.sh
ansible_local_become_strip_preamble: true
intermediate: "{{ ((' --intermediate-user ' + intermediate_user_name) if intermediate_user_name is defined else '') }}"
block:
- name: verify stdout/stderr noise is present with preamble strip disabled
raw: echo $(whoami) ran
vars:
ansible_become_flags: "{{ default_sudo_flags ~ intermediate ~ ' --inject-stdout-noise --inject-stderr-noise' }}"
ansible_become_password: "{{ intermediate_user_password }}"
ansible_local_become_strip_preamble: false
ansible_pipelining: true
register: preamble_visible
- name: verify stdout/stderr noise is stripped with preamble strip enabled
raw: echo $(whoami) ran
vars:
ansible_become_flags: "{{ default_sudo_flags ~ intermediate ~ ' --inject-stdout-noise --inject-stderr-noise' }}"
ansible_become_password: "{{ intermediate_user_password }}"
ansible_pipelining: true
register: preamble_stripped
- assert:
that:
- preamble_visible.stdout is contains(target_user_name ~ " ran")
- preamble_stripped.stdout is contains(target_user_name ~ " ran")
- preamble_visible.stdout is contains "stdout noise"
- preamble_stripped.stdout is not contains "stdout noise"
- preamble_visible.stderr is contains "stderr noise"
- preamble_stripped.stderr is not contains "stderr noise"
- name: verify sudo succeeds with a password (no PTY/pipelined)
raw: echo $(whoami) ran
vars:
ansible_become_flags: "{{ default_sudo_flags ~ intermediate }}"
ansible_become_password: "{{ intermediate_user_password }}"
ansible_local_become_strip_preamble: false # allow prompt sniffing from the output
ansible_pipelining: true
register: success_pipelined
- assert:
that:
- success_pipelined.stdout is contains(target_user_name ~ " ran")
- success_pipelined.stderr is search 'sudo via ansible.*password\:'
- name: verify sudo works with a PTY allocated (pipelining disabled)
raw: echo $(whoami) ran without pipelining
vars:
ansible_become_flags: "{{ default_sudo_flags ~ intermediate }}"
ansible_become_password: "{{ intermediate_user_password }}"
ansible_local_become_strip_preamble: false
ansible_pipelining: no # a PTY is allocated by the local plugin only when pipelining is disabled
register: pty_non_pipelined
- assert:
that:
- pty_non_pipelined.stdout is contains(test_user_name ~ " ran without pipelining")
- name: verify early-closed stderr still sees success
# this test triggers early EOF (which unregisters the associated selector) on most OSs, but not on Ubuntu 24.04
vars:
ansible_become_flags: --close-stderr --sleep-before-sudo --pretend-to-be-sudo
ansible_local_become_success_timeout: 5
raw: echo ran_ok
register: stderr_closed
- assert:
that:
- stderr_closed.stderr is contains "some injected stderr, EOF now"
- stderr_closed.stdout is contains "ran_ok"
- name: verify timeout handling by setting a sudo prompt that won't trigger password send
vars:
ansible_become_flags: "{{ default_sudo_flags ~ intermediate }} --bogus-prompt"
ansible_local_become_success_timeout: 2
raw: exit 99
ignore_errors: true
register: prompt_timeout
- assert:
that:
- prompt_timeout is failed
- prompt_timeout.msg is contains "Timed out waiting for become success or become password prompt"
- prompt_timeout.msg is contains "BOGUSPROMPT"
- prompt_timeout.rc is not defined
- name: verify sub 1s timeout is always increased
vars:
ansible_become_flags: "{{ default_sudo_flags ~ ' --sleep-before-sudo' }}"
ansible_local_become_success_timeout: 0 # a 0s timeout would always cause select to be skipped in the current impl, but added a 2s sleep in the shim in case that changes
raw: whoami
register: timeout_increased
- assert:
that:
- timeout_increased.stdout is contains target_user_name
- name: verify handling of premature exit/stream closure
vars:
ansible_become_exe: /bogus
raw: exit 99
ignore_errors: true
register: early_close
- assert:
that:
- early_close is failed
- early_close.msg is contains "Premature end of stream"
- name: verify lack of required password fails as expected
raw: exit 99
vars:
ansible_become_flags: "{{ default_sudo_flags ~ intermediate }}"
ansible_become_password: ~
ignore_errors: true
register: missing_required_password
- assert:
that:
- missing_required_password is failed
- missing_required_password.msg is contains "password is required"
- name: verify duplicate password prompts are handled (due to incorrect password)
raw: echo hi mom
vars:
ansible_become_flags: "{{ default_sudo_flags ~ intermediate }}"
ansible_become_password: not_the_correct_password
ignore_errors: yes
register: incorrect_password
- assert:
that:
- incorrect_password is failed
- incorrect_password.msg is contains "Duplicate become password prompt encountered"
- name: no error, but no become success message
vars:
ansible_become_flags: --pretend-to-be-broken-sudo # handle password prompt, but return no output
raw: exit 99 # should never actually run anyway
ignore_errors: true
register: no_become_success
- assert:
that:
- no_become_success is failed
- no_become_success.msg is contains "Premature end of stream waiting for become success"
- name: test broken passwordless sudo
raw: echo hi mom
vars:
ansible_become_flags: --pretend-to-be-broken-passwordless-sudo
ignore_errors: yes
register: broken_passwordless_sudo
- assert:
that:
- broken_passwordless_sudo is failed
- broken_passwordless_sudo.msg is contains "not a module response"

@ -4,6 +4,7 @@
- name: make sure the test user is available
include_role:
name: setup_test_user
public: yes
- name: verify AnsibleModule works when cwd is missing
test_cwd_missing:

@ -0,0 +1,2 @@
target_user_name: ansibletest0 # target unprivileged user
intermediate_user_name: ansibletest1 # an intermediate user

@ -0,0 +1,24 @@
- name: create an unprivileged user on target
include_role:
name: setup_test_user
public: true
vars:
test_user_name: '{{ target_user_name }}'
test_user_groups: '{{ target_user_groups | default(omit) }}'
- name: capture target user password
set_fact:
target_user_password: '{{ test_user_plaintext_password }}'
- name: create an intermediate user on target with password-required sudo ability
include_role:
name: setup_test_user
public: true
vars:
test_user_name: "{{ intermediate_user_name }}"
test_user_groups: '{{ intermediate_user_groups | default(omit) }}'
test_user_allow_sudo: true
- name: capture config values, intermediate user password from role
set_fact:
intermediate_user_password: "{{ test_user_plaintext_password }}"

@ -0,0 +1,5 @@
# true/false/nopasswd
test_user_allow_sudo: false
test_user_name: ansibletest0
test_user_group: ~
test_user_groups: ~

@ -1,6 +1,7 @@
- name: delete test user
user:
name: "{{ test_user_name }}"
name: "{{ item }}"
state: absent
remove: yes
force: yes
loop: "{{ delete_users }}"

@ -1,8 +1,3 @@
- name: set variables
set_fact:
test_user_name: ansibletest0
test_user_group: null
- name: set plaintext password
no_log: yes
set_fact:

@ -1,6 +1,5 @@
- name: set variables
set_fact:
test_user_name: ansibletest0
test_user_group: staff
- name: set plaintext password

@ -13,15 +13,23 @@
paths:
- tasks
- set_fact:
delete_users: "{{ (delete_users | default([])) + [test_user_name] }}"
- name: create test user
user:
name: "{{ test_user_name }}"
group: "{{ test_user_group or omit }}"
groups: "{{ test_user_groups or omit }}"
password: "{{ test_user_hashed_password or omit }}"
register: test_user
notify:
- delete test user
- name: maybe configure sudo
include_tasks: sudo_config.yml
when: test_user_allow_sudo != False
- name: run whoami as the test user
shell: whoami
vars:

@ -0,0 +1,14 @@
- name: probe for sudoers config path
shell: visudo -c
ignore_errors: true
register: visudo_result
- set_fact:
sudoers_path: '{{ ((visudo_result.stdout ~ visudo_result.stderr) | regex_search("(/.*sudoers).*:", "\1"))[0] }}'
- name: allow the user to use sudo {{"with no password" if test_user_allow_sudo == "nopasswd" else "with a password"}}
copy:
content: |
{{ test_user_name }} ALL=(ALL) {{"NOPASSWD: " if test_user_allow_sudo == "nopasswd" else ""}} ALL
mode: '0440'
dest: '{{ sudoers_path ~ ".d/" ~ test_user_name }}'

@ -2,6 +2,7 @@
from __future__ import annotations
import shutil
import sys
import typing as t
from .config import (
@ -13,6 +14,11 @@ from .util import (
exclude_none_values,
)
from .host_configs import (
ControllerConfig,
PosixRemoteConfig,
)
from .host_profiles import (
ControllerHostProfile,
ControllerProfile,
@ -30,12 +36,37 @@ from .ssh import (
)
def get_common_variables(target_profile: HostProfile, controller: bool = False) -> dict[str, t.Any]:
"""Get variables common to all scenarios, but dependent on the target profile."""
target_config = target_profile.config
if controller or isinstance(target_config, ControllerConfig):
# The current process is running on the controller, so consult the controller directly when it is the target.
macos = sys.platform == 'darwin'
elif isinstance(target_config, PosixRemoteConfig):
# The target is not the controller, so consult the remote config for that target.
macos = target_config.name.startswith('macos/')
else:
# The target is a type which either cannot be macOS or for which the OS is unknown.
# There is currently no means for the user to override this for user provided hosts.
macos = False
common_variables: dict[str, t.Any] = {}
if macos:
# When using sudo on macOS we may encounter permission denied errors when dropping privileges due to inability to access the current working directory.
# To compensate for this we'll perform a `cd /` before running any commands after `sudo` succeeds.
common_variables.update(ansible_sudo_chdir='/')
return common_variables
def create_controller_inventory(args: EnvironmentConfig, path: str, controller_host: ControllerHostProfile) -> None:
"""Create and return inventory for use in controller-only integration tests."""
inventory = Inventory(
host_groups=dict(
testgroup=dict(
testhost=dict(
testhost=get_common_variables(controller_host, controller=True) | dict(
ansible_connection='local',
ansible_pipelining='yes',
ansible_python_interpreter=controller_host.python.path,
@ -129,7 +160,7 @@ def create_posix_inventory(args: EnvironmentConfig, path: str, target_hosts: lis
inventory = Inventory(
host_groups=dict(
testgroup=dict(
testhost=dict(
testhost=get_common_variables(target_host) | dict(
ansible_connection='local',
ansible_pipelining='yes',
ansible_python_interpreter=target_host.python.path,
@ -145,7 +176,7 @@ def create_posix_inventory(args: EnvironmentConfig, path: str, target_hosts: lis
ssh = connections[0]
testhost: dict[str, t.Optional[t.Union[str, int]]] = dict(
testhost: dict[str, t.Optional[t.Union[str, int]]] = get_common_variables(target_host) | dict(
ansible_connection='ssh',
ansible_pipelining='yes',
ansible_python_interpreter=ssh.settings.python_interpreter,

@ -229,6 +229,7 @@ prefer-binary = yes
# enable sudo without a password for the wheel group, allowing ansible to use the sudo become plugin
echo '%wheel ALL=(ALL:ALL) NOPASSWD: ALL' > /usr/local/etc/sudoers.d/ansible-test
chmod 440 /usr/local/etc/sudoers.d/ansible-test
}
bootstrap_remote_macos()

@ -7,6 +7,8 @@ from __future__ import annotations
import re
import pytest
from ansible import context
from ansible.plugins.loader import become_loader, shell_loader
@ -26,3 +28,46 @@ def test_su(mocker, parser, reset_cli_args):
cmd = su.build_become_command('/bin/foo', sh)
assert re.match(r"""su\s+foo -c '/bin/bash -c '"'"'echo BECOME-SUCCESS-.+?; /bin/foo'"'"''""", cmd)
def test_no_cmd() -> None:
cmd = ''
assert become_loader.get('su').build_become_command(cmd, shell_loader.get('sh')) is cmd
@pytest.mark.parametrize("prefix, prompt", (
("", "Password:"),
(" ", "Password :"),
("\n", "Password "),
("x", "Password "),
("", "口令:"),
(" ", "口令 :"),
("\n", "口令 "),
("x", "口令 "),
))
def test_check_password_prompt_success(prefix: str, prompt: str) -> None:
become = become_loader.get('su')
assert become.check_password_prompt((prefix + prompt).encode()) is True
assert become.prompt == prompt
@pytest.mark.parametrize("data", (
"Password",
"Passwort",
"Pass:",
))
def test_check_password_prompt_failure(data: str) -> None:
become = become_loader.get('su')
assert become.check_password_prompt(data.encode()) is False
assert become.prompt == ''
def test_check_password_prompt_escaping(mocker) -> None:
become = become_loader.get('su')
mocker.patch.object(become, 'get_option', return_value=['(invalid regex'])
assert become.check_password_prompt('(invalid regex:') is True

@ -5,9 +5,13 @@
from __future__ import annotations
import pytest
import re
from pytest_mock import MockerFixture
from ansible import context
from ansible.errors import AnsibleError
from ansible.plugins.loader import become_loader, shell_loader
@ -63,3 +67,37 @@ def test_sudo(mocker, parser, reset_cli_args):
cmd = sudo.build_become_command('/bin/foo', sh)
assert re.match(r"""sudo\s+-C5\s-s\s-H\s+-p "\[sudo via ansible, key=.+?\] password:" -u foo /bin/bash -c 'echo BECOME-SUCCESS-.+? ; /bin/foo'""", cmd), cmd
@pytest.mark.parametrize("del_attr_name, expected_error_pattern", (
("ECHO", ".*does not support become.*missing the 'ECHO' attribute"), # BecomeBase
("CD", ".*does not support sudo chdir.*missing the 'CD' attribute"), # sudo
))
def test_invalid_shell_plugin(del_attr_name: str, expected_error_pattern: str, mocker: MockerFixture) -> None:
def badprop(_self):
raise AttributeError(del_attr_name)
sh = shell_loader.get('sh')
mocker.patch.object(type(sh), del_attr_name, property(fget=badprop))
sudo = become_loader.get('sudo')
sudo.set_options(direct=dict(sudo_chdir='/'))
with pytest.raises(AnsibleError, match=expected_error_pattern):
sudo.build_become_command('/stuff', sh)
def test_no_flags() -> None:
sudo = become_loader.get('sudo')
sudo.set_options(direct=dict(become_pass='x', become_flags=''))
result = sudo.build_become_command('/stuff', shell_loader.get('sh'))
# ensure no flags were in the final command other than -p and -u
assert re.search(r'''^sudo +-p "[^"]*" -u root '[^']*'$''', result)
def test_no_cmd() -> None:
cmd = ''
assert become_loader.get('sudo').build_become_command(cmd, shell_loader.get('sh')) is cmd

Loading…
Cancel
Save