mirror of https://github.com/ansible/ansible.git
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
425 lines
15 KiB
Python
425 lines
15 KiB
Python
# -*- coding: utf-8 -*-
|
|
# (c) 2018, Jordan Borean <jborean@redhat.com>
|
|
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
|
|
|
# Make coding more python3-ish
|
|
from __future__ import (absolute_import, division, print_function)
|
|
__metaclass__ = type
|
|
|
|
import pytest
|
|
|
|
from io import StringIO
|
|
|
|
from units.compat.mock import patch, MagicMock
|
|
from ansible.errors import AnsibleConnectionFailure
|
|
from ansible.module_utils._text import to_bytes
|
|
from ansible.playbook.play_context import PlayContext
|
|
from ansible.plugins.loader import connection_loader
|
|
from ansible.plugins.connection import winrm
|
|
|
|
pytest.importorskip("winrm")
|
|
|
|
|
|
class TestConnectionWinRM(object):
    """Option-handling tests for the winrm connection plugin."""

    # Each entry is a 5-tuple consumed by test_set_options:
    #   play     - attributes set on the PlayContext before loading the plugin
    #   options  - dict passed to set_options() as var_options
    #   direct   - dict passed to set_options() as direct
    #   expected - connection attribute name -> value it must hold afterwards
    #   kerb     - value forced onto winrm.HAVE_KERBEROS for the case
    OPTIONS_DATA = (
        # default options
        (
            {},
            {'_extras': {}},
            {},
            {
                '_kerb_managed': False,
                '_kinit_cmd': 'kinit',
                '_winrm_connection_timeout': None,
                '_winrm_host': 'inventory_hostname',
                '_winrm_kwargs': {'username': None, 'password': ''},
                '_winrm_pass': '',
                '_winrm_path': '/wsman',
                '_winrm_port': 5986,
                '_winrm_scheme': 'https',
                '_winrm_transport': ['ssl'],
                '_winrm_user': None
            },
            False
        ),
        # http through port
        (
            {},
            {'_extras': {}, 'ansible_port': 5985},
            {},
            {
                '_winrm_kwargs': {'username': None, 'password': ''},
                '_winrm_port': 5985,
                '_winrm_scheme': 'http',
                '_winrm_transport': ['plaintext'],
            },
            False
        ),
        # kerberos user with kerb present
        (
            {},
            {'_extras': {}, 'ansible_user': 'user@domain.com'},
            {},
            {
                '_kerb_managed': False,
                '_kinit_cmd': 'kinit',
                '_winrm_kwargs': {'username': 'user@domain.com',
                                  'password': ''},
                '_winrm_pass': '',
                '_winrm_transport': ['kerberos', 'ssl'],
                '_winrm_user': 'user@domain.com'
            },
            True
        ),
        # kerberos user without kerb present
        (
            {},
            {'_extras': {}, 'ansible_user': 'user@domain.com'},
            {},
            {
                '_kerb_managed': False,
                '_kinit_cmd': 'kinit',
                '_winrm_kwargs': {'username': 'user@domain.com',
                                  'password': ''},
                '_winrm_pass': '',
                '_winrm_transport': ['ssl'],
                '_winrm_user': 'user@domain.com'
            },
            False
        ),
        # kerberos user with managed ticket (implicit)
        (
            {'password': 'pass'},
            {'_extras': {}, 'ansible_user': 'user@domain.com'},
            {},
            {
                '_kerb_managed': True,
                '_kinit_cmd': 'kinit',
                '_winrm_kwargs': {'username': 'user@domain.com',
                                  'password': 'pass'},
                '_winrm_pass': 'pass',
                '_winrm_transport': ['kerberos', 'ssl'],
                '_winrm_user': 'user@domain.com'
            },
            True
        ),
        # kerb with managed ticket (explicit)
        (
            {'password': 'pass'},
            {'_extras': {}, 'ansible_user': 'user@domain.com',
             'ansible_winrm_kinit_mode': 'managed'},
            {},
            {
                '_kerb_managed': True,
            },
            True
        ),
        # kerb with unmanaged ticket (explicit)
        (
            {'password': 'pass'},
            {'_extras': {}, 'ansible_user': 'user@domain.com',
             'ansible_winrm_kinit_mode': 'manual'},
            {},
            {
                '_kerb_managed': False,
            },
            True
        ),
        # transport override (single)
        (
            {},
            {'_extras': {}, 'ansible_user': 'user@domain.com',
             'ansible_winrm_transport': 'ntlm'},
            {},
            {
                '_winrm_kwargs': {'username': 'user@domain.com',
                                  'password': ''},
                '_winrm_pass': '',
                '_winrm_transport': ['ntlm'],
            },
            False
        ),
        # transport override (list)
        (
            {},
            {'_extras': {}, 'ansible_user': 'user@domain.com',
             'ansible_winrm_transport': ['ntlm', 'certificate']},
            {},
            {
                '_winrm_kwargs': {'username': 'user@domain.com',
                                  'password': ''},
                '_winrm_pass': '',
                '_winrm_transport': ['ntlm', 'certificate'],
            },
            False
        ),
        # winrm extras
        (
            {},
            {'_extras': {'ansible_winrm_server_cert_validation': 'ignore',
                         'ansible_winrm_service': 'WSMAN'}},
            {},
            {
                '_winrm_kwargs': {'username': None, 'password': '',
                                  'server_cert_validation': 'ignore',
                                  'service': 'WSMAN'},
            },
            False
        ),
        # direct override
        (
            {},
            {'_extras': {}, 'ansible_winrm_connection_timeout': 5},
            {'connection_timeout': 10},
            {
                '_winrm_connection_timeout': 10,
            },
            False
        ),
        # user comes from option not play context
        (
            {'username': 'user1'},
            {'_extras': {}, 'ansible_user': 'user2'},
            {},
            {
                '_winrm_user': 'user2',
                '_winrm_kwargs': {'username': 'user2', 'password': ''}
            },
            False
        )
    )

    # OPTIONS_DATA is passed straight to parametrize: it is already an
    # iterable of 5-tuples, so no intermediate generator is needed.
    @pytest.mark.parametrize('play, options, direct, expected, kerb',
                             OPTIONS_DATA)
    def test_set_options(self, play, options, direct, expected, kerb,
                         monkeypatch):
        """Check the attributes derived by set_options/_build_winrm_kwargs.

        :param play: attribute name -> value applied to the PlayContext.
        :param options: dict handed to ``set_options`` as ``var_options``.
        :param direct: dict handed to ``set_options`` as ``direct``.
        :param expected: connection attribute name -> expected value.
        :param kerb: value to force onto ``winrm.HAVE_KERBEROS``.
        :param monkeypatch: pytest fixture used to patch module state.
        """
        # Patch through monkeypatch so the module-level flag is restored when
        # the test ends instead of leaking into whichever test runs next.
        # raising=False keeps the semantics of a plain assignment.
        monkeypatch.setattr(winrm, "HAVE_KERBEROS", kerb, raising=False)

        pc = PlayContext()
        for attr, value in play.items():
            setattr(pc, attr, value)
        new_stdin = StringIO()

        conn = connection_loader.get('winrm', pc, new_stdin)
        conn.set_options(var_options=options, direct=direct)
        conn._build_winrm_kwargs()

        # Use a distinct loop name so the ``expected`` mapping being iterated
        # is not rebound by its own loop.
        for attr, expected_value in expected.items():
            actual = getattr(conn, attr)
            assert actual == expected_value, \
                "winrm attr '%s', actual '%s' != expected '%s'"\
                % (attr, actual, expected_value)
|
|
|
|
|
|
class TestWinRMKerbAuth(object):
    """Tests for the winrm connection's ``_kerb_auth`` kinit handling.

    Every scenario is exercised twice: once with ``winrm.HAS_PEXPECT`` forced
    to False (``subprocess.Popen`` is mocked) and once with it forced to True
    (``pexpect.spawn`` is mocked).
    """

    @staticmethod
    def _get_connection(options):
        """Return a winrm connection with ``options`` applied as var options.

        :param options: dict passed to ``set_options`` as ``var_options``.
        :return: the loaded connection after ``_build_winrm_kwargs`` has run.
        """
        pc = PlayContext()
        new_stdin = StringIO()
        conn = connection_loader.get('winrm', pc, new_stdin)
        conn.set_options(var_options=options)
        conn._build_winrm_kwargs()
        return conn

    @pytest.mark.parametrize('options, expected', [
        [{"_extras": {}},
         (["kinit", "user@domain"],)],
        [{"_extras": {}, 'ansible_winrm_kinit_cmd': 'kinit2'},
         (["kinit2", "user@domain"],)],
        [{"_extras": {'ansible_winrm_kerberos_delegation': True}},
         (["kinit", "-f", "user@domain"],)],
    ])
    def test_kinit_success_subprocess(self, monkeypatch, options, expected):
        """A zero return code from the mocked Popen must not raise.

        ``expected`` is the positional-args tuple Popen should be called with.
        """
        def mock_communicate(input=None, timeout=None):
            return b"", b""

        mock_popen = MagicMock()
        mock_popen.return_value.communicate = mock_communicate
        mock_popen.return_value.returncode = 0
        monkeypatch.setattr("subprocess.Popen", mock_popen)

        # monkeypatch restores the flag afterwards; a bare assignment would
        # leak into later tests.
        monkeypatch.setattr(winrm, "HAS_PEXPECT", False, raising=False)
        conn = self._get_connection(options)

        conn._kerb_auth("user@domain", "pass")
        mock_calls = mock_popen.mock_calls
        assert len(mock_calls) == 1
        # A mock call unpacks as (name, args, kwargs).
        assert mock_calls[0][1] == expected
        actual_env = mock_calls[0][2]['env']
        assert list(actual_env.keys()) == ['KRB5CCNAME']
        assert actual_env['KRB5CCNAME'].startswith("FILE:/")

    @pytest.mark.parametrize('options, expected', [
        [{"_extras": {}},
         ("kinit", ["user@domain"],)],
        [{"_extras": {}, 'ansible_winrm_kinit_cmd': 'kinit2'},
         ("kinit2", ["user@domain"],)],
        [{"_extras": {'ansible_winrm_kerberos_delegation': True}},
         ("kinit", ["-f", "user@domain"],)],
    ])
    def test_kinit_success_pexpect(self, monkeypatch, options, expected):
        """A zero exit status from the mocked pexpect.spawn must not raise.

        ``expected`` is the positional-args tuple spawn should be called with.
        """
        pytest.importorskip("pexpect")
        mock_pexpect = MagicMock()
        mock_pexpect.return_value.exitstatus = 0
        monkeypatch.setattr("pexpect.spawn", mock_pexpect)

        monkeypatch.setattr(winrm, "HAS_PEXPECT", True, raising=False)
        conn = self._get_connection(options)

        conn._kerb_auth("user@domain", "pass")
        mock_calls = mock_pexpect.mock_calls
        assert mock_calls[0][1] == expected
        actual_env = mock_calls[0][2]['env']
        assert list(actual_env.keys()) == ['KRB5CCNAME']
        assert actual_env['KRB5CCNAME'].startswith("FILE:/")
        assert mock_calls[0][2]['echo'] is False
        # After spawning, the calls on the child must be, in order:
        # expect -> sendline -> read -> wait.
        assert mock_calls[1][0] == "().expect"
        assert mock_calls[1][1] == (".*:",)
        assert mock_calls[2][0] == "().sendline"
        assert mock_calls[2][1] == ("pass",)
        assert mock_calls[3][0] == "().read"
        assert mock_calls[4][0] == "().wait"

    def test_kinit_with_missing_executable_subprocess(self, monkeypatch):
        """An OSError from Popen is surfaced as AnsibleConnectionFailure."""
        expected_err = "[Errno 2] No such file or directory: " \
                       "'/fake/kinit': '/fake/kinit'"
        mock_popen = MagicMock(side_effect=OSError(expected_err))

        monkeypatch.setattr("subprocess.Popen", mock_popen)

        monkeypatch.setattr(winrm, "HAS_PEXPECT", False, raising=False)
        options = {"_extras": {}, "ansible_winrm_kinit_cmd": "/fake/kinit"}
        conn = self._get_connection(options)

        with pytest.raises(AnsibleConnectionFailure) as err:
            conn._kerb_auth("user@domain", "pass")
        assert str(err.value) == "Kerberos auth failure when calling " \
                                 "kinit cmd '/fake/kinit': %s" % expected_err

    def test_kinit_with_missing_executable_pexpect(self, monkeypatch):
        """An ExceptionPexpect from spawn is surfaced as a connection failure."""
        pexpect = pytest.importorskip("pexpect")

        expected_err = "The command was not found or was not " \
                       "executable: /fake/kinit"
        mock_pexpect = \
            MagicMock(side_effect=pexpect.ExceptionPexpect(expected_err))

        monkeypatch.setattr("pexpect.spawn", mock_pexpect)

        monkeypatch.setattr(winrm, "HAS_PEXPECT", True, raising=False)
        options = {"_extras": {}, "ansible_winrm_kinit_cmd": "/fake/kinit"}
        conn = self._get_connection(options)

        with pytest.raises(AnsibleConnectionFailure) as err:
            conn._kerb_auth("user@domain", "pass")
        assert str(err.value) == "Kerberos auth failure when calling " \
                                 "kinit cmd '/fake/kinit': %s" % expected_err

    def test_kinit_error_subprocess(self, monkeypatch):
        """A non-zero Popen return code reports stderr in the failure."""
        expected_err = "kinit: krb5_parse_name: " \
                       "Configuration file does not specify default realm"

        def mock_communicate(input=None, timeout=None):
            return b"", to_bytes(expected_err)

        mock_popen = MagicMock()
        mock_popen.return_value.communicate = mock_communicate
        mock_popen.return_value.returncode = 1
        monkeypatch.setattr("subprocess.Popen", mock_popen)

        monkeypatch.setattr(winrm, "HAS_PEXPECT", False, raising=False)
        conn = self._get_connection({"_extras": {}})

        with pytest.raises(AnsibleConnectionFailure) as err:
            conn._kerb_auth("invaliduser", "pass")

        assert str(err.value) == \
            "Kerberos auth failure for principal invaliduser with " \
            "subprocess: %s" % (expected_err)

    def test_kinit_error_pexpect(self, monkeypatch):
        """A non-zero pexpect exit status reports read() output in the failure."""
        pytest.importorskip("pexpect")

        expected_err = "Configuration file does not specify default realm"
        mock_pexpect = MagicMock()
        # expect() is made to raise OSError; the assertion below still
        # requires the message to be built from the read() output.
        mock_pexpect.return_value.expect = MagicMock(side_effect=OSError)
        mock_pexpect.return_value.read.return_value = to_bytes(expected_err)
        mock_pexpect.return_value.exitstatus = 1

        monkeypatch.setattr("pexpect.spawn", mock_pexpect)

        monkeypatch.setattr(winrm, "HAS_PEXPECT", True, raising=False)
        conn = self._get_connection({"_extras": {}})

        with pytest.raises(AnsibleConnectionFailure) as err:
            conn._kerb_auth("invaliduser", "pass")

        assert str(err.value) == \
            "Kerberos auth failure for principal invaliduser with " \
            "pexpect: %s" % (expected_err)

    def test_kinit_error_pass_in_output_subprocess(self, monkeypatch):
        """The password echoed in stderr must appear as ``<redacted>``."""
        def mock_communicate(input=None, timeout=None):
            # Echo whatever was sent on stdin back through stderr.
            return b"", b"Error with kinit\n" + input

        mock_popen = MagicMock()
        mock_popen.return_value.communicate = mock_communicate
        mock_popen.return_value.returncode = 1
        monkeypatch.setattr("subprocess.Popen", mock_popen)

        monkeypatch.setattr(winrm, "HAS_PEXPECT", False, raising=False)
        conn = self._get_connection({"_extras": {}})

        with pytest.raises(AnsibleConnectionFailure) as err:
            conn._kerb_auth("username", "password")
        assert str(err.value) == \
            "Kerberos auth failure for principal username with subprocess: " \
            "Error with kinit\n<redacted>"

    def test_kinit_error_pass_in_output_pexpect(self, monkeypatch):
        """The password present in read() output must appear as ``<redacted>``."""
        pytest.importorskip("pexpect")

        mock_pexpect = MagicMock()
        mock_pexpect.return_value.expect = MagicMock()
        mock_pexpect.return_value.read.return_value = \
            b"Error with kinit\npassword\n"
        mock_pexpect.return_value.exitstatus = 1

        monkeypatch.setattr("pexpect.spawn", mock_pexpect)

        monkeypatch.setattr(winrm, "HAS_PEXPECT", True, raising=False)
        conn = self._get_connection({"_extras": {}})

        with pytest.raises(AnsibleConnectionFailure) as err:
            conn._kerb_auth("username", "password")
        assert str(err.value) == \
            "Kerberos auth failure for principal username with pexpect: " \
            "Error with kinit\n<redacted>"
|