Add SSH_ASKPASS as an alternative means to provide ssh with passwords

pull/83936/head
Matt Martz 3 months ago
parent a0495fc314
commit a05a86c977
No known key found for this signature in database
GPG Key ID: 40832D88E9FC91D8

@ -0,0 +1,25 @@
# Copyright: Contributors to the Ansible project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import annotations
import os
import sys
import typing as t
from multiprocessing.shared_memory import SharedMemory
def main() -> None:
kwargs: dict[str, t.Any] = {}
if sys.version_info[:2] >= (3, 13):
# deprecated: description='unneeded due to track argument for SharedMemory' python_version='3.13'
kwargs['track'] = False
try:
shm = SharedMemory(name=os.environ['_ANSIBLE_SSH_ASKPASS_SHM'], **kwargs)
except FileNotFoundError:
# We must be running after the ansible fork is shutting down
sys.exit(1)
sys.stdout.buffer.write(shm.buf.tobytes().rstrip(b'\x00'))
sys.stdout.flush()
shm.buf[:] = b'\x00' * shm.size
shm.close()
sys.exit(0)

@ -6,8 +6,12 @@
from __future__ import annotations from __future__ import annotations
import os
import sys
# ansible.cli needs to be imported first, to ensure the source bin/* scripts run that code first # ansible.cli needs to be imported first, to ensure the source bin/* scripts run that code first
from ansible.cli import CLI from ansible.cli import CLI
from ansible.cli import _ssh_askpass
from ansible import constants as C from ansible import constants as C
from ansible import context from ansible import context
from ansible.cli.arguments import option_helpers as opt_help from ansible.cli.arguments import option_helpers as opt_help
@ -20,6 +24,13 @@ from ansible.playbook import Playbook
from ansible.playbook.play import Play from ansible.playbook.play import Play
from ansible.utils.display import Display from ansible.utils.display import Display
# as the SSH_ASKPASS script can only be invoked as a singular command
# python -m doesn't work here, so assume that if we are executing
# and there is only a single item in argv plus the executable, and the
# env var is set we are in SSH_ASKPASS mode
if sys.argv[2:] == [] and os.getenv('_ANSIBLE_SSH_ASKPASS_SHM'):
_ssh_askpass.main()
display = Display() display = Display()

@ -62,6 +62,21 @@ DOCUMENTATION = """
- name: ansible_password - name: ansible_password
- name: ansible_ssh_pass - name: ansible_ssh_pass
- name: ansible_ssh_password - name: ansible_ssh_password
password_mechanism:
description: Mechanism to use for handling ssh password prompt
type: string
default: ssh_askpass
choices:
- ssh_askpass
- sshpass
- disable
version_added: '2.19'
env:
- name: ANSIBLE_SSH_PASSWORD_MECHANISM
ini:
- {key: password_mechanism, section: ssh_connection}
vars:
- name: ansible_ssh_password_mechanism
sshpass_prompt: sshpass_prompt:
description: description:
- Password prompt that sshpass should search for. Supported by sshpass 1.06 and up. - Password prompt that sshpass should search for. Supported by sshpass 1.06 and up.
@ -357,7 +372,6 @@ DOCUMENTATION = """
type: string type: string
description: description:
- "PKCS11 SmartCard provider such as opensc, example: /usr/local/lib/opensc-pkcs11.so" - "PKCS11 SmartCard provider such as opensc, example: /usr/local/lib/opensc-pkcs11.so"
- Requires sshpass version 1.06+, sshpass must support the -P option.
env: [{name: ANSIBLE_PKCS11_PROVIDER}] env: [{name: ANSIBLE_PKCS11_PROVIDER}]
ini: ini:
- {key: pkcs11_provider, section: ssh_connection} - {key: pkcs11_provider, section: ssh_connection}
@ -367,26 +381,31 @@ DOCUMENTATION = """
import collections.abc as c import collections.abc as c
import errno import errno
import contextlib
import fcntl import fcntl
import hashlib import hashlib
import io import io
import os import os
import pathlib
import pty import pty
import re import re
import selectors import selectors
import shlex import shlex
import shutil
import subprocess import subprocess
import sys
import time import time
import typing as t import typing as t
from functools import wraps from functools import wraps
from multiprocessing.shared_memory import SharedMemory
from ansible.errors import ( from ansible.errors import (
AnsibleAuthenticationFailure, AnsibleAuthenticationFailure,
AnsibleConnectionFailure, AnsibleConnectionFailure,
AnsibleError, AnsibleError,
AnsibleFileNotFound, AnsibleFileNotFound,
) )
from ansible.module_utils.six import PY3, text_type, binary_type from ansible.module_utils.six import text_type, binary_type
from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text
from ansible.plugins.connection import ConnectionBase, BUFSIZE from ansible.plugins.connection import ConnectionBase, BUFSIZE
from ansible.plugins.shell.powershell import _parse_clixml from ansible.plugins.shell.powershell import _parse_clixml
@ -408,6 +427,8 @@ b_NOT_SSH_ERRORS = (b'Traceback (most recent call last):', # Python-2.6 when th
SSHPASS_AVAILABLE = None SSHPASS_AVAILABLE = None
SSH_DEBUG = re.compile(r'^debug\d+: .*') SSH_DEBUG = re.compile(r'^debug\d+: .*')
_HAS_RESOURCE_TRACK = sys.version_info[:2] >= (3, 13)
class AnsibleControlPersistBrokenPipeError(AnsibleError): class AnsibleControlPersistBrokenPipeError(AnsibleError):
""" ControlPersist broken pipe """ """ ControlPersist broken pipe """
@ -496,9 +517,10 @@ def _ssh_retry(
remaining_tries = int(self.get_option('reconnection_retries')) + 1 remaining_tries = int(self.get_option('reconnection_retries')) + 1
cmd_summary = u"%s..." % to_text(args[0]) cmd_summary = u"%s..." % to_text(args[0])
conn_password = self.get_option('password') or self._play_context.password conn_password = self.get_option('password') or self._play_context.password
is_sshpass = self.get_option('password_mechanism') == 'sshpass'
for attempt in range(remaining_tries): for attempt in range(remaining_tries):
cmd = t.cast(list[bytes], args[0]) cmd = t.cast(list[bytes], args[0])
if attempt != 0 and conn_password and isinstance(cmd, list): if attempt != 0 and is_sshpass and conn_password and isinstance(cmd, list):
# If this is a retry, the fd/pipe for sshpass is closed, and we need a new one # If this is a retry, the fd/pipe for sshpass is closed, and we need a new one
self.sshpass_pipe = os.pipe() self.sshpass_pipe = os.pipe()
cmd[1] = b'-d' + to_bytes(self.sshpass_pipe[0], nonstring='simplerepr', errors='surrogate_or_strict') cmd[1] = b'-d' + to_bytes(self.sshpass_pipe[0], nonstring='simplerepr', errors='surrogate_or_strict')
@ -517,7 +539,7 @@ def _ssh_retry(
except (AnsibleControlPersistBrokenPipeError): except (AnsibleControlPersistBrokenPipeError):
# Retry one more time because of the ControlPersist broken pipe (see #16731) # Retry one more time because of the ControlPersist broken pipe (see #16731)
cmd = t.cast(list[bytes], args[0]) cmd = t.cast(list[bytes], args[0])
if conn_password and isinstance(cmd, list): if is_sshpass and conn_password and isinstance(cmd, list):
# This is a retry, so the fd/pipe for sshpass is closed, and we need a new one # This is a retry, so the fd/pipe for sshpass is closed, and we need a new one
self.sshpass_pipe = os.pipe() self.sshpass_pipe = os.pipe()
cmd[1] = b'-d' + to_bytes(self.sshpass_pipe[0], nonstring='simplerepr', errors='surrogate_or_strict') cmd[1] = b'-d' + to_bytes(self.sshpass_pipe[0], nonstring='simplerepr', errors='surrogate_or_strict')
@ -558,6 +580,24 @@ def _ssh_retry(
return wrapped return wrapped
def _clean_shm(func):
def inner(self, *args, **kwargs):
try:
ret = func(self, *args, **kwargs)
finally:
if self.shm:
self.shm.close()
with contextlib.suppress(FileNotFoundError):
self.shm.unlink()
if not _HAS_RESOURCE_TRACK:
# deprecated: description='unneeded due to track argument for SharedMemory' python_version='3.13'
# There is a resource tracking issue where the resource is deleted, but tracking still has a record
# This will effectively overwrite the record and remove it
SharedMemory(name=self.shm.name, create=True, size=1).unlink()
return ret
return inner
class Connection(ConnectionBase): class Connection(ConnectionBase):
""" ssh based connections """ """ ssh based connections """
@ -573,6 +613,8 @@ class Connection(ConnectionBase):
self.user = self._play_context.remote_user self.user = self._play_context.remote_user
self.control_path: str | None = None self.control_path: str | None = None
self.control_path_dir: str | None = None self.control_path_dir: str | None = None
self.shm: SharedMemory | None = None
self.sshpass_pipe: tuple[int, int] | None = None
# Windows operates differently from a POSIX connection/shell plugin, # Windows operates differently from a POSIX connection/shell plugin,
# we need to set various properties to ensure SSH on Windows continues # we need to set various properties to ensure SSH on Windows continues
@ -614,17 +656,10 @@ class Connection(ConnectionBase):
def _sshpass_available() -> bool: def _sshpass_available() -> bool:
global SSHPASS_AVAILABLE global SSHPASS_AVAILABLE
# We test once if sshpass is available, and remember the result. It # We test once if sshpass is available, and remember the result.
# would be nice to use distutils.spawn.find_executable for this, but
# distutils isn't always available; shutils.which() is Python3-only.
if SSHPASS_AVAILABLE is None: if SSHPASS_AVAILABLE is None:
try: SSHPASS_AVAILABLE = shutil.which('sshpass') is not None
p = subprocess.Popen(["sshpass"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p.communicate()
SSHPASS_AVAILABLE = True
except OSError:
SSHPASS_AVAILABLE = False
return SSHPASS_AVAILABLE return SSHPASS_AVAILABLE
@ -677,17 +712,18 @@ class Connection(ConnectionBase):
b_command = [] b_command = []
conn_password = self.get_option('password') or self._play_context.password conn_password = self.get_option('password') or self._play_context.password
pkcs11_provider = self.get_option("pkcs11_provider")
password_mechanism = self.get_option('password_mechanism')
# #
# First, the command to invoke # First, the command to invoke
# #
# If we want to use password authentication, we have to set up a pipe to # If we want to use sshpass for password authentication, we have to set up a pipe to
# write the password to sshpass. # write the password to sshpass.
pkcs11_provider = self.get_option("pkcs11_provider") if password_mechanism == 'sshpass' and (conn_password or pkcs11_provider):
if conn_password or pkcs11_provider:
if not self._sshpass_available(): if not self._sshpass_available():
raise AnsibleError("to use the 'ssh' connection type with passwords or pkcs11_provider, you must install the sshpass program") raise AnsibleError("to use the password_mechanism=sshpass, you must install the sshpass program")
if not conn_password and pkcs11_provider: if not conn_password and pkcs11_provider:
raise AnsibleError("to use pkcs11_provider you must specify a password/pin") raise AnsibleError("to use pkcs11_provider you must specify a password/pin")
@ -720,12 +756,12 @@ class Connection(ConnectionBase):
# sftp batch mode allows us to correctly catch failed transfers, but can # sftp batch mode allows us to correctly catch failed transfers, but can
# be disabled if the client side doesn't support the option. However, # be disabled if the client side doesn't support the option. However,
# sftp batch mode does not prompt for passwords so it must be disabled # sftp batch mode does not prompt for passwords so it must be disabled
# if not using controlpersist and using sshpass # if not using controlpersist and using password auth
b_args: t.Iterable[bytes] b_args: t.Iterable[bytes]
if subsystem == 'sftp' and self.get_option('sftp_batch_mode'): if subsystem == 'sftp' and self.get_option('sftp_batch_mode'):
if conn_password: if conn_password:
b_args = [b'-o', b'BatchMode=no'] b_args = [b'-o', b'BatchMode=no']
self._add_args(b_command, b_args, u'disable batch mode for sshpass') self._add_args(b_command, b_args, u'disable batch mode for password auth')
b_command += [b'-b', b'-'] b_command += [b'-b', b'-']
if display.verbosity: if display.verbosity:
@ -906,6 +942,46 @@ class Connection(ConnectionBase):
return b''.join(output), remainder return b''.join(output), remainder
def _init_shm(self) -> tuple[dict[str, str], dict[str, t.Any]]:
env = os.environ.copy()
popen_kwargs: dict[str, t.Any] = {}
if self.get_option('password_mechanism') != 'ssh_askpass':
return env, popen_kwargs
conn_password = self.get_option('password') or self._play_context.password
pkcs11_provider = self.get_option("pkcs11_provider")
if not conn_password and pkcs11_provider:
raise AnsibleError("to use pkcs11_provider you must specify a password/pin")
if not conn_password:
return env, popen_kwargs
b_conn_password = conn_password.encode('utf-8')
kwargs = {}
if _HAS_RESOURCE_TRACK:
# deprecated: description='track argument for SharedMemory always available' python_version='3.13'
kwargs['track'] = False
self.shm = shm = SharedMemory(create=True, size=16384, **kwargs) # type: ignore[arg-type]
shm.buf[:len(b_conn_password)] = bytearray(b_conn_password)
shm.close()
env['_ANSIBLE_SSH_ASKPASS_SHM'] = str(self.shm.name)
adhoc = pathlib.Path(sys.argv[0]).with_name('ansible')
env['SSH_ASKPASS'] = str(adhoc) if adhoc.is_file() else 'ansible'
# SSH_ASKPASS_REQUIRE was added in openssh 8.4, prior to 8.4 there must be no tty, and DISPLAY must be set
env['SSH_ASKPASS_REQUIRE'] = 'force'
env['DISPLAY'] = '-'
popen_kwargs['env'] = env
# start_new_session runs setsid which detaches the tty to support the use of ASKPASS prior to openssh 8.4
popen_kwargs['start_new_session'] = True
return env, popen_kwargs
@_clean_shm
def _bare_run(self, cmd: list[bytes], in_data: bytes | None, sudoable: bool = True, checkrc: bool = True) -> tuple[int, bytes, bytes]: def _bare_run(self, cmd: list[bytes], in_data: bytes | None, sudoable: bool = True, checkrc: bool = True) -> tuple[int, bytes, bytes]:
""" """
Starts the command and communicates with it until it ends. Starts the command and communicates with it until it ends.
@ -915,6 +991,9 @@ class Connection(ConnectionBase):
display_cmd = u' '.join(shlex.quote(to_text(c)) for c in cmd) display_cmd = u' '.join(shlex.quote(to_text(c)) for c in cmd)
display.vvv(u'SSH: EXEC {0}'.format(display_cmd), host=self.host) display.vvv(u'SSH: EXEC {0}'.format(display_cmd), host=self.host)
conn_password = self.get_option('password') or self._play_context.password
password_mechanism = self.get_option('password_mechanism')
# Start the given command. If we don't need to pipeline data, we can try # Start the given command. If we don't need to pipeline data, we can try
# to use a pseudo-tty (ssh will have been invoked with -tt). If we are # to use a pseudo-tty (ssh will have been invoked with -tt). If we are
# pipelining data, or can't create a pty, we fall back to using plain # pipelining data, or can't create a pty, we fall back to using plain
@ -927,17 +1006,16 @@ class Connection(ConnectionBase):
else: else:
cmd = list(map(to_bytes, cmd)) cmd = list(map(to_bytes, cmd))
conn_password = self.get_option('password') or self._play_context.password env, popen_kwargs = self._init_shm()
if self.sshpass_pipe:
popen_kwargs['pass_fds'] = self.sshpass_pipe
if not in_data: if not in_data:
try: try:
# Make sure stdin is a proper pty to avoid tcgetattr errors # Make sure stdin is a proper pty to avoid tcgetattr errors
master, slave = pty.openpty() master, slave = pty.openpty()
if PY3 and conn_password: p = subprocess.Popen(cmd, stdin=slave, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **popen_kwargs)
# pylint: disable=unexpected-keyword-arg
p = subprocess.Popen(cmd, stdin=slave, stdout=subprocess.PIPE, stderr=subprocess.PIPE, pass_fds=self.sshpass_pipe)
else:
p = subprocess.Popen(cmd, stdin=slave, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdin = os.fdopen(master, 'wb', 0) stdin = os.fdopen(master, 'wb', 0)
os.close(slave) os.close(slave)
except (OSError, IOError): except (OSError, IOError):
@ -945,21 +1023,13 @@ class Connection(ConnectionBase):
if not p: if not p:
try: try:
if PY3 and conn_password: p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
# pylint: disable=unexpected-keyword-arg stderr=subprocess.PIPE, **popen_kwargs)
p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, pass_fds=self.sshpass_pipe)
else:
p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdin = p.stdin # type: ignore[assignment] # stdin will be set and not None due to the calls above stdin = p.stdin # type: ignore[assignment] # stdin will be set and not None due to the calls above
except (OSError, IOError) as e: except (OSError, IOError) as e:
raise AnsibleError('Unable to execute ssh command line on a controller due to: %s' % to_native(e)) raise AnsibleError('Unable to execute ssh command line on a controller due to: %s' % to_native(e))
# If we are using SSH password authentication, write the password into if password_mechanism == 'sshpass' and conn_password:
# the pipe we opened in _build_command.
if conn_password:
os.close(self.sshpass_pipe[0]) os.close(self.sshpass_pipe[0])
try: try:
os.write(self.sshpass_pipe[1], to_bytes(conn_password) + b'\n') os.write(self.sshpass_pipe[1], to_bytes(conn_password) + b'\n')
@ -1178,10 +1248,11 @@ class Connection(ConnectionBase):
p.stdout.close() p.stdout.close()
p.stderr.close() p.stderr.close()
if self.get_option('host_key_checking'): conn_password = self.get_option('password') or self._play_context.password
if cmd[0] == b"sshpass" and p.returncode == 6: hostkey_fail = (cmd[0] == b"sshpass" and p.returncode == 6) or b"read_passphrase: can't open /dev/tty" in b_stderr
raise AnsibleError('Using a SSH password instead of a key is not possible because Host Key checking is enabled and sshpass does not support ' if password_mechanism and self.get_option('host_key_checking') and conn_password and hostkey_fail:
'this. Please add this host\'s fingerprint to your known_hosts file to manage this host.') raise AnsibleError('Using a SSH password instead of a key is not possible because Host Key checking is enabled. '
'Please add this host\'s fingerprint to your known_hosts file to manage this host.')
controlpersisterror = b'Bad configuration option: ControlPersist' in b_stderr or b'unknown configuration option: ControlPersist' in b_stderr controlpersisterror = b'Bad configuration option: ControlPersist' in b_stderr or b'unknown configuration option: ControlPersist' in b_stderr
if p.returncode != 0 and controlpersisterror: if p.returncode != 0 and controlpersisterror:

@ -19,6 +19,7 @@ if command -v sshpass > /dev/null; then
# that the flag gets passed to sshpass. # that the flag gets passed to sshpass.
timeout 5 ansible -m ping \ timeout 5 ansible -m ping \
-e ansible_connection=ssh \ -e ansible_connection=ssh \
-e ansible_ssh_password_mechanism=sshpass \
-e ansible_sshpass_prompt=notThis: \ -e ansible_sshpass_prompt=notThis: \
-e ansible_password=foo \ -e ansible_password=foo \
-e ansible_user=definitelynotroot \ -e ansible_user=definitelynotroot \
@ -34,6 +35,7 @@ if command -v sshpass > /dev/null; then
else else
ansible -m ping \ ansible -m ping \
-e ansible_connection=ssh \ -e ansible_connection=ssh \
-e ansible_ssh_password_mechanism=sshpass \
-e ansible_sshpass_prompt=notThis: \ -e ansible_sshpass_prompt=notThis: \
-e ansible_password=foo \ -e ansible_password=foo \
-e ansible_user=definitelynotroot \ -e ansible_user=definitelynotroot \

@ -23,7 +23,6 @@ from selectors import SelectorKey, EVENT_READ
import pytest import pytest
from ansible.errors import AnsibleAuthenticationFailure
import unittest import unittest
from unittest.mock import patch, MagicMock, PropertyMock from unittest.mock import patch, MagicMock, PropertyMock
from ansible.errors import AnsibleError, AnsibleConnectionFailure, AnsibleFileNotFound from ansible.errors import AnsibleError, AnsibleConnectionFailure, AnsibleFileNotFound
@ -54,22 +53,6 @@ class TestConnectionBaseClass(unittest.TestCase):
res = conn._connect() res = conn._connect()
self.assertEqual(conn, res) self.assertEqual(conn, res)
ssh.SSHPASS_AVAILABLE = False
self.assertFalse(conn._sshpass_available())
ssh.SSHPASS_AVAILABLE = True
self.assertTrue(conn._sshpass_available())
with patch('subprocess.Popen') as p:
ssh.SSHPASS_AVAILABLE = None
p.return_value = MagicMock()
self.assertTrue(conn._sshpass_available())
ssh.SSHPASS_AVAILABLE = None
p.return_value = None
p.side_effect = OSError()
self.assertFalse(conn._sshpass_available())
conn.close() conn.close()
self.assertFalse(conn._connected) self.assertFalse(conn._connected)
@ -412,29 +395,6 @@ class TestSSHConnectionRun(object):
assert self.conn._send_initial_data.call_count == 1 assert self.conn._send_initial_data.call_count == 1
assert self.conn._send_initial_data.call_args[0][1] == 'this is input data' assert self.conn._send_initial_data.call_args[0][1] == 'this is input data'
def test_with_password(self):
# test with a password set to trigger the sshpass write
self.pc.password = '12345'
self.mock_popen_res.stdout.read.side_effect = [b"some data", b"", b""]
self.mock_popen_res.stderr.read.side_effect = [b""]
self.mock_selector.select.side_effect = [
[(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)],
[(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)],
[(SelectorKey(self.mock_popen_res.stderr, 1002, [EVENT_READ], None), EVENT_READ)],
[(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)],
[]]
self.mock_selector.get_map.side_effect = lambda: True
return_code, b_stdout, b_stderr = self.conn._run(["ssh", "is", "a", "cmd"], "this is more data")
assert return_code == 0
assert b_stdout == b'some data'
assert b_stderr == b''
assert self.mock_selector.register.called is True
assert self.mock_selector.register.call_count == 2
assert self.conn._send_initial_data.called is True
assert self.conn._send_initial_data.call_count == 1
assert self.conn._send_initial_data.call_args[0][1] == 'this is more data'
def _password_with_prompt_examine_output(self, sourice, state, b_chunk, sudoable): def _password_with_prompt_examine_output(self, sourice, state, b_chunk, sudoable):
if state == 'awaiting_prompt': if state == 'awaiting_prompt':
self.conn._flags['become_prompt'] = True self.conn._flags['become_prompt'] = True
@ -525,30 +485,6 @@ class TestSSHConnectionRun(object):
@pytest.mark.usefixtures('mock_run_env') @pytest.mark.usefixtures('mock_run_env')
class TestSSHConnectionRetries(object): class TestSSHConnectionRetries(object):
def test_incorrect_password(self, monkeypatch):
self.conn.set_option('host_key_checking', False)
self.conn.set_option('reconnection_retries', 5)
self.mock_popen_res.stdout.read.side_effect = [b'']
self.mock_popen_res.stderr.read.side_effect = [b'Permission denied, please try again.\r\n']
type(self.mock_popen_res).returncode = PropertyMock(side_effect=[5] * 4)
self.mock_selector.select.side_effect = [
[(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)],
[(SelectorKey(self.mock_popen_res.stderr, 1002, [EVENT_READ], None), EVENT_READ)],
[],
]
self.mock_selector.get_map.side_effect = lambda: True
self.conn._build_command = MagicMock()
self.conn._build_command.return_value = [b'sshpass', b'-d41', b'ssh', b'-C']
exception_info = pytest.raises(AnsibleAuthenticationFailure, self.conn.exec_command, 'sshpass', 'some data')
assert exception_info.value.message == ('Invalid/incorrect username/password. Skipping remaining 5 retries to prevent account lockout: '
'Permission denied, please try again.')
assert self.mock_popen.call_count == 1
def test_retry_then_success(self, monkeypatch): def test_retry_then_success(self, monkeypatch):
self.conn.set_option('host_key_checking', False) self.conn.set_option('host_key_checking', False)
self.conn.set_option('reconnection_retries', 3) self.conn.set_option('reconnection_retries', 3)

Loading…
Cancel
Save