You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ansible/test/units/plugins/connection/test_winrm.py

540 lines
20 KiB
Python

# -*- coding: utf-8 -*-
# (c) 2018, Jordan Borean <jborean@redhat.com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import annotations
import os
import pytest
from io import StringIO
from unittest.mock import MagicMock
from ansible.errors import AnsibleConnectionFailure, AnsibleError
from ansible.module_utils.common.text.converters import to_bytes
from ansible.playbook.play_context import PlayContext
from ansible.plugins.loader import connection_loader
from ansible.plugins.connection import winrm
pytest.importorskip("winrm")
class TestConnectionWinRM(object):
OPTIONS_DATA = (
# default options
(
{'_extras': {}},
{},
{
'_kerb_managed': False,
'_kinit_cmd': 'kinit',
'_winrm_connection_timeout': None,
'_winrm_host': 'inventory_hostname',
'_winrm_kwargs': {'username': None, 'password': None},
'_winrm_pass': None,
'_winrm_path': '/wsman',
'_winrm_port': 5986,
'_winrm_scheme': 'https',
'_winrm_transport': ['ssl'],
'_winrm_user': None
},
False
),
# http through port
(
{'_extras': {}, 'ansible_port': 5985},
{},
{
'_winrm_kwargs': {'username': None, 'password': None},
'_winrm_port': 5985,
'_winrm_scheme': 'http',
'_winrm_transport': ['plaintext'],
},
False
),
# kerberos user with kerb present
(
{'_extras': {}, 'ansible_user': 'user@domain.com'},
{},
{
'_kerb_managed': False,
'_kinit_cmd': 'kinit',
'_winrm_kwargs': {'username': 'user@domain.com',
'password': None},
'_winrm_pass': None,
'_winrm_transport': ['kerberos', 'ssl'],
'_winrm_user': 'user@domain.com'
},
True
),
# kerberos user without kerb present
(
{'_extras': {}, 'ansible_user': 'user@domain.com'},
{},
{
'_kerb_managed': False,
'_kinit_cmd': 'kinit',
'_winrm_kwargs': {'username': 'user@domain.com',
'password': None},
'_winrm_pass': None,
'_winrm_transport': ['ssl'],
'_winrm_user': 'user@domain.com'
},
False
),
# kerberos user with managed ticket (implicit)
(
{'_extras': {}, 'ansible_user': 'user@domain.com'},
{'remote_password': 'pass'},
{
'_kerb_managed': True,
'_kinit_cmd': 'kinit',
'_winrm_kwargs': {'username': 'user@domain.com',
'password': 'pass'},
'_winrm_pass': 'pass',
'_winrm_transport': ['kerberos', 'ssl'],
'_winrm_user': 'user@domain.com'
},
True
),
# kerb with managed ticket (explicit)
(
{'_extras': {}, 'ansible_user': 'user@domain.com',
'ansible_winrm_kinit_mode': 'managed'},
{'password': 'pass'},
{
'_kerb_managed': True,
},
True
),
# kerb with unmanaged ticket (explicit))
(
{'_extras': {}, 'ansible_user': 'user@domain.com',
'ansible_winrm_kinit_mode': 'manual'},
{'password': 'pass'},
{
'_kerb_managed': False,
},
True
),
# transport override (single)
(
{'_extras': {}, 'ansible_user': 'user@domain.com',
'ansible_winrm_transport': 'ntlm'},
{},
{
'_winrm_kwargs': {'username': 'user@domain.com',
'password': None},
'_winrm_pass': None,
'_winrm_transport': ['ntlm'],
},
False
),
# transport override (list)
(
{'_extras': {}, 'ansible_user': 'user@domain.com',
'ansible_winrm_transport': ['ntlm', 'certificate']},
{},
{
'_winrm_kwargs': {'username': 'user@domain.com',
'password': None},
'_winrm_pass': None,
'_winrm_transport': ['ntlm', 'certificate'],
},
False
),
# winrm extras
(
{'_extras': {'ansible_winrm_server_cert_validation': 'ignore',
'ansible_winrm_service': 'WSMAN'}},
{},
{
'_winrm_kwargs': {'username': None, 'password': None,
'server_cert_validation': 'ignore',
'service': 'WSMAN'},
},
False
),
# direct override
(
{'_extras': {}, 'ansible_winrm_connection_timeout': 5},
{'connection_timeout': 10},
{
'_winrm_connection_timeout': 10,
},
False
),
# password as ansible_password
(
{'_extras': {}, 'ansible_password': 'pass'},
{},
{
'_winrm_pass': 'pass',
'_winrm_kwargs': {'username': None, 'password': 'pass'}
},
False
),
# password as ansible_winrm_pass
(
{'_extras': {}, 'ansible_winrm_pass': 'pass'},
{},
{
'_winrm_pass': 'pass',
'_winrm_kwargs': {'username': None, 'password': 'pass'}
},
False
),
# password as ansible_winrm_password
(
{'_extras': {}, 'ansible_winrm_password': 'pass'},
{},
{
'_winrm_pass': 'pass',
'_winrm_kwargs': {'username': None, 'password': 'pass'}
},
False
),
)
@pytest.mark.parametrize('options, direct, expected, kerb',
((o, d, e, k) for o, d, e, k in OPTIONS_DATA))
def test_set_options(self, options, direct, expected, kerb):
winrm.HAVE_KERBEROS = kerb
pc = PlayContext()
new_stdin = StringIO()
conn = connection_loader.get('winrm', pc, new_stdin)
conn.set_options(var_options=options, direct=direct)
conn._build_winrm_kwargs()
for attr, expected in expected.items():
actual = getattr(conn, attr)
assert actual == expected, \
"winrm attr '%s', actual '%s' != expected '%s'"\
% (attr, actual, expected)
class TestWinRMKerbAuth(object):
@pytest.mark.parametrize('options, expected', [
[{"_extras": {}},
(["kinit", "user@domain"],)],
[{"_extras": {}, 'ansible_winrm_kinit_cmd': 'kinit2'},
(["kinit2", "user@domain"],)],
[{"_extras": {'ansible_winrm_kerberos_delegation': True}},
(["kinit", "-f", "user@domain"],)],
[{"_extras": {}, 'ansible_winrm_kinit_args': '-f -p'},
(["kinit", "-f", "-p", "user@domain"],)],
[{"_extras": {}, 'ansible_winrm_kerberos_delegation': True, 'ansible_winrm_kinit_args': '-p'},
(["kinit", "-p", "user@domain"],)]
])
def test_kinit_success_subprocess(self, monkeypatch, options, expected):
def mock_communicate(input=None, timeout=None):
return b"", b""
mock_popen = MagicMock()
mock_popen.return_value.communicate = mock_communicate
mock_popen.return_value.returncode = 0
monkeypatch.setattr("subprocess.Popen", mock_popen)
winrm.HAS_PEXPECT = False
pc = PlayContext()
new_stdin = StringIO()
conn = connection_loader.get('winrm', pc, new_stdin)
conn.set_options(var_options=options)
conn._build_winrm_kwargs()
conn._kerb_auth("user@domain", "pass")
mock_calls = mock_popen.mock_calls
assert len(mock_calls) == 1
assert mock_calls[0][1] == expected
actual_env = mock_calls[0][2]['env']
assert sorted(list(actual_env.keys())) == ['KRB5CCNAME', 'PATH']
assert actual_env['KRB5CCNAME'].startswith("FILE:/")
assert actual_env['PATH'] == os.environ['PATH']
@pytest.mark.parametrize('options, expected', [
[{"_extras": {}},
("kinit", ["user@domain"],)],
[{"_extras": {}, 'ansible_winrm_kinit_cmd': 'kinit2'},
("kinit2", ["user@domain"],)],
[{"_extras": {'ansible_winrm_kerberos_delegation': True}},
("kinit", ["-f", "user@domain"],)],
[{"_extras": {}, 'ansible_winrm_kinit_args': '-f -p'},
("kinit", ["-f", "-p", "user@domain"],)],
[{"_extras": {}, 'ansible_winrm_kerberos_delegation': True, 'ansible_winrm_kinit_args': '-p'},
("kinit", ["-p", "user@domain"],)]
])
def test_kinit_success_pexpect(self, monkeypatch, options, expected):
pytest.importorskip("pexpect")
mock_pexpect = MagicMock()
mock_pexpect.return_value.exitstatus = 0
monkeypatch.setattr("pexpect.spawn", mock_pexpect)
winrm.HAS_PEXPECT = True
pc = PlayContext()
new_stdin = StringIO()
conn = connection_loader.get('winrm', pc, new_stdin)
conn.set_options(var_options=options)
conn._build_winrm_kwargs()
conn._kerb_auth("user@domain", "pass")
mock_calls = mock_pexpect.mock_calls
assert mock_calls[0][1] == expected
actual_env = mock_calls[0][2]['env']
assert sorted(list(actual_env.keys())) == ['KRB5CCNAME', 'PATH']
assert actual_env['KRB5CCNAME'].startswith("FILE:/")
assert actual_env['PATH'] == os.environ['PATH']
assert mock_calls[0][2]['echo'] is False
assert mock_calls[1][0] == "().expect"
assert mock_calls[1][1] == (".*:",)
assert mock_calls[2][0] == "().sendline"
assert mock_calls[2][1] == ("pass",)
assert mock_calls[3][0] == "().read"
assert mock_calls[4][0] == "().wait"
def test_kinit_with_missing_executable_subprocess(self, monkeypatch):
expected_err = "[Errno 2] No such file or directory: " \
"'/fake/kinit': '/fake/kinit'"
mock_popen = MagicMock(side_effect=OSError(expected_err))
monkeypatch.setattr("subprocess.Popen", mock_popen)
winrm.HAS_PEXPECT = False
pc = PlayContext()
new_stdin = StringIO()
conn = connection_loader.get('winrm', pc, new_stdin)
options = {"_extras": {}, "ansible_winrm_kinit_cmd": "/fake/kinit"}
conn.set_options(var_options=options)
conn._build_winrm_kwargs()
with pytest.raises(AnsibleConnectionFailure) as err:
conn._kerb_auth("user@domain", "pass")
assert str(err.value) == "Kerberos auth failure when calling " \
"kinit cmd '/fake/kinit': %s" % expected_err
def test_kinit_with_missing_executable_pexpect(self, monkeypatch):
pexpect = pytest.importorskip("pexpect")
expected_err = "The command was not found or was not " \
"executable: /fake/kinit"
mock_pexpect = \
MagicMock(side_effect=pexpect.ExceptionPexpect(expected_err))
monkeypatch.setattr("pexpect.spawn", mock_pexpect)
winrm.HAS_PEXPECT = True
pc = PlayContext()
new_stdin = StringIO()
conn = connection_loader.get('winrm', pc, new_stdin)
options = {"_extras": {}, "ansible_winrm_kinit_cmd": "/fake/kinit"}
conn.set_options(var_options=options)
conn._build_winrm_kwargs()
with pytest.raises(AnsibleConnectionFailure) as err:
conn._kerb_auth("user@domain", "pass")
assert str(err.value) == "Kerberos auth failure when calling " \
"kinit cmd '/fake/kinit': %s" % expected_err
def test_kinit_error_subprocess(self, monkeypatch):
expected_err = "kinit: krb5_parse_name: " \
"Configuration file does not specify default realm"
def mock_communicate(input=None, timeout=None):
return b"", to_bytes(expected_err)
mock_popen = MagicMock()
mock_popen.return_value.communicate = mock_communicate
mock_popen.return_value.returncode = 1
monkeypatch.setattr("subprocess.Popen", mock_popen)
winrm.HAS_PEXPECT = False
pc = PlayContext()
new_stdin = StringIO()
conn = connection_loader.get('winrm', pc, new_stdin)
conn.set_options(var_options={"_extras": {}})
conn._build_winrm_kwargs()
with pytest.raises(AnsibleConnectionFailure) as err:
conn._kerb_auth("invaliduser", "pass")
assert str(err.value) == \
"Kerberos auth failure for principal invaliduser with " \
"subprocess: %s" % (expected_err)
def test_kinit_error_pexpect(self, monkeypatch):
pytest.importorskip("pexpect")
expected_err = "Configuration file does not specify default realm"
mock_pexpect = MagicMock()
mock_pexpect.return_value.expect = MagicMock(side_effect=OSError)
mock_pexpect.return_value.read.return_value = to_bytes(expected_err)
mock_pexpect.return_value.exitstatus = 1
monkeypatch.setattr("pexpect.spawn", mock_pexpect)
winrm.HAS_PEXPECT = True
pc = PlayContext()
new_stdin = StringIO()
conn = connection_loader.get('winrm', pc, new_stdin)
conn.set_options(var_options={"_extras": {}})
conn._build_winrm_kwargs()
with pytest.raises(AnsibleConnectionFailure) as err:
conn._kerb_auth("invaliduser", "pass")
assert str(err.value) == \
"Kerberos auth failure for principal invaliduser with " \
"pexpect: %s" % (expected_err)
def test_kinit_error_pass_in_output_subprocess(self, monkeypatch):
def mock_communicate(input=None, timeout=None):
return b"", b"Error with kinit\n" + input
mock_popen = MagicMock()
mock_popen.return_value.communicate = mock_communicate
mock_popen.return_value.returncode = 1
monkeypatch.setattr("subprocess.Popen", mock_popen)
winrm.HAS_PEXPECT = False
pc = PlayContext()
new_stdin = StringIO()
conn = connection_loader.get('winrm', pc, new_stdin)
conn.set_options(var_options={"_extras": {}})
conn._build_winrm_kwargs()
with pytest.raises(AnsibleConnectionFailure) as err:
conn._kerb_auth("username", "password")
assert str(err.value) == \
"Kerberos auth failure for principal username with subprocess: " \
"Error with kinit\n<redacted>"
def test_kinit_error_pass_in_output_pexpect(self, monkeypatch):
pytest.importorskip("pexpect")
mock_pexpect = MagicMock()
mock_pexpect.return_value.expect = MagicMock()
mock_pexpect.return_value.read.return_value = \
b"Error with kinit\npassword\n"
mock_pexpect.return_value.exitstatus = 1
monkeypatch.setattr("pexpect.spawn", mock_pexpect)
winrm.HAS_PEXPECT = True
pc = PlayContext()
pc = PlayContext()
new_stdin = StringIO()
conn = connection_loader.get('winrm', pc, new_stdin)
conn.set_options(var_options={"_extras": {}})
conn._build_winrm_kwargs()
with pytest.raises(AnsibleConnectionFailure) as err:
conn._kerb_auth("username", "password")
assert str(err.value) == \
"Kerberos auth failure for principal username with pexpect: " \
"Error with kinit\n<redacted>"
def test_exec_command_with_timeout(self, monkeypatch):
requests_exc = pytest.importorskip("requests.exceptions")
pc = PlayContext()
new_stdin = StringIO()
conn = connection_loader.get('winrm', pc, new_stdin)
mock_proto = MagicMock()
mock_proto.run_command.side_effect = requests_exc.Timeout("msg")
conn._connected = True
conn._winrm_host = 'hostname'
monkeypatch.setattr(conn, "_winrm_connect", lambda: mock_proto)
with pytest.raises(AnsibleConnectionFailure) as e:
conn.exec_command('cmd', in_data=None, sudoable=True)
assert str(e.value) == "winrm connection error: msg"
def test_exec_command_get_output_timeout(self, monkeypatch):
requests_exc = pytest.importorskip("requests.exceptions")
pc = PlayContext()
new_stdin = StringIO()
conn = connection_loader.get('winrm', pc, new_stdin)
mock_proto = MagicMock()
mock_proto.run_command.return_value = "command_id"
mock_proto.send_message.side_effect = requests_exc.Timeout("msg")
conn._connected = True
conn._winrm_host = 'hostname'
monkeypatch.setattr(conn, "_winrm_connect", lambda: mock_proto)
with pytest.raises(AnsibleConnectionFailure) as e:
conn.exec_command('cmd', in_data=None, sudoable=True)
assert str(e.value) == "winrm connection error: msg"
def test_connect_failure_auth_401(self, monkeypatch):
pc = PlayContext()
new_stdin = StringIO()
conn = connection_loader.get('winrm', pc, new_stdin)
conn.set_options(var_options={"ansible_winrm_transport": "basic", "_extras": {}})
mock_proto = MagicMock()
mock_proto.open_shell.side_effect = ValueError("Custom exc Code 401")
mock_proto_init = MagicMock()
mock_proto_init.return_value = mock_proto
monkeypatch.setattr(winrm, "Protocol", mock_proto_init)
with pytest.raises(AnsibleConnectionFailure, match="the specified credentials were rejected by the server"):
conn.exec_command('cmd', in_data=None, sudoable=True)
def test_connect_failure_other_exception(self, monkeypatch):
pc = PlayContext()
new_stdin = StringIO()
conn = connection_loader.get('winrm', pc, new_stdin)
conn.set_options(var_options={"ansible_winrm_transport": "basic", "_extras": {}})
mock_proto = MagicMock()
mock_proto.open_shell.side_effect = ValueError("Custom exc")
mock_proto_init = MagicMock()
mock_proto_init.return_value = mock_proto
monkeypatch.setattr(winrm, "Protocol", mock_proto_init)
with pytest.raises(AnsibleConnectionFailure, match="basic: Custom exc"):
conn.exec_command('cmd', in_data=None, sudoable=True)
def test_connect_failure_operation_timed_out(self, monkeypatch):
pc = PlayContext()
new_stdin = StringIO()
conn = connection_loader.get('winrm', pc, new_stdin)
conn.set_options(var_options={"ansible_winrm_transport": "basic", "_extras": {}})
mock_proto = MagicMock()
mock_proto.open_shell.side_effect = ValueError("Custom exc Operation timed out")
mock_proto_init = MagicMock()
mock_proto_init.return_value = mock_proto
monkeypatch.setattr(winrm, "Protocol", mock_proto_init)
with pytest.raises(AnsibleError, match="the connection attempt timed out"):
conn.exec_command('cmd', in_data=None, sudoable=True)
def test_connect_no_transport(self):
pc = PlayContext()
new_stdin = StringIO()
conn = connection_loader.get('winrm', pc, new_stdin)
conn.set_options(var_options={"_extras": {}})
conn._build_winrm_kwargs()
conn._winrm_transport = []
with pytest.raises(AnsibleError, match="No transport found for WinRM connection"):
conn._winrm_connect()