no sshpass

pull/51985/head
Matt Martz 3 months ago
parent 11e56d9c27
commit c8b8ea5fcb
No known key found for this signature in database
GPG Key ID: 40832D88E9FC91D8

@ -76,6 +76,10 @@ DOCUMENTATION = '''
vars: vars:
- name: ansible_sshpass_prompt - name: ansible_sshpass_prompt
version_added: '2.10' version_added: '2.10'
deprecated:
why: due to removal of reliance on sshpass
version: "2.20"
alternatives: none
ssh_args: ssh_args:
description: Arguments to pass to all SSH CLI tools. description: Arguments to pass to all SSH CLI tools.
default: '-C -o ControlMaster=auto -o ControlPersist=60s' default: '-C -o ControlMaster=auto -o ControlPersist=60s'
@ -357,7 +361,6 @@ DOCUMENTATION = '''
type: string type: string
description: description:
- "PKCS11 SmartCard provider such as opensc, example: /usr/local/lib/opensc-pkcs11.so" - "PKCS11 SmartCard provider such as opensc, example: /usr/local/lib/opensc-pkcs11.so"
- Requires sshpass version 1.06+, sshpass must support the -P option.
env: [{name: ANSIBLE_PKCS11_PROVIDER}] env: [{name: ANSIBLE_PKCS11_PROVIDER}]
ini: ini:
- {key: pkcs11_provider, section: ssh_connection} - {key: pkcs11_provider, section: ssh_connection}
@ -366,7 +369,7 @@ DOCUMENTATION = '''
''' '''
import collections.abc as c import collections.abc as c
import errno import contextlib
import fcntl import fcntl
import hashlib import hashlib
import io import io
@ -376,22 +379,25 @@ import re
import selectors import selectors
import shlex import shlex
import subprocess import subprocess
import sys
import time import time
import typing as t import typing as t
from functools import wraps from functools import wraps
from multiprocessing.shared_memory import SharedMemory
from ansible.errors import ( from ansible.errors import (
AnsibleAuthenticationFailure, AnsibleAuthenticationFailure,
AnsibleConnectionFailure, AnsibleConnectionFailure,
AnsibleError, AnsibleError,
AnsibleFileNotFound, AnsibleFileNotFound,
) )
from ansible.module_utils.six import PY3, text_type, binary_type from ansible.module_utils.six import text_type, binary_type
from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text
from ansible.plugins.connection import ConnectionBase, BUFSIZE from ansible.plugins.connection import ConnectionBase, BUFSIZE
from ansible.plugins.shell.powershell import _parse_clixml from ansible.plugins.shell.powershell import _parse_clixml
from ansible.utils.display import Display from ansible.utils.display import Display
from ansible.utils.path import unfrackpath, makedirs_safe from ansible.utils.path import unfrackpath, makedirs_safe
from ansible.ssh_askpass import ssh_askpass
display = Display() display = Display()
@ -408,6 +414,8 @@ b_NOT_SSH_ERRORS = (b'Traceback (most recent call last):', # Python-2.6 when th
SSHPASS_AVAILABLE = None SSHPASS_AVAILABLE = None
SSH_DEBUG = re.compile(r'^debug\d+: .*') SSH_DEBUG = re.compile(r'^debug\d+: .*')
_HAS_RESOURCE_TRACK = sys.version_info[:2] >= (3, 13)
class AnsibleControlPersistBrokenPipeError(AnsibleError): class AnsibleControlPersistBrokenPipeError(AnsibleError):
''' ControlPersist broken pipe ''' ''' ControlPersist broken pipe '''
@ -423,34 +431,6 @@ def _handle_error(
display: Display = display, display: Display = display,
) -> None: ) -> None:
# sshpass errors
if command == b'sshpass':
# Error 5 is invalid/incorrect password. Raise an exception to prevent retries from locking the account.
if return_tuple[0] == 5:
msg = 'Invalid/incorrect username/password. Skipping remaining {0} retries to prevent account lockout:'.format(remaining_retries)
if remaining_retries <= 0:
msg = 'Invalid/incorrect password:'
if no_log:
msg = '{0} <error censored due to no log>'.format(msg)
else:
msg = '{0} {1}'.format(msg, to_native(return_tuple[2]).rstrip())
raise AnsibleAuthenticationFailure(msg)
# sshpass returns codes are 1-6. We handle 5 previously, so this catches other scenarios.
# No exception is raised, so the connection is retried - except when attempting to use
# sshpass_prompt with an sshpass that won't let us pass -P, in which case we fail loudly.
elif return_tuple[0] in [1, 2, 3, 4, 6]:
msg = 'sshpass error:'
if no_log:
msg = '{0} <error censored due to no log>'.format(msg)
else:
details = to_native(return_tuple[2]).rstrip()
if "sshpass: invalid option -- 'P'" in details:
details = 'Installed sshpass version does not support customized password prompts. ' \
'Upgrade sshpass to use sshpass_prompt, or otherwise switch to ssh keys.'
raise AnsibleError('{0} {1}'.format(msg, details))
msg = '{0} {1}'.format(msg, details)
if return_tuple[0] == 255: if return_tuple[0] == 255:
SSH_ERROR = True SSH_ERROR = True
for signature in b_NOT_SSH_ERRORS: for signature in b_NOT_SSH_ERRORS:
@ -487,7 +467,6 @@ def _ssh_retry(
* an exception is caught * an exception is caught
* ssh returns 255 * ssh returns 255
Will not retry if Will not retry if
* sshpass returns 5 (invalid password, to prevent account lockouts)
* remaining_tries is < 2 * remaining_tries is < 2
* retries limit reached * retries limit reached
""" """
@ -498,10 +477,6 @@ def _ssh_retry(
conn_password = self.get_option('password') or self._play_context.password conn_password = self.get_option('password') or self._play_context.password
for attempt in range(remaining_tries): for attempt in range(remaining_tries):
cmd = t.cast(list[bytes], args[0]) cmd = t.cast(list[bytes], args[0])
if attempt != 0 and conn_password and isinstance(cmd, list):
# If this is a retry, the fd/pipe for sshpass is closed, and we need a new one
self.sshpass_pipe = os.pipe()
cmd[1] = b'-d' + to_bytes(self.sshpass_pipe[0], nonstring='simplerepr', errors='surrogate_or_strict')
try: try:
try: try:
@ -517,10 +492,6 @@ def _ssh_retry(
except (AnsibleControlPersistBrokenPipeError): except (AnsibleControlPersistBrokenPipeError):
# Retry one more time because of the ControlPersist broken pipe (see #16731) # Retry one more time because of the ControlPersist broken pipe (see #16731)
cmd = t.cast(list[bytes], args[0]) cmd = t.cast(list[bytes], args[0])
if conn_password and isinstance(cmd, list):
# This is a retry, so the fd/pipe for sshpass is closed, and we need a new one
self.sshpass_pipe = os.pipe()
cmd[1] = b'-d' + to_bytes(self.sshpass_pipe[0], nonstring='simplerepr', errors='surrogate_or_strict')
display.vvv(u"RETRYING BECAUSE OF CONTROLPERSIST BROKEN PIPE") display.vvv(u"RETRYING BECAUSE OF CONTROLPERSIST BROKEN PIPE")
return_tuple = func(self, *args, **kwargs) return_tuple = func(self, *args, **kwargs)
@ -529,7 +500,6 @@ def _ssh_retry(
break break
# 5 = Invalid/incorrect password from sshpass
except AnsibleAuthenticationFailure: except AnsibleAuthenticationFailure:
# Raising this exception, which is subclassed from AnsibleConnectionFailure, prevents further retries # Raising this exception, which is subclassed from AnsibleConnectionFailure, prevents further retries
raise raise
@ -558,6 +528,24 @@ def _ssh_retry(
return wrapped return wrapped
def _clean_shm(func):
def inner(self, *args, **kwargs):
try:
ret = func(self, *args, **kwargs)
finally:
with contextlib.suppress(AttributeError):
self.shm.close()
with contextlib.suppress(FileNotFoundError):
self.shm.unlink()
if not _HAS_RESOURCE_TRACK:
# deprecated: description='unneeded due to track argument for SharedMemory' python_version='3.13'
# There is a resource tracking issue where the resource is deleted, but tracking still has a record
# This will effectively overwrite the record and remove it
SharedMemory(name=self.shm.name, create=True, size=1).unlink()
return ret
return inner
class Connection(ConnectionBase): class Connection(ConnectionBase):
''' ssh based connections ''' ''' ssh based connections '''
@ -573,6 +561,7 @@ class Connection(ConnectionBase):
self.user = self._play_context.remote_user self.user = self._play_context.remote_user
self.control_path: str | None = None self.control_path: str | None = None
self.control_path_dir: str | None = None self.control_path_dir: str | None = None
self.shm = None
# Windows operates differently from a POSIX connection/shell plugin, # Windows operates differently from a POSIX connection/shell plugin,
# we need to set various properties to ensure SSH on Windows continues # we need to set various properties to ensure SSH on Windows continues
@ -610,24 +599,6 @@ class Connection(ConnectionBase):
cpath = '%(directory)s/' + digest[:10] cpath = '%(directory)s/' + digest[:10]
return cpath return cpath
@staticmethod
def _sshpass_available() -> bool:
global SSHPASS_AVAILABLE
# We test once if sshpass is available, and remember the result. It
# would be nice to use distutils.spawn.find_executable for this, but
# distutils isn't always available; shutils.which() is Python3-only.
if SSHPASS_AVAILABLE is None:
try:
p = subprocess.Popen(["sshpass"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p.communicate()
SSHPASS_AVAILABLE = True
except OSError:
SSHPASS_AVAILABLE = False
return SSHPASS_AVAILABLE
@staticmethod @staticmethod
def _persistence_controls(b_command: list[bytes]) -> tuple[bool, bool]: def _persistence_controls(b_command: list[bytes]) -> tuple[bool, bool]:
''' '''
@ -677,31 +648,12 @@ class Connection(ConnectionBase):
b_command = [] b_command = []
conn_password = self.get_option('password') or self._play_context.password conn_password = self.get_option('password') or self._play_context.password
pkcs11_provider = self.get_option("pkcs11_provider")
# #
# First, the command to invoke # First, the command to invoke
# #
# If we want to use password authentication, we have to set up a pipe to
# write the password to sshpass.
pkcs11_provider = self.get_option("pkcs11_provider")
if conn_password or pkcs11_provider:
if not self._sshpass_available():
raise AnsibleError("to use the 'ssh' connection type with passwords or pkcs11_provider, you must install the sshpass program")
if not conn_password and pkcs11_provider:
raise AnsibleError("to use pkcs11_provider you must specify a password/pin")
self.sshpass_pipe = os.pipe()
b_command += [b'sshpass', b'-d' + to_bytes(self.sshpass_pipe[0], nonstring='simplerepr', errors='surrogate_or_strict')]
password_prompt = self.get_option('sshpass_prompt')
if not password_prompt and pkcs11_provider:
# Set default password prompt for pkcs11_provider to make it clear its a PIN
password_prompt = 'Enter PIN for '
if password_prompt:
b_command += [b'-P', to_bytes(password_prompt, errors='surrogate_or_strict')]
b_command += [to_bytes(binary, errors='surrogate_or_strict')] b_command += [to_bytes(binary, errors='surrogate_or_strict')]
# #
@ -720,12 +672,12 @@ class Connection(ConnectionBase):
# sftp batch mode allows us to correctly catch failed transfers, but can # sftp batch mode allows us to correctly catch failed transfers, but can
# be disabled if the client side doesn't support the option. However, # be disabled if the client side doesn't support the option. However,
# sftp batch mode does not prompt for passwords so it must be disabled # sftp batch mode does not prompt for passwords so it must be disabled
# if not using controlpersist and using sshpass # if not using controlpersist and using password auth
b_args: t.Iterable[bytes] b_args: t.Iterable[bytes]
if subsystem == 'sftp' and self.get_option('sftp_batch_mode'): if subsystem == 'sftp' and self.get_option('sftp_batch_mode'):
if conn_password: if conn_password:
b_args = [b'-o', b'BatchMode=no'] b_args = [b'-o', b'BatchMode=no']
self._add_args(b_command, b_args, u'disable batch mode for sshpass') self._add_args(b_command, b_args, u'disable batch mode for password auth')
b_command += [b'-b', b'-'] b_command += [b'-b', b'-']
if display.verbosity: if display.verbosity:
@ -906,6 +858,7 @@ class Connection(ConnectionBase):
return b''.join(output), remainder return b''.join(output), remainder
@_clean_shm
def _bare_run(self, cmd: list[bytes], in_data: bytes | None, sudoable: bool = True, checkrc: bool = True) -> tuple[int, bytes, bytes]: def _bare_run(self, cmd: list[bytes], in_data: bytes | None, sudoable: bool = True, checkrc: bool = True) -> tuple[int, bytes, bytes]:
''' '''
Starts the command and communicates with it until it ends. Starts the command and communicates with it until it ends.
@ -927,17 +880,38 @@ class Connection(ConnectionBase):
else: else:
cmd = list(map(to_bytes, cmd)) cmd = list(map(to_bytes, cmd))
env = os.environ.copy()
popen_kwargs = {}
conn_password = self.get_option('password') or self._play_context.password conn_password = self.get_option('password') or self._play_context.password
pkcs11_provider = self.get_option("pkcs11_provider")
if conn_password or pkcs11_provider:
if not conn_password and pkcs11_provider:
raise AnsibleError("to use pkcs11_provider you must specify a password/pin")
b_conn_password = conn_password.encode('utf-8')
kwargs = {}
if _HAS_RESOURCE_TRACK:
# deprecated: description='track argument for SharedMemory always available' python_version='3.13'
kwargs['track'] = False
self.shm = shm = SharedMemory(create=True, size=16384, **kwargs)
shm.buf[:len(b_conn_password)] = bytearray(b_conn_password)
shm.close()
env['_ANSIBLE_SSH_ASKPASS_SHM'] = str(self.shm.name)
env['SSH_ASKPASS'] = os.path.splitext(ssh_askpass.__file__)[0] + '.py'
# SSH_ASKPASS_REQUIRE was added in openssh 8.4, prior to 8.4 there must be no tty, and DISPLAY must be set
env['SSH_ASKPASS_REQUIRE'] = 'force'
env['DISPLAY'] = '-'
popen_kwargs['env'] = env
# start_new_session runs setsid which detaches the tty to support the use of ASKPASS prior to openssh 8.4
popen_kwargs['start_new_session'] = True
if not in_data: if not in_data:
try: try:
# Make sure stdin is a proper pty to avoid tcgetattr errors # Make sure stdin is a proper pty to avoid tcgetattr errors
master, slave = pty.openpty() master, slave = pty.openpty()
if PY3 and conn_password: # pylint: disable=unexpected-keyword-arg
# pylint: disable=unexpected-keyword-arg p = subprocess.Popen(cmd, stdin=slave, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **popen_kwargs)
p = subprocess.Popen(cmd, stdin=slave, stdout=subprocess.PIPE, stderr=subprocess.PIPE, pass_fds=self.sshpass_pipe)
else:
p = subprocess.Popen(cmd, stdin=slave, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdin = os.fdopen(master, 'wb', 0) stdin = os.fdopen(master, 'wb', 0)
os.close(slave) os.close(slave)
except (OSError, IOError): except (OSError, IOError):
@ -945,30 +919,13 @@ class Connection(ConnectionBase):
if not p: if not p:
try: try:
if PY3 and conn_password: # pylint: disable=unexpected-keyword-arg
# pylint: disable=unexpected-keyword-arg p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **popen_kwargs)
stderr=subprocess.PIPE, pass_fds=self.sshpass_pipe)
else:
p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdin = p.stdin # type: ignore[assignment] # stdin will be set and not None due to the calls above stdin = p.stdin # type: ignore[assignment] # stdin will be set and not None due to the calls above
except (OSError, IOError) as e: except (OSError, IOError) as e:
raise AnsibleError('Unable to execute ssh command line on a controller due to: %s' % to_native(e)) raise AnsibleError('Unable to execute ssh command line on a controller due to: %s' % to_native(e))
# If we are using SSH password authentication, write the password into
# the pipe we opened in _build_command.
if conn_password:
os.close(self.sshpass_pipe[0])
try:
os.write(self.sshpass_pipe[1], to_bytes(conn_password) + b'\n')
except OSError as e:
# Ignore broken pipe errors if the sshpass process has exited.
if e.errno != errno.EPIPE or p.poll() is None:
raise
os.close(self.sshpass_pipe[1])
# #
# SSH state machine # SSH state machine
# #
@ -1176,10 +1133,9 @@ class Connection(ConnectionBase):
p.stdout.close() p.stdout.close()
p.stderr.close() p.stderr.close()
if self.get_option('host_key_checking'): if self.get_option('host_key_checking') and conn_password and b"read_passphrase: can't open /dev/tty" in b_stderr:
if cmd[0] == b"sshpass" and p.returncode == 6: raise AnsibleError('Using a SSH password instead of a key is not possible because Host Key checking is enabled. '
raise AnsibleError('Using a SSH password instead of a key is not possible because Host Key checking is enabled and sshpass does not support ' 'Please add this host\'s fingerprint to your known_hosts file to manage this host.')
'this. Please add this host\'s fingerprint to your known_hosts file to manage this host.')
controlpersisterror = b'Bad configuration option: ControlPersist' in b_stderr or b'unknown configuration option: ControlPersist' in b_stderr controlpersisterror = b'Bad configuration option: ControlPersist' in b_stderr or b'unknown configuration option: ControlPersist' in b_stderr
if p.returncode != 0 and controlpersisterror: if p.returncode != 0 and controlpersisterror:

@ -0,0 +1,20 @@
#!/usr/bin/env python3
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# Copyright: Contributors to the Ansible project
import os
import sys
from multiprocessing.shared_memory import SharedMemory
if __name__ == '__main__':
kwargs = {}
if sys.version_info[:2] >= (3, 13):
kwargs['track'] = False
try:
shm = SharedMemory(name=os.environ['_ANSIBLE_SSH_ASKPASS_SHM'], **kwargs)
except FileNotFoundError:
# We must be running after the ansible fork is shutting down
sys.exit(1)
sys.stdout.buffer.write(shm.buf.tobytes().rstrip(b'\x00'))
shm.buf[:] = b'\x00' * shm.size
shm.close()

@ -1,50 +1,6 @@
#!/usr/bin/env bash #!/usr/bin/env bash
set -ux set -eux
# We skip this whole section if the test node doesn't have sshpass on it.
if command -v sshpass > /dev/null; then
# Check if our sshpass supports -P
sshpass -P foo > /dev/null
sshpass_supports_prompt=$?
if [[ $sshpass_supports_prompt -eq 0 ]]; then
# If the prompt is wrong, we'll end up hanging (due to sshpass hanging).
# We should probably do something better here, like timing out in Ansible,
# but this has been the behavior for a long time, before we supported custom
# password prompts.
#
# So we search for a custom password prompt that is clearly wrong and call
# ansible with timeout. If we time out, our custom prompt was successfully
# searched for. It's a weird way of doing things, but it does ensure
# that the flag gets passed to sshpass.
timeout 5 ansible -m ping \
-e ansible_connection=ssh \
-e ansible_sshpass_prompt=notThis: \
-e ansible_password=foo \
-e ansible_user=definitelynotroot \
-i test_connection.inventory \
ssh-pipelining
ret=$?
# 124 is EXIT_TIMEDOUT from gnu coreutils
# 143 is 128+SIGTERM(15) from BusyBox
if [[ $ret -ne 124 && $ret -ne 143 ]]; then
echo "Expected to time out and we did not. Exiting with failure."
exit 1
fi
else
ansible -m ping \
-e ansible_connection=ssh \
-e ansible_sshpass_prompt=notThis: \
-e ansible_password=foo \
-e ansible_user=definitelynotroot \
-i test_connection.inventory \
ssh-pipelining | grep 'customized password prompts'
ret=$?
[[ $ret -eq 0 ]] || exit $ret
fi
fi
set -e
if [[ "$(scp -O 2>&1)" == "usage: scp "* ]]; then if [[ "$(scp -O 2>&1)" == "usage: scp "* ]]; then
# scp supports the -O option (and thus the -T option as well) # scp supports the -O option (and thus the -T option as well)

@ -23,7 +23,6 @@ from selectors import SelectorKey, EVENT_READ
import pytest import pytest
from ansible.errors import AnsibleAuthenticationFailure
import unittest import unittest
from unittest.mock import patch, MagicMock, PropertyMock from unittest.mock import patch, MagicMock, PropertyMock
from ansible.errors import AnsibleError, AnsibleConnectionFailure, AnsibleFileNotFound from ansible.errors import AnsibleError, AnsibleConnectionFailure, AnsibleFileNotFound
@ -54,22 +53,6 @@ class TestConnectionBaseClass(unittest.TestCase):
res = conn._connect() res = conn._connect()
self.assertEqual(conn, res) self.assertEqual(conn, res)
ssh.SSHPASS_AVAILABLE = False
self.assertFalse(conn._sshpass_available())
ssh.SSHPASS_AVAILABLE = True
self.assertTrue(conn._sshpass_available())
with patch('subprocess.Popen') as p:
ssh.SSHPASS_AVAILABLE = None
p.return_value = MagicMock()
self.assertTrue(conn._sshpass_available())
ssh.SSHPASS_AVAILABLE = None
p.return_value = None
p.side_effect = OSError()
self.assertFalse(conn._sshpass_available())
conn.close() conn.close()
self.assertFalse(conn._connected) self.assertFalse(conn._connected)
@ -412,29 +395,6 @@ class TestSSHConnectionRun(object):
assert self.conn._send_initial_data.call_count == 1 assert self.conn._send_initial_data.call_count == 1
assert self.conn._send_initial_data.call_args[0][1] == 'this is input data' assert self.conn._send_initial_data.call_args[0][1] == 'this is input data'
def test_with_password(self):
# test with a password set to trigger the sshpass write
self.pc.password = '12345'
self.mock_popen_res.stdout.read.side_effect = [b"some data", b"", b""]
self.mock_popen_res.stderr.read.side_effect = [b""]
self.mock_selector.select.side_effect = [
[(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)],
[(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)],
[(SelectorKey(self.mock_popen_res.stderr, 1002, [EVENT_READ], None), EVENT_READ)],
[(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)],
[]]
self.mock_selector.get_map.side_effect = lambda: True
return_code, b_stdout, b_stderr = self.conn._run(["ssh", "is", "a", "cmd"], "this is more data")
assert return_code == 0
assert b_stdout == b'some data'
assert b_stderr == b''
assert self.mock_selector.register.called is True
assert self.mock_selector.register.call_count == 2
assert self.conn._send_initial_data.called is True
assert self.conn._send_initial_data.call_count == 1
assert self.conn._send_initial_data.call_args[0][1] == 'this is more data'
def _password_with_prompt_examine_output(self, sourice, state, b_chunk, sudoable): def _password_with_prompt_examine_output(self, sourice, state, b_chunk, sudoable):
if state == 'awaiting_prompt': if state == 'awaiting_prompt':
self.conn._flags['become_prompt'] = True self.conn._flags['become_prompt'] = True
@ -525,30 +485,6 @@ class TestSSHConnectionRun(object):
@pytest.mark.usefixtures('mock_run_env') @pytest.mark.usefixtures('mock_run_env')
class TestSSHConnectionRetries(object): class TestSSHConnectionRetries(object):
def test_incorrect_password(self, monkeypatch):
self.conn.set_option('host_key_checking', False)
self.conn.set_option('reconnection_retries', 5)
self.mock_popen_res.stdout.read.side_effect = [b'']
self.mock_popen_res.stderr.read.side_effect = [b'Permission denied, please try again.\r\n']
type(self.mock_popen_res).returncode = PropertyMock(side_effect=[5] * 4)
self.mock_selector.select.side_effect = [
[(SelectorKey(self.mock_popen_res.stdout, 1001, [EVENT_READ], None), EVENT_READ)],
[(SelectorKey(self.mock_popen_res.stderr, 1002, [EVENT_READ], None), EVENT_READ)],
[],
]
self.mock_selector.get_map.side_effect = lambda: True
self.conn._build_command = MagicMock()
self.conn._build_command.return_value = [b'sshpass', b'-d41', b'ssh', b'-C']
exception_info = pytest.raises(AnsibleAuthenticationFailure, self.conn.exec_command, 'sshpass', 'some data')
assert exception_info.value.message == ('Invalid/incorrect username/password. Skipping remaining 5 retries to prevent account lockout: '
'Permission denied, please try again.')
assert self.mock_popen.call_count == 1
def test_retry_then_success(self, monkeypatch): def test_retry_then_success(self, monkeypatch):
self.conn.set_option('host_key_checking', False) self.conn.set_option('host_key_checking', False)
self.conn.set_option('reconnection_retries', 3) self.conn.set_option('reconnection_retries', 3)

Loading…
Cancel
Save