diff --git a/changelogs/fragments/local-become-fixes.yml b/changelogs/fragments/local-become-fixes.yml new file mode 100644 index 00000000000..a4fd90d5062 --- /dev/null +++ b/changelogs/fragments/local-become-fixes.yml @@ -0,0 +1,22 @@ +minor_changes: + - local connection plugin - A new ``become_success_timeout`` operation-wide timeout config (default 10s) was added for ``become``. + - local connection plugin - A new ``become_strip_preamble`` config option (default True) was added; disable to preserve diagnostic ``become`` output in task results. + - local connection plugin - When a ``become`` plugin's ``prompt`` value is a non-string after the ``check_password_prompt`` callback has completed, no prompt stripping will occur on stderr. + +bugfixes: + - local connection plugin - Fixed silent ignore of ``become`` failures and loss of task output when data arrived concurrently on stdout and stderr during ``become`` operation validation. + - local connection plugin - Fixed hang or spurious failure when data arrived concurrently on stdout and stderr during a successful ``become`` operation validation. + - local connection plugin - Fixed task output header truncation when post-become data arrived before ``become`` operation validation had completed. + - local connection plugin - Ensure ``become`` success validation always occurs, even when an active plugin does not set ``prompt``. + - local connection plugin - Fixed cases where the internal ``BECOME-SUCCESS`` message appeared in task output. + - local connection plugin - Fixed long timeout/hang for ``become`` plugins that repeat their prompt on failure (e.g., ``sudo``, some ``su`` implementations). + - local connection plugin - Fixed hang when an active become plugin incorrectly signals lack of prompt. + - local connection plugin - Fixed hang when a become plugin expects a prompt but a password was not provided. + - local connection plugin - Fixed hang when an internal become read timeout expired before the password prompt was written. + - local connection plugin - Fixed hang when only one of stdout or stderr was closed by the ``become_exe`` subprocess. + - local connection plugin - Become timeout errors now include all received data. Previously, the most recently-received data was discarded. + - sudo become plugin - The `sudo_chdir` config option allows the current directory to be set to the specified value before executing sudo to avoid permission errors when dropping privileges. + - su become plugin - Ensure generated regex from ``prompt_l10n`` config values is properly escaped. + - su become plugin - Ensure that password prompts are correctly detected in the presence of leading output. Previously, this case resulted in a timeout or hang. + - su become plugin - Ensure that trailing colon is expected on all ``prompt_l10n`` config values. + - ansible-test - Managed macOS instances now use the ``sudo_chdir`` option for the ``sudo`` become plugin to avoid permission errors when dropping privileges. diff --git a/lib/ansible/plugins/become/__init__.py b/lib/ansible/plugins/become/__init__.py index 6f7a2b88abf..a7e35b5bf3c 100644 --- a/lib/ansible/plugins/become/__init__.py +++ b/lib/ansible/plugins/become/__init__.py @@ -3,6 +3,7 @@ # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) from __future__ import annotations +import re import shlex from abc import abstractmethod @@ -13,6 +14,7 @@ from gettext import dgettext from ansible.errors import AnsibleError from ansible.module_utils.common.text.converters import to_bytes from ansible.plugins import AnsiblePlugin +from ansible.utils import display as _display def _gen_id(length=32): @@ -53,11 +55,11 @@ class BecomeBase(AnsiblePlugin): return getattr(playcontext, option, None) - def expect_prompt(self): + def expect_prompt(self) -> bool: """This function assists connection plugins in determining if they need to wait for a prompt. Both a prompt and a password are required. """ - return self.prompt and self.get_option('become_pass') + return bool(self.prompt and self.get_option('become_pass')) def _build_success_command(self, cmd, shell, noexe=False): if not all((cmd, shell, self.success)): @@ -65,9 +67,8 @@ class BecomeBase(AnsiblePlugin): try: cmd = shlex.quote('%s %s %s %s' % (shell.ECHO, self.success, shell.COMMAND_SEP, cmd)) - except AttributeError: - # TODO: This should probably become some more robust functionality used to detect incompat - raise AnsibleError('The %s shell family is incompatible with the %s become plugin' % (shell.SHELL_FAMILY, self.name)) + except AttributeError as ex: + raise AnsibleError(f'The {shell._load_name!r} shell plugin does not support become. It is missing the {ex.name!r} attribute.') exe = getattr(shell, 'executable', None) if exe and not noexe: cmd = '%s -c %s' % (exe, cmd) @@ -78,6 +79,25 @@ class BecomeBase(AnsiblePlugin): self._id = _gen_id() self.success = 'BECOME-SUCCESS-%s' % self._id + def strip_become_prompt(self, data: bytes) -> bytes: + """ + Strips the first found configured become prompt from `data`, trailing whitespace and anything that precedes the prompt, then returns the result. + If no prompt is expected, or the prompt is not `str` or `bytes`, `data` will be returned as-is. + """ + if not self.prompt or not isinstance(self.prompt, (str, bytes)) or not self.expect_prompt(): + return data + + return self._strip_through_prefix(self.prompt, data) + + def strip_become_success(self, data: bytes) -> bytes: + """Strips the first found success marker from `data`, trailing whitespace and anything that precedes the success marker, then returns the result.""" + return self._strip_through_prefix(self.success, data) + + @staticmethod + def _strip_through_prefix(match: str | bytes, data: bytes) -> bytes: + """Strips the first occurrence of `match` from `data`, trailing whitespace and anything that precedes `match`, then returns the result.""" + return re.sub(br'^.*?' + re.escape(to_bytes(match)) + br'\s*', b'', data, count=1, flags=re.DOTALL) + def check_success(self, b_output): b_success = to_bytes(self.success) return any(b_success in l.rstrip() for l in b_output.splitlines(True)) diff --git a/lib/ansible/plugins/become/su.py b/lib/ansible/plugins/become/su.py index b8a7f0be993..fc5446b1099 100644 --- a/lib/ansible/plugins/become/su.py +++ b/lib/ansible/plugins/become/su.py @@ -93,7 +93,7 @@ DOCUMENTATION = """ import re import shlex -from ansible.module_utils.common.text.converters import to_bytes +from ansible.module_utils.common.text.converters import to_text from ansible.plugins.become import BecomeBase @@ -139,15 +139,18 @@ class BecomeModule(BecomeBase): '口令', ] - def check_password_prompt(self, b_output): + def check_password_prompt(self, b_output: bytes) -> bool: """ checks if the expected password prompt exists in b_output """ - prompts = self.get_option('prompt_l10n') or self.SU_PROMPT_LOCALIZATIONS - b_password_string = b"|".join((br'(\w+\'s )?' + to_bytes(p)) for p in prompts) + password_prompt_strings = "|".join(re.escape(p) for p in prompts) # Colon or unicode fullwidth colon - b_password_string = b_password_string + to_bytes(u' ?(:|:) ?') - b_su_prompt_localizations_re = re.compile(b_password_string, flags=re.IGNORECASE) - return bool(b_su_prompt_localizations_re.match(b_output)) + prompt_pattern = rf"(?:{password_prompt_strings})\s*[::]" + match = re.search(prompt_pattern, to_text(b_output), flags=re.IGNORECASE) + + if match: + self.prompt = match.group(0) # preserve the actual matched string so we can scrub the output + + return bool(match) def build_become_command(self, cmd, shell): super(BecomeModule, self).build_become_command(cmd, shell) diff --git a/lib/ansible/plugins/become/sudo.py b/lib/ansible/plugins/become/sudo.py index 6a33c987c04..13a86607503 100644 --- a/lib/ansible/plugins/become/sudo.py +++ b/lib/ansible/plugins/become/sudo.py @@ -72,12 +72,25 @@ DOCUMENTATION = """ ini: - section: sudo_become_plugin key: password + sudo_chdir: + description: Directory to change to before invoking sudo; can avoid permission errors when dropping privileges. + type: string + required: False + version_added: '2.19' + vars: + - name: ansible_sudo_chdir + env: + - name: ANSIBLE_SUDO_CHDIR + ini: + - section: sudo_become_plugin + key: chdir """ import re import shlex from ansible.plugins.become import BecomeBase +from ansible.errors import AnsibleError class BecomeModule(BecomeBase): @@ -117,4 +130,10 @@ class BecomeModule(BecomeBase): if user: user = '-u %s' % (user) + if chdir := self.get_option('sudo_chdir'): + try: + becomecmd = f'{shell.CD} {shlex.quote(chdir)} {shell._SHELL_AND} {becomecmd}' + except AttributeError as ex: + raise AnsibleError(f'The {shell._load_name!r} shell plugin does not support sudo chdir. It is missing the {ex.name!r} attribute.') + return ' '.join([becomecmd, flags, prompt, user, self._build_success_command(cmd, shell)]) diff --git a/lib/ansible/plugins/connection/local.py b/lib/ansible/plugins/connection/local.py index d77b37a43bf..6fae6aa5c15 100644 --- a/lib/ansible/plugins/connection/local.py +++ b/lib/ansible/plugins/connection/local.py @@ -11,19 +11,38 @@ DOCUMENTATION = """ - This connection plugin allows ansible to execute tasks on the Ansible 'controller' instead of on a remote host. author: ansible (@core) version_added: historical + options: + become_success_timeout: + version_added: '2.19' + type: int + default: 10 + description: + - Number of seconds to wait for become to succeed when enabled. + - The default will be used if the configured value is less than 1. + vars: + - name: ansible_local_become_success_timeout + become_strip_preamble: + version_added: '2.19' + type: bool + default: true + description: + - Strip internal become output preceding command execution. Disable for additional diagnostics. + vars: + - name: ansible_local_become_strip_preamble extends_documentation_fragment: - connection_pipelining notes: - The remote user is ignored, the user with which the ansible CLI was executed is used instead. """ -import fcntl +import functools import getpass import os import pty import selectors import shutil import subprocess +import time import typing as t import ansible.constants as C @@ -86,7 +105,7 @@ class Connection(ConnectionBase): else: cmd = map(to_bytes, cmd) - master = None + pty_primary = None stdin = subprocess.PIPE if sudoable and self.become and self.become.expect_prompt() and not self.get_option('pipelining'): # Create a pty if sudoable for privilege escalation that needs it. @@ -94,7 +113,7 @@ class Connection(ConnectionBase): # cause the command to fail in certain situations where we are escalating # privileges or the command otherwise needs a pty. try: - master, stdin = pty.openpty() + pty_primary, stdin = pty.openpty() except (IOError, OSError) as e: display.debug("Unable to open pty: %s" % to_native(e)) @@ -108,60 +127,134 @@ class Connection(ConnectionBase): stderr=subprocess.PIPE, ) - # if we created a master, we can close the other half of the pty now, otherwise master is stdin - if master is not None: + # if we created a pty, we can close the other half of the pty now, otherwise primary is stdin + if pty_primary is not None: os.close(stdin) display.debug("done running command with Popen()") - if self.become and self.become.expect_prompt() and sudoable: - fcntl.fcntl(p.stdout, fcntl.F_SETFL, fcntl.fcntl(p.stdout, fcntl.F_GETFL) | os.O_NONBLOCK) - fcntl.fcntl(p.stderr, fcntl.F_SETFL, fcntl.fcntl(p.stderr, fcntl.F_GETFL) | os.O_NONBLOCK) - selector = selectors.DefaultSelector() - selector.register(p.stdout, selectors.EVENT_READ) - selector.register(p.stderr, selectors.EVENT_READ) - - become_output = b'' - try: - while not self.become.check_success(become_output) and not self.become.check_password_prompt(become_output): - events = selector.select(self._play_context.timeout) - if not events: - stdout, stderr = p.communicate() - raise AnsibleError('timeout waiting for privilege escalation password prompt:\n' + to_native(become_output)) - - for key, event in events: - if key.fileobj == p.stdout: - chunk = p.stdout.read() - elif key.fileobj == p.stderr: - chunk = p.stderr.read() - - if not chunk: - stdout, stderr = p.communicate() - raise AnsibleError('privilege output closed while waiting for password prompt:\n' + to_native(become_output)) - become_output += chunk - finally: - selector.close() - - if not self.become.check_success(become_output): - become_pass = self.become.get_option('become_pass', playcontext=self._play_context) - if master is None: - p.stdin.write(to_bytes(become_pass, errors='surrogate_or_strict') + b'\n') - else: - os.write(master, to_bytes(become_pass, errors='surrogate_or_strict') + b'\n') - - fcntl.fcntl(p.stdout, fcntl.F_SETFL, fcntl.fcntl(p.stdout, fcntl.F_GETFL) & ~os.O_NONBLOCK) - fcntl.fcntl(p.stderr, fcntl.F_SETFL, fcntl.fcntl(p.stderr, fcntl.F_GETFL) & ~os.O_NONBLOCK) + become_stdout_bytes, become_stderr_bytes = self._ensure_become_success(p, pty_primary, sudoable) display.debug("getting output with communicate()") stdout, stderr = p.communicate(in_data) display.debug("done communicating") + # preserve output from privilege escalation stage as `bytes`; it may contain actual output (eg `raw`) or error messages + stdout = become_stdout_bytes + stdout + stderr = become_stderr_bytes + stderr + # finally, close the other half of the pty, if it was created - if master: - os.close(master) + if pty_primary: + os.close(pty_primary) display.debug("done with local.exec_command()") - return (p.returncode, stdout, stderr) + return p.returncode, stdout, stderr + + def _ensure_become_success(self, p: subprocess.Popen, pty_primary: int, sudoable: bool) -> tuple[bytes, bytes]: + """ + Ensure that become succeeds, returning a tuple containing stdout captured after success and all stderr. + Returns immediately if `self.become` or `sudoable` are False, or `build_become_command` was not called, without performing any additional checks. + """ + if not self.become or not sudoable or not self.become._id: # _id is set by build_become_command, if it was not called, assume no become + return b'', b'' + + start_seconds = time.monotonic() + become_stdout = bytearray() + become_stderr = bytearray() + last_stdout_prompt_offset = 0 + last_stderr_prompt_offset = 0 + + # map the buffers to their associated stream for the selector reads + become_capture = { + p.stdout: become_stdout, + p.stderr: become_stderr, + } + + expect_password_prompt = self.become.expect_prompt() + sent_password = False + + def become_error(reason: str) -> t.NoReturn: + error_message = f'{reason} waiting for become success' + + if expect_password_prompt and not sent_password: + error_message += ' or become password prompt' + + error_message += '.' + + if become_stdout: + error_message += f'\n>>> Standard Output\n{to_text(bytes(become_stdout))}' + + if become_stderr: + error_message += f'\n>>> Standard Error\n{to_text(bytes(become_stderr))}' + + raise AnsibleError(error_message) + + os.set_blocking(p.stdout.fileno(), False) + os.set_blocking(p.stderr.fileno(), False) + + with selectors.DefaultSelector() as selector: + selector.register(p.stdout, selectors.EVENT_READ, 'stdout') + selector.register(p.stderr, selectors.EVENT_READ, 'stderr') + + while not self.become.check_success(become_stdout): + if not selector.get_map(): # we only reach end of stream after all descriptors are EOF + become_error('Premature end of stream') + + if expect_password_prompt and ( + self.become.check_password_prompt(become_stdout[last_stdout_prompt_offset:]) or + self.become.check_password_prompt(become_stderr[last_stderr_prompt_offset:]) + ): + if sent_password: + become_error('Duplicate become password prompt encountered') + + last_stdout_prompt_offset = len(become_stdout) + last_stderr_prompt_offset = len(become_stderr) + + password_to_send = to_bytes(self.become.get_option('become_pass') or '') + b'\n' + + if pty_primary is None: + p.stdin.write(password_to_send) + p.stdin.flush() + else: + os.write(pty_primary, password_to_send) + + sent_password = True + + remaining_timeout_seconds = self._become_success_timeout - (time.monotonic() - start_seconds) + events = selector.select(remaining_timeout_seconds) if remaining_timeout_seconds > 0 else [] + + if not events: + # ignoring remaining output after timeout to prevent hanging + become_error('Timed out') + + # read all content (non-blocking) from streams that signaled available input and append to the associated buffer + for key, event in events: + obj = t.cast(t.BinaryIO, key.fileobj) + + if chunk := obj.read(): + become_capture[obj] += chunk + else: + selector.unregister(obj) # EOF on this obj, stop polling it + + os.set_blocking(p.stdout.fileno(), True) + os.set_blocking(p.stderr.fileno(), True) + + become_stdout_bytes = bytes(become_stdout) + become_stderr_bytes = bytes(become_stderr) + + if self.get_option('become_strip_preamble'): + become_stdout_bytes = self.become.strip_become_success(self.become.strip_become_prompt(become_stdout_bytes)) + become_stderr_bytes = self.become.strip_become_prompt(become_stderr_bytes) + + return become_stdout_bytes, become_stderr_bytes + + @functools.cached_property + def _become_success_timeout(self) -> int: + """Timeout value for become success in seconds.""" + if (timeout := self.get_option('become_success_timeout')) < 1: + timeout = C.config.get_configuration_definitions('connection', 'local')['become_success_timeout']['default'] + + return timeout def put_file(self, in_path: str, out_path: str) -> None: """ transfer a file from local to local """ diff --git a/lib/ansible/plugins/shell/sh.py b/lib/ansible/plugins/shell/sh.py index fc143fd7aab..5b215d4d897 100644 --- a/lib/ansible/plugins/shell/sh.py +++ b/lib/ansible/plugins/shell/sh.py @@ -29,6 +29,7 @@ class ShellModule(ShellBase): # commonly used ECHO = 'echo' + CD = 'cd' COMMAND_SEP = ';' # How to end lines in a python script one-liner diff --git a/test/integration/targets/become_su/aliases b/test/integration/targets/become_su/aliases index 04089be6cfb..db83eef7a3c 100644 --- a/test/integration/targets/become_su/aliases +++ b/test/integration/targets/become_su/aliases @@ -1,3 +1,9 @@ destructive -shippable/posix/group3 -context/controller +shippable/posix/group1 +context/target +gather_facts/no +needs/target/setup_become_user_pair +needs/target/setup_test_user +setup/always/setup_passlib_controller # required for setup_test_user +skip/macos # requires a TTY +skip/freebsd # appears to require a TTY (ignores password input from stdin) diff --git a/test/integration/targets/become_su/files/sushim.sh b/test/integration/targets/become_su/files/sushim.sh new file mode 100755 index 00000000000..47cbcc97998 --- /dev/null +++ b/test/integration/targets/become_su/files/sushim.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash +# A command wrapper that delegates to su after lowering privilege through an intermediate user (via sudo). +# This allows forcing an environment that always requires a password prompt for su. + +set -eu + +args=("${@}") + +for i in "${!args[@]}"; do + case "${args[$i]}" in + "--intermediate-user") + intermediate_user_idx="${i}" + ;; + esac +done + +intermediate_user_name="${args[intermediate_user_idx+1]}" + +unset "args[intermediate_user_idx]" +unset "args[intermediate_user_idx+1]" + +exec sudo -n -u "${intermediate_user_name}" su "${args[@]}" diff --git a/test/integration/targets/become_su/runme.sh b/test/integration/targets/become_su/runme.sh deleted file mode 100755 index 87a3511f655..00000000000 --- a/test/integration/targets/become_su/runme.sh +++ /dev/null @@ -1,6 +0,0 @@ -#!/usr/bin/env bash - -set -eux - -# ensure we execute su with a pseudo terminal -[ "$(ansible -a whoami --become-method=su localhost --become)" != "su: requires a terminal to execute" ] diff --git a/test/integration/targets/become_su/tasks/main.yml b/test/integration/targets/become_su/tasks/main.yml new file mode 100644 index 00000000000..8c13f877429 --- /dev/null +++ b/test/integration/targets/become_su/tasks/main.yml @@ -0,0 +1,63 @@ +- name: create unprivileged user pair + include_role: + name: setup_become_user_pair + public: true # this exports target_user_name, target_user_password, intermediate_user_name + vars: + intermediate_user_groups: "{{ 'staff,admin' if ansible_os_family == 'Darwin' else omit }}" # this works, but requires a TTY; disabled MacOS su testing in CI for now via aliases + +- name: deploy su shim + copy: + src: sushim.sh + dest: /tmp/sushim.sh + mode: a+rx + +- name: ensure su is setuid on Alpine + file: + path: /bin/su + mode: +s + when: ansible_os_family == 'Alpine' + +- name: test su scenarios where a password prompt must be encountered + vars: + ansible_become: yes + ansible_become_method: su + ansible_become_exe: /tmp/sushim.sh + ansible_become_flags: --intermediate-user {{ intermediate_user_name | quote }} # the default plugin flags are empty + ansible_become_user: "{{ target_user_name }}" + ansible_become_password: "{{ target_user_password }}" + block: + - name: basic success check + raw: whoami + register: success + # NOTE: The ssh connection plugin does not properly strip noise from raw stdout, unlike the local connection plugin. + # Once that is fixed, this can be changed to a comparison against stdout, not stdout_lines[-1]. + failed_when: success.stdout_lines[-1] != target_user_name + + - name: validate that a password prompt is being used + vars: + ansible_become_password: BOGUSPASS + raw: exit 99 + ignore_errors: yes + register: bogus_password + + - assert: + that: + - | # account for different failure behavior between local and ssh + bogus_password.msg is contains "Incorrect su password" or + bogus_password.msg is contains "Premature end of stream waiting for become success." or + (bogus_password.stdout | default('')) is contains "Sorry" + + - name: test wrong su prompt expected + raw: echo hi mom from $(whoami) + register: wrong_su_prompt + vars: + ansible_su_prompt_l10n: NOT_A_VALID_PROMPT + ansible_local_become_success_timeout: 3 # actual become success timeout + ansible_ssh_timeout: 3 # connection timeout, which results in an N+2 second select timeout + ignore_errors: yes + + - assert: + that: + - wrong_su_prompt is failed + - ansible_connection != "local" or wrong_su_prompt.msg is contains "Timed out waiting for become success or become password prompt" + - ansible_connection != "ssh" or wrong_su_prompt.msg is contains "waiting for privilege escalation prompt" diff --git a/test/integration/targets/become_sudo/aliases b/test/integration/targets/become_sudo/aliases new file mode 100644 index 00000000000..e7a37d7ea85 --- /dev/null +++ b/test/integration/targets/become_sudo/aliases @@ -0,0 +1,7 @@ +destructive +shippable/posix/group1 +context/target +gather_facts/no +needs/target/setup_become_user_pair +needs/target/setup_test_user +setup/always/setup_passlib_controller # required for setup_test_user diff --git a/test/integration/targets/become_sudo/files/sudoshim.sh b/test/integration/targets/become_sudo/files/sudoshim.sh new file mode 100755 index 00000000000..ab76174853d --- /dev/null +++ b/test/integration/targets/become_sudo/files/sudoshim.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash +# A command wrapper that delegates to sudo after lowering privilege through an intermediate user (via sudo). +# This allows forcing an environment that always requires a password prompt for sudo. + +set -eu + +args=("${@}") + +for i in "${!args[@]}"; do + case "${args[$i]}" in + "--intermediate-user") + intermediate_user_idx="${i}" + ;; + esac +done + +intermediate_user_name="${args[intermediate_user_idx+1]}" + +unset "args[intermediate_user_idx]" +unset "args[intermediate_user_idx+1]" + +exec sudo -n -u "${intermediate_user_name}" sudo -k "${args[@]}" diff --git a/test/integration/targets/become_sudo/tasks/main.yml b/test/integration/targets/become_sudo/tasks/main.yml new file mode 100644 index 00000000000..39527fd0ee4 --- /dev/null +++ b/test/integration/targets/become_sudo/tasks/main.yml @@ -0,0 +1,84 @@ +- name: create unprivileged become user pair + include_role: + name: setup_become_user_pair + public: true + +- name: capture config values + set_fact: + # this needs to be looked up and stored before setting ansible_become_flags + default_become_flags: "{{ lookup('config', 'become_flags', plugin_type='become', plugin_name='sudo') }}" + intermediate_flags: --intermediate-user {{ intermediate_user_name | quote }} + +- name: deploy sudo shim + copy: + src: sudoshim.sh + dest: /tmp/sudoshim.sh + mode: a+rx + +- name: apply shared become vars to all tasks that use the sudo test shim + vars: + ansible_become: yes + ansible_become_method: sudo + ansible_become_user: '{{ target_user_name }}' + ansible_become_password: '{{ intermediate_user_password }}' + ansible_become_exe: /tmp/sudoshim.sh + ansible_become_flags: '{{ default_become_flags }} {{ intermediate_flags }}' + ansible_local_become_strip_preamble: true + block: + - name: basic success check + raw: whoami + register: success + # NOTE: The ssh connection plugin does not properly strip noise from raw stdout, unlike the local connection plugin. + # Once that is fixed, this can be changed to a comparison against stdout, not stdout_lines[-1]. + failed_when: success.stdout_lines[-1] != target_user_name + + - name: validate that a password prompt is being used and that the shim is invalidating the sudo timestamp + vars: + ansible_become_password: BOGUSPASS + raw: exit 99 + ignore_errors: true + register: bogus_password + + - assert: + that: + - bogus_password.msg is contains "Incorrect sudo password" or bogus_password.msg is contains "Duplicate become password prompt encountered" + + - name: request sudo chdir to a nonexistent root dir; expected failure + raw: echo himom + vars: + ansible_sudo_chdir: /nonexistent_dir + ignore_errors: true + register: nonexistent_chdir + + - assert: + that: + - nonexistent_chdir is failed + # deal with inconsistent failure behavior across different connection plugins + - (nonexistent_chdir.msg ~ (nonexistent_chdir.stdout | default('')) ~ (nonexistent_chdir.stderr | default(''))) is search "cd.*/nonexistent_dir" + + - name: request sudo chdir to /; cwd should successfully be / before sudo runs + raw: echo "CWD IS <$(pwd)>" + vars: + ansible_sudo_chdir: / + register: chdir_root + + - assert: + that: + - chdir_root.stdout is contains 'CWD IS ' + + - name: become with custom sudo `--` flags (similar to defaults) + vars: + ansible_become_flags: --set-home --stdin --non-interactive {{ intermediate_flags }} + raw: whoami + register: custom_flags + + - name: become with no user + vars: + ansible_become_user: "" + raw: whoami + register: no_user + + - assert: + that: + - custom_flags.stdout.strip() == test_user_name + - no_user.stdout.strip() == "root" diff --git a/test/integration/targets/connection_local/aliases b/test/integration/targets/connection_local/aliases index 9390a2b3489..ee249138e4c 100644 --- a/test/integration/targets/connection_local/aliases +++ b/test/integration/targets/connection_local/aliases @@ -1,2 +1,6 @@ shippable/posix/group5 needs/target/connection +needs/target/setup_become_user_pair +needs/target/setup_test_user +setup/once/setup_passlib_controller +destructive diff --git a/test/integration/targets/connection_local/files/sudoshim.sh b/test/integration/targets/connection_local/files/sudoshim.sh new file mode 100755 index 00000000000..ec1b1f6cc0b --- /dev/null +++ b/test/integration/targets/connection_local/files/sudoshim.sh @@ -0,0 +1,88 @@ +#!/usr/bin/env bash +# A wrapper around `sudo` that replaces the expected password prompt string (if given) with a bogus value. +# This allows testing situations where the expected password prompt is not found. +# This wrapper also supports becoming an intermediate user before executing sudo, to support testing as root. + +set -eu + +args=("${@}") +intermediate_user_idx='' +original_prompt='' +shell_executable='' +shell_command='' +original_prompt_idx='' + +# some args show up after others, but we need them before processing args that came before them +for i in "${!args[@]}"; do + case "${args[$i]}" in + "-p") + original_prompt="${args[i+1]}" + original_prompt_idx="${i}" + ;; + "-c") + shell_executable="${args[i-1]}" + shell_command="${args[i+1]}" + ;; + esac +done + +for i in "${!args[@]}"; do + case "${args[$i]}" in + "--inject-stdout-noise") + echo "stdout noise" + unset "args[i]" + ;; + "--inject-stderr-noise") + echo >&2 "stderr noise" + unset "args[i]" + ;; + "--bogus-prompt") + args[original_prompt_idx+1]="BOGUSPROMPT" + unset "args[i]" + ;; + "--intermediate-user") + intermediate_user_idx="${i}" + ;; + "--close-stderr") + >&2 echo "some injected stderr, EOF now" + exec 2>&- # close stderr, doesn't seem to work on Ubuntu 24.04 (either not closed or not seen in Python?) + unset "args[i]" + ;; + "--sleep-before-sudo") + sleep 3 + unset "args[i]" + ;; + "--pretend-to-be-broken-passwordless-sudo") + echo '{"hello":"not a module response"}' + exit 0 + ;; + "--pretend-to-be-broken-sudo") + echo -n "${original_prompt}" + read -rs + echo + echo "success, but not invoking given command" + exit 0 + ;; + "--pretend-to-be-sudo") + echo -n "${original_prompt}" + read -rs + echo + echo "success, invoking given command" + "${shell_executable}" -c "${shell_command}" + exit 0 + ;; + esac +done + +if [[ "${intermediate_user_idx}" ]]; then + # The current user can sudo without a password prompt, so delegate to an intermediate user first. + intermediate_user_name="${args[intermediate_user_idx+1]}" + + unset "args[intermediate_user_idx]" + unset "args[intermediate_user_idx+1]" + + exec sudo -n -u "${intermediate_user_name}" sudo -k "${args[@]}" +else + # The current user requires a password to sudo, so sudo can be used directly. + exec sudo -k "${args[@]}" +fi diff --git a/test/integration/targets/connection_local/runme.sh b/test/integration/targets/connection_local/runme.sh index 42b2b827200..5c625031fc7 100755 --- a/test/integration/targets/connection_local/runme.sh +++ b/test/integration/targets/connection_local/runme.sh @@ -4,7 +4,7 @@ set -eux group=local -cd ../connection +pushd ../connection INVENTORY="../connection_${group}/test_connection.inventory" ./test.sh \ -e target_hosts="${group}" \ @@ -19,3 +19,7 @@ ANSIBLE_CONNECTION_PLUGINS="../connection_${group}/connection_plugins" INVENTORY -e local_tmp=/tmp/ansible-local \ -e remote_tmp=/tmp/ansible-remote \ "$@" + +popd + +ANSIBLE_ROLES_PATH=../ ansible-playbook -i ../../inventory test_become_password_handling.yml "$@" diff --git a/test/integration/targets/connection_local/test_become_password_handling.yml b/test/integration/targets/connection_local/test_become_password_handling.yml new file mode 100644 index 00000000000..38060ef7fa0 --- /dev/null +++ b/test/integration/targets/connection_local/test_become_password_handling.yml @@ -0,0 +1,199 @@ +- hosts: testhost + gather_facts: no + tasks: + - name: skip this test for non-root users + block: + - debug: + msg: Skipping sudo test for non-root user. + - meta: end_play + when: lookup('pipe', 'whoami') != 'root' + + - name: attempt root-to-root sudo become (no-op, should succeed) + vars: + ansible_become: yes + ansible_become_user: root + ansible_become_method: sudo + raw: whoami + register: root_noop_sudo + + - assert: + that: + - root_noop_sudo.stdout is contains "root" + + - name: create an unprivileged become user pair + include_role: + name: setup_become_user_pair + public: true + + - name: deploy sudo shim + copy: + src: sudoshim.sh + dest: /tmp/sudoshim.sh + mode: u+x + + - set_fact: + default_sudo_flags: "{{ lookup('config', 'become_flags', plugin_type='become', plugin_name='sudo') }}" + + - name: apply shared become vars to all tasks that use the sudo test shim + vars: + ansible_become: yes + ansible_become_method: sudo + ansible_become_user: '{{ target_user_name }}' + ansible_become_password: '{{ target_user_password }}' + ansible_become_exe: /tmp/sudoshim.sh + ansible_local_become_strip_preamble: true + intermediate: "{{ ((' --intermediate-user ' + intermediate_user_name) if intermediate_user_name is defined else '') }}" + block: + - name: verify stdout/stderr noise is present with preamble strip disabled + raw: echo $(whoami) ran + vars: + ansible_become_flags: "{{ default_sudo_flags ~ intermediate ~ ' --inject-stdout-noise --inject-stderr-noise' }}" + ansible_become_password: "{{ intermediate_user_password }}" + ansible_local_become_strip_preamble: false + ansible_pipelining: true + register: preamble_visible + + - name: verify stdout/stderr noise is stripped with preamble strip enabled + raw: echo $(whoami) ran + vars: + ansible_become_flags: "{{ default_sudo_flags ~ intermediate ~ ' --inject-stdout-noise --inject-stderr-noise' }}" + ansible_become_password: "{{ intermediate_user_password }}" + ansible_pipelining: true + register: preamble_stripped + + - assert: + that: + - preamble_visible.stdout is contains(target_user_name ~ " ran") + - preamble_stripped.stdout is contains(target_user_name ~ " ran") + - preamble_visible.stdout is contains "stdout noise" + - preamble_stripped.stdout is not contains "stdout noise" + - preamble_visible.stderr is contains "stderr noise" + - preamble_stripped.stderr is not contains "stderr noise" + + - name: verify sudo succeeds with a password (no PTY/pipelined) + raw: echo $(whoami) ran + vars: + ansible_become_flags: "{{ default_sudo_flags ~ intermediate }}" + ansible_become_password: "{{ intermediate_user_password }}" + ansible_local_become_strip_preamble: false # allow prompt sniffing from the output + ansible_pipelining: true + register: success_pipelined + + - assert: + that: + - success_pipelined.stdout is contains(target_user_name ~ " ran") + - success_pipelined.stderr is search 'sudo via ansible.*password\:' + + - name: verify sudo works with a PTY allocated (pipelining disabled) + raw: echo $(whoami) ran without pipelining + vars: + ansible_become_flags: "{{ default_sudo_flags ~ intermediate }}" + ansible_become_password: "{{ intermediate_user_password }}" + ansible_local_become_strip_preamble: false + ansible_pipelining: no # a PTY is allocated by the local plugin only when pipelining is disabled + register: pty_non_pipelined + + - assert: + that: + - pty_non_pipelined.stdout is contains(test_user_name ~ " ran without pipelining") + + - name: verify early-closed stderr still sees success + # this test triggers early EOF (which unregisters the associated selector) on most OSs, but not on Ubuntu 24.04 + vars: + ansible_become_flags: --close-stderr --sleep-before-sudo --pretend-to-be-sudo + ansible_local_become_success_timeout: 5 + raw: echo ran_ok + register: stderr_closed + + - assert: + that: + - stderr_closed.stderr is contains "some injected stderr, EOF now" + - stderr_closed.stdout is contains "ran_ok" + + - name: verify timeout handling by setting a sudo prompt that won't trigger password send + vars: + ansible_become_flags: "{{ default_sudo_flags ~ intermediate }} --bogus-prompt" + ansible_local_become_success_timeout: 2 + raw: exit 99 + ignore_errors: true + register: prompt_timeout + + - assert: + that: + - prompt_timeout is failed + - prompt_timeout.msg is contains "Timed out waiting for become success or become password prompt" + - prompt_timeout.msg is contains "BOGUSPROMPT" + - prompt_timeout.rc is not defined + + - name: verify sub 1s timeout is always increased + vars: + ansible_become_flags: "{{ default_sudo_flags ~ ' --sleep-before-sudo' }}" + ansible_local_become_success_timeout: 0 # a 0s timeout would always cause select to be skipped in the current impl, but added a 2s sleep in the shim in case that changes + raw: whoami + register: timeout_increased + + - assert: + that: + - timeout_increased.stdout is contains target_user_name + + - name: verify handling of premature exit/stream closure + vars: + ansible_become_exe: /bogus + raw: exit 99 + ignore_errors: true + register: early_close + + - assert: + that: + - early_close is failed + - early_close.msg is contains "Premature end of stream" + + - name: verify lack of required password fails as expected + raw: exit 99 + vars: + ansible_become_flags: "{{ default_sudo_flags ~ intermediate }}" + ansible_become_password: ~ + ignore_errors: true + register: missing_required_password + + - assert: + that: + - missing_required_password is failed + - missing_required_password.msg is contains "password is required" + + - name: verify duplicate password prompts are handled (due to incorrect password) + raw: echo hi mom + vars: + ansible_become_flags: "{{ default_sudo_flags ~ intermediate }}" + ansible_become_password: not_the_correct_password + ignore_errors: yes + register: incorrect_password + + - assert: + that: + - incorrect_password is failed + - incorrect_password.msg is contains "Duplicate become password prompt encountered" + + - name: no error, but no become success message + vars: + ansible_become_flags: --pretend-to-be-broken-sudo # handle password prompt, but return no output + raw: exit 99 # should never actually run anyway + ignore_errors: true + register: no_become_success + + - assert: + that: + - no_become_success is failed + - no_become_success.msg is contains "Premature end of stream waiting for become success" + + - name: test broken passwordless sudo + raw: echo hi mom + vars: + ansible_become_flags: --pretend-to-be-broken-passwordless-sudo + ignore_errors: yes + register: broken_passwordless_sudo + + - assert: + that: + - broken_passwordless_sudo is failed + - broken_passwordless_sudo.msg is contains "not a module response" diff --git a/test/integration/targets/module_utils/module_utils_basic_setcwd.yml b/test/integration/targets/module_utils/module_utils_basic_setcwd.yml index 71317f9c29d..a4b406eb00b 100644 --- a/test/integration/targets/module_utils/module_utils_basic_setcwd.yml +++ b/test/integration/targets/module_utils/module_utils_basic_setcwd.yml @@ -4,6 +4,7 @@ - name: make sure the test user is available include_role: name: setup_test_user + public: yes - name: verify AnsibleModule works when cwd is missing test_cwd_missing: diff --git a/test/integration/targets/setup_become_user_pair/defaults/main.yml b/test/integration/targets/setup_become_user_pair/defaults/main.yml new file mode 100644 index 00000000000..3d8042db418 --- /dev/null +++ b/test/integration/targets/setup_become_user_pair/defaults/main.yml @@ -0,0 +1,2 @@ +target_user_name: ansibletest0 # target unprivileged user +intermediate_user_name: ansibletest1 # an intermediate user diff --git a/test/integration/targets/setup_become_user_pair/tasks/main.yml b/test/integration/targets/setup_become_user_pair/tasks/main.yml new file mode 100644 index 00000000000..737c05ef260 --- /dev/null +++ b/test/integration/targets/setup_become_user_pair/tasks/main.yml @@ -0,0 +1,24 @@ +- name: create an unprivileged user on target + include_role: + name: setup_test_user + public: true + vars: + test_user_name: '{{ target_user_name }}' + test_user_groups: '{{ target_user_groups | default(omit) }}' + +- name: capture target user password + set_fact: + target_user_password: '{{ test_user_plaintext_password }}' + +- name: create an intermediate user on target with password-required sudo ability + include_role: + name: setup_test_user + public: true + vars: + test_user_name: "{{ intermediate_user_name }}" + test_user_groups: '{{ intermediate_user_groups | default(omit) }}' + test_user_allow_sudo: true + +- name: capture config values, intermediate user password from role + set_fact: + intermediate_user_password: "{{ test_user_plaintext_password }}" diff --git a/test/integration/targets/setup_test_user/defaults/main.yml b/test/integration/targets/setup_test_user/defaults/main.yml new file mode 100644 index 00000000000..14a0a891ce6 --- /dev/null +++ b/test/integration/targets/setup_test_user/defaults/main.yml @@ -0,0 +1,5 @@ +# true/false/nopasswd +test_user_allow_sudo: false +test_user_name: ansibletest0 +test_user_group: ~ +test_user_groups: ~ diff --git a/test/integration/targets/setup_test_user/handlers/main.yml b/test/integration/targets/setup_test_user/handlers/main.yml index dec4bd75357..de4e0c18939 100644 --- a/test/integration/targets/setup_test_user/handlers/main.yml +++ b/test/integration/targets/setup_test_user/handlers/main.yml @@ -1,6 +1,7 @@ - name: delete test user user: - name: "{{ test_user_name }}" + name: "{{ item }}" state: absent remove: yes force: yes + loop: "{{ delete_users }}" diff --git a/test/integration/targets/setup_test_user/tasks/default.yml b/test/integration/targets/setup_test_user/tasks/default.yml index 83ee8f1e69d..144afcf490f 100644 --- a/test/integration/targets/setup_test_user/tasks/default.yml +++ b/test/integration/targets/setup_test_user/tasks/default.yml @@ -1,8 +1,3 @@ -- name: set variables - set_fact: - test_user_name: ansibletest0 - test_user_group: null - - name: set plaintext password no_log: yes set_fact: diff --git a/test/integration/targets/setup_test_user/tasks/macosx.yml b/test/integration/targets/setup_test_user/tasks/macosx.yml index d33ab04e50d..f9d3c15005b 100644 --- a/test/integration/targets/setup_test_user/tasks/macosx.yml +++ b/test/integration/targets/setup_test_user/tasks/macosx.yml @@ -1,6 +1,5 @@ - name: set variables set_fact: - test_user_name: ansibletest0 test_user_group: staff - name: set plaintext password diff --git a/test/integration/targets/setup_test_user/tasks/main.yml b/test/integration/targets/setup_test_user/tasks/main.yml index 5adfb13d6dd..c3e3161a640 100644 --- a/test/integration/targets/setup_test_user/tasks/main.yml +++ b/test/integration/targets/setup_test_user/tasks/main.yml @@ -13,15 +13,23 @@ paths: - tasks +- set_fact: + delete_users: "{{ (delete_users | default([])) + [test_user_name] }}" + - name: create test user user: name: "{{ test_user_name }}" group: "{{ test_user_group or omit }}" + groups: "{{ test_user_groups or omit }}" password: "{{ test_user_hashed_password or omit }}" register: test_user notify: - delete test user +- name: maybe configure sudo + include_tasks: sudo_config.yml + when: test_user_allow_sudo != False + - name: run whoami as the test user shell: whoami vars: diff --git a/test/integration/targets/setup_test_user/tasks/sudo_config.yml b/test/integration/targets/setup_test_user/tasks/sudo_config.yml new file mode 100644 index 00000000000..c86607e11bb --- /dev/null +++ b/test/integration/targets/setup_test_user/tasks/sudo_config.yml @@ -0,0 +1,14 @@ +- name: probe for sudoers config path + shell: visudo -c + ignore_errors: true + register: visudo_result + +- set_fact: + sudoers_path: '{{ ((visudo_result.stdout ~ visudo_result.stderr) | regex_search("(/.*sudoers).*:", "\1"))[0] }}' + +- name: allow the user to use sudo {{"with no password" if test_user_allow_sudo == "nopasswd" else "with a password"}} + copy: + content: | + {{ test_user_name }} ALL=(ALL) {{"NOPASSWD: " if test_user_allow_sudo == "nopasswd" else ""}} ALL + mode: '0440' + dest: '{{ sudoers_path ~ ".d/" ~ test_user_name }}' diff --git a/test/lib/ansible_test/_internal/inventory.py b/test/lib/ansible_test/_internal/inventory.py index 6abf9ede962..098d0d0b43f 100644 --- a/test/lib/ansible_test/_internal/inventory.py +++ b/test/lib/ansible_test/_internal/inventory.py @@ -2,6 +2,7 @@ from __future__ import annotations import shutil +import sys import typing as t from .config import ( @@ -13,6 +14,11 @@ from .util import ( exclude_none_values, ) +from .host_configs import ( + ControllerConfig, + PosixRemoteConfig, +) + from .host_profiles import ( ControllerHostProfile, ControllerProfile, @@ -30,12 +36,37 @@ from .ssh import ( ) +def get_common_variables(target_profile: HostProfile, controller: bool = False) -> dict[str, t.Any]: + """Get variables common to all scenarios, but dependent on the target profile.""" + target_config = target_profile.config + + if controller or isinstance(target_config, ControllerConfig): + # The current process is running on the controller, so consult the controller directly when it is the target. + macos = sys.platform == 'darwin' + elif isinstance(target_config, PosixRemoteConfig): + # The target is not the controller, so consult the remote config for that target. + macos = target_config.name.startswith('macos/') + else: + # The target is a type which either cannot be macOS or for which the OS is unknown. + # There is currently no means for the user to override this for user provided hosts. + macos = False + + common_variables: dict[str, t.Any] = {} + + if macos: + # When using sudo on macOS we may encounter permission denied errors when dropping privileges due to inability to access the current working directory. + # To compensate for this we'll perform a `cd /` before running any commands after `sudo` succeeds. + common_variables.update(ansible_sudo_chdir='/') + + return common_variables + + def create_controller_inventory(args: EnvironmentConfig, path: str, controller_host: ControllerHostProfile) -> None: """Create and return inventory for use in controller-only integration tests.""" inventory = Inventory( host_groups=dict( testgroup=dict( - testhost=dict( + testhost=get_common_variables(controller_host, controller=True) | dict( ansible_connection='local', ansible_pipelining='yes', ansible_python_interpreter=controller_host.python.path, @@ -129,7 +160,7 @@ def create_posix_inventory(args: EnvironmentConfig, path: str, target_hosts: lis inventory = Inventory( host_groups=dict( testgroup=dict( - testhost=dict( + testhost=get_common_variables(target_host) | dict( ansible_connection='local', ansible_pipelining='yes', ansible_python_interpreter=target_host.python.path, @@ -145,7 +176,7 @@ def create_posix_inventory(args: EnvironmentConfig, path: str, target_hosts: lis ssh = connections[0] - testhost: dict[str, t.Optional[t.Union[str, int]]] = dict( + testhost: dict[str, t.Optional[t.Union[str, int]]] = get_common_variables(target_host) | dict( ansible_connection='ssh', ansible_pipelining='yes', ansible_python_interpreter=ssh.settings.python_interpreter, diff --git a/test/lib/ansible_test/_util/target/setup/bootstrap.sh b/test/lib/ansible_test/_util/target/setup/bootstrap.sh index ec2acd3dccb..e429369760a 100644 --- a/test/lib/ansible_test/_util/target/setup/bootstrap.sh +++ b/test/lib/ansible_test/_util/target/setup/bootstrap.sh @@ -229,6 +229,7 @@ prefer-binary = yes # enable sudo without a password for the wheel group, allowing ansible to use the sudo become plugin echo '%wheel ALL=(ALL:ALL) NOPASSWD: ALL' > /usr/local/etc/sudoers.d/ansible-test + chmod 440 /usr/local/etc/sudoers.d/ansible-test } bootstrap_remote_macos() diff --git a/test/units/plugins/become/test_su.py b/test/units/plugins/become/test_su.py index a6906375bd0..9088657b934 100644 --- a/test/units/plugins/become/test_su.py +++ b/test/units/plugins/become/test_su.py @@ -7,6 +7,8 @@ from __future__ import annotations import re +import pytest + from ansible import context from ansible.plugins.loader import become_loader, shell_loader @@ -26,3 +28,46 @@ def test_su(mocker, parser, reset_cli_args): cmd = su.build_become_command('/bin/foo', sh) assert re.match(r"""su\s+foo -c '/bin/bash -c '"'"'echo BECOME-SUCCESS-.+?; /bin/foo'"'"''""", cmd) + + +def test_no_cmd() -> None: + cmd = '' + + assert become_loader.get('su').build_become_command(cmd, shell_loader.get('sh')) is cmd + + +@pytest.mark.parametrize("prefix, prompt", ( + ("", "Password:"), + (" ", "Password :"), + ("\n", "Password :"), + ("x", "Password :"), + ("", "口令:"), + (" ", "口令 :"), + ("\n", "口令 :"), + ("x", "口令 :"), +)) +def test_check_password_prompt_success(prefix: str, prompt: str) -> None: + become = become_loader.get('su') + + assert become.check_password_prompt((prefix + prompt).encode()) is True + assert become.prompt == prompt + + +@pytest.mark.parametrize("data", ( + "Password", + "Passwort", + "Pass:", +)) +def test_check_password_prompt_failure(data: str) -> None: + become = become_loader.get('su') + + assert become.check_password_prompt(data.encode()) is False + assert become.prompt == '' + + +def test_check_password_prompt_escaping(mocker) -> None: + become = become_loader.get('su') + + mocker.patch.object(become, 'get_option', return_value=['(invalid regex']) + + assert become.check_password_prompt('(invalid regex:') is True diff --git a/test/units/plugins/become/test_sudo.py b/test/units/plugins/become/test_sudo.py index 6b7ca137142..14381363d97 100644 --- a/test/units/plugins/become/test_sudo.py +++ b/test/units/plugins/become/test_sudo.py @@ -5,9 +5,13 @@ from __future__ import annotations +import pytest import re +from pytest_mock import MockerFixture + from ansible import context +from ansible.errors import AnsibleError from ansible.plugins.loader import become_loader, shell_loader @@ -63,3 +67,37 @@ def test_sudo(mocker, parser, reset_cli_args): cmd = sudo.build_become_command('/bin/foo', sh) assert re.match(r"""sudo\s+-C5\s-s\s-H\s+-p "\[sudo via ansible, key=.+?\] password:" -u foo /bin/bash -c 'echo BECOME-SUCCESS-.+? ; /bin/foo'""", cmd), cmd + + +@pytest.mark.parametrize("del_attr_name, expected_error_pattern", ( + ("ECHO", ".*does not support become.*missing the 'ECHO' attribute"), # BecomeBase + ("CD", ".*does not support sudo chdir.*missing the 'CD' attribute"), # sudo +)) +def test_invalid_shell_plugin(del_attr_name: str, expected_error_pattern: str, mocker: MockerFixture) -> None: + def badprop(_self): + raise AttributeError(del_attr_name) + + sh = shell_loader.get('sh') + mocker.patch.object(type(sh), del_attr_name, property(fget=badprop)) + + sudo = become_loader.get('sudo') + sudo.set_options(direct=dict(sudo_chdir='/')) + + with pytest.raises(AnsibleError, match=expected_error_pattern): + sudo.build_become_command('/stuff', sh) + + +def test_no_flags() -> None: + sudo = become_loader.get('sudo') + sudo.set_options(direct=dict(become_pass='x', become_flags='')) + + result = sudo.build_become_command('/stuff', shell_loader.get('sh')) + + # ensure no flags were in the final command other than -p and -u + assert re.search(r'''^sudo +-p "[^"]*" -u root '[^']*'$''', result) + + +def test_no_cmd() -> None: + cmd = '' + + assert become_loader.get('sudo').build_become_command(cmd, shell_loader.get('sh')) is cmd