@ -13,6 +13,8 @@ import pytest
from io import StringIO
from ansible . compat . tests . mock import patch , MagicMock
from ansible . errors import AnsibleConnectionFailure
from ansible . module_utils . _text import to_bytes
from ansible . playbook . play_context import PlayContext
from ansible . plugins . loader import connection_loader
@ -219,3 +221,148 @@ class TestConnectionWinRM(object):
% ( attr , actual , expected )
module_patcher . stop ( )
class TestWinRMKerbAuth ( object ) :
DATA = (
# default
( { " _extras " : { } } , ( [ " kinit " , " user@domain " ] , ) , False ) ,
( { " _extras " : { } } , ( " kinit " , [ " user@domain " ] , ) , True ) ,
# override kinit path from options
( { " _extras " : { } , ' ansible_winrm_kinit_cmd ' : ' kinit2 ' } ,
( [ " kinit2 " , " user@domain " ] , ) , False ) ,
( { " _extras " : { } , ' ansible_winrm_kinit_cmd ' : ' kinit2 ' } ,
( " kinit2 " , [ " user@domain " ] , ) , True ) ,
# we expect the -f flag when delegation is set
( { " _extras " : { ' ansible_winrm_kerberos_delegation ' : True } } ,
( [ " kinit " , " -f " , " user@domain " ] , ) , False ) ,
( { " _extras " : { ' ansible_winrm_kerberos_delegation ' : True } } ,
( " kinit " , [ " -f " , " user@domain " ] , ) , True ) ,
)
# pylint bug: https://github.com/PyCQA/pylint/issues/511
# pylint: disable=undefined-variable
@pytest.mark.parametrize ( ' options, expected, pexpect ' ,
( ( o , e , p ) for o , e , p in DATA ) )
def test_kinit_success ( self , options , expected , pexpect ) :
def mock_popen_communicate ( input = None , timeout = None ) :
return b " " , b " "
mock_pexpect = None
if pexpect :
mock_pexpect = MagicMock ( )
mock_pexpect . spawn . return_value . exitstatus = 0
mock_subprocess = MagicMock ( )
mock_subprocess . Popen . return_value . communicate = mock_popen_communicate
mock_subprocess . Popen . return_value . returncode = 0
modules = {
' pexpect ' : mock_pexpect ,
' subprocess ' : mock_subprocess ,
}
with patch . dict ( sys . modules , modules ) :
pc = PlayContext ( )
new_stdin = StringIO ( )
connection_loader . _module_cache = { }
conn = connection_loader . get ( ' winrm ' , pc , new_stdin )
conn . set_options ( var_options = options )
conn . _kerb_auth ( " user@domain " , " pass " )
if pexpect :
assert len ( mock_pexpect . method_calls ) == 1
assert mock_pexpect . method_calls [ 0 ] [ 1 ] == expected
actual_env = mock_pexpect . method_calls [ 0 ] [ 2 ] [ ' env ' ]
else :
assert len ( mock_subprocess . method_calls ) == 1
assert mock_subprocess . method_calls [ 0 ] [ 1 ] == expected
actual_env = mock_subprocess . method_calls [ 0 ] [ 2 ] [ ' env ' ]
assert list ( actual_env . keys ( ) ) == [ ' KRB5CCNAME ' ]
assert actual_env [ ' KRB5CCNAME ' ] . startswith ( " FILE:/ " )
# pylint bug: https://github.com/PyCQA/pylint/issues/511
# pylint: disable=undefined-variable
@pytest.mark.parametrize ( ' use_pexpect ' , ( False , True ) , )
def test_kinit_with_missing_executable ( self , use_pexpect ) :
expected_err = " [Errno 2] No such file or directory: " \
" ' /fake/kinit ' : ' /fake/kinit ' "
mock_subprocess = MagicMock ( )
mock_subprocess . Popen = MagicMock ( side_effect = OSError ( expected_err ) )
mock_pexpect = None
if use_pexpect :
expected_err = " The command was not found or was not " \
" executable: /fake/kinit "
mock_pexpect = MagicMock ( )
mock_pexpect . ExceptionPexpect = Exception
mock_pexpect . spawn = MagicMock ( side_effect = Exception ( expected_err ) )
modules = {
' pexpect ' : mock_pexpect ,
' subprocess ' : mock_subprocess ,
}
with patch . dict ( sys . modules , modules ) :
pc = PlayContext ( )
new_stdin = StringIO ( )
connection_loader . _module_cache = { }
conn = connection_loader . get ( ' winrm ' , pc , new_stdin )
options = { " _extras " : { } , " ansible_winrm_kinit_cmd " : " /fake/kinit " }
conn . set_options ( var_options = options )
with pytest . raises ( AnsibleConnectionFailure ) as err :
conn . _kerb_auth ( " user@domain " , " pass " )
assert str ( err . value ) == " Kerberos auth failure when calling " \
" kinit cmd ' /fake/kinit ' : %s " % expected_err
# pylint bug: https://github.com/PyCQA/pylint/issues/511
# pylint: disable=undefined-variable
@pytest.mark.parametrize ( ' use_pexpect ' , ( False , True ) , )
def test_kinit_error ( self , use_pexpect ) :
mechanism = " subprocess "
expected_err = " kinit: krb5_parse_name: " \
" Configuration file does not specify default realm "
def mock_popen_communicate ( input = None , timeout = None ) :
return b " " , to_bytes ( expected_err )
mock_subprocess = MagicMock ( )
mock_subprocess . Popen . return_value . communicate = mock_popen_communicate
mock_subprocess . Popen . return_value . returncode = 1
mock_pexpect = None
if use_pexpect :
mechanism = " pexpect "
expected_err = " Configuration file does not specify default realm "
mock_pexpect = MagicMock ( )
mock_pexpect . spawn . return_value . expect = MagicMock ( side_effect = OSError )
mock_pexpect . spawn . return_value . read . return_value = to_bytes ( expected_err )
mock_pexpect . spawn . return_value . exitstatus = 1
modules = {
' pexpect ' : mock_pexpect ,
' subprocess ' : mock_subprocess ,
}
with patch . dict ( sys . modules , modules ) :
pc = PlayContext ( )
new_stdin = StringIO ( )
connection_loader . _module_cache = { }
conn = connection_loader . get ( ' winrm ' , pc , new_stdin )
conn . set_options ( var_options = { " _extras " : { } } )
with pytest . raises ( AnsibleConnectionFailure ) as err :
conn . _kerb_auth ( " invaliduser " , " pass " )
assert str ( err . value ) == " Kerberos auth failure for principal " \
" invaliduser with %s : %s " % ( mechanism , expected_err )