@ -29,6 +29,7 @@ from ansible.compat.tests.mock import mock_open, patch
from ansible . errors import AnsibleError
from ansible . module_utils . six import text_type
from ansible . module_utils . six . moves import builtins
from ansible . module_utils . _text import to_bytes
from ansible . plugins . loader import PluginLoader
from ansible . plugins . lookup import password
from ansible . utils import encrypt
@ -372,6 +373,14 @@ class TestLookupModule(unittest.TestCase):
self . fake_loader = DictDataLoader ( { ' /path/to/somewhere ' : ' sdfsdf ' } )
self . password_lookup = password . LookupModule ( loader = self . fake_loader )
self . os_path_exists = password . os . path . exists
self . os_open = password . os . open
password . os . open = lambda path , flag : None
self . os_close = password . os . close
password . os . close = lambda fd : None
self . os_remove = password . os . remove
password . os . remove = lambda path : None
self . makedirs_safe = password . makedirs_safe
password . makedirs_safe = lambda path , mode : None
# Different releases of passlib default to a different number of rounds
self . sha256 = passlib . registry . get_crypt_handler ( ' pbkdf2_sha256 ' )
@ -380,6 +389,10 @@ class TestLookupModule(unittest.TestCase):
def tearDown ( self ) :
password . os . path . exists = self . os_path_exists
password . os . open = self . os_open
password . os . close = self . os_close
password . os . remove = self . os_remove
password . makedirs_safe = self . makedirs_safe
passlib . registry . register_crypt_handler ( self . sha256 , force = True )
@patch.object ( PluginLoader , ' _get_paths ' )
@ -425,7 +438,7 @@ class TestLookupModule(unittest.TestCase):
@patch ( ' ansible.plugins.lookup.password._write_password_file ' )
def test_password_already_created_encrypt ( self , mock_get_paths , mock_write_file ) :
mock_get_paths . return_value = [ ' /path/one ' , ' /path/two ' , ' /path/three ' ]
password . os . path . exists = lambda x : True
password . os . path . exists = lambda x : x == to_bytes ( ' /path/to/somewhere ' )
with patch . object ( builtins , ' open ' , mock_open ( read_data = b ' hunter42 salt=87654321 \n ' ) ) as m :
results = self . password_lookup . run ( [ u ' /path/to/somewhere chars=anything encrypt=pbkdf2_sha256 ' ] , None )
@ -436,7 +449,7 @@ class TestLookupModule(unittest.TestCase):
@patch ( ' ansible.plugins.lookup.password._write_password_file ' )
def test_password_already_created_no_encrypt ( self , mock_get_paths , mock_write_file ) :
mock_get_paths . return_value = [ ' /path/one ' , ' /path/two ' , ' /path/three ' ]
password . os . path . exists = lambda x : True
password . os . path . exists = lambda x : x == to_bytes ( ' /path/to/somewhere ' )
with patch . object ( builtins , ' open ' , mock_open ( read_data = b ' hunter42 salt=87654321 \n ' ) ) as m :
results = self . password_lookup . run ( [ u ' /path/to/somewhere chars=anything ' ] , None )
@ -452,3 +465,27 @@ class TestLookupModule(unittest.TestCase):
results = self . password_lookup . run ( [ u ' /path/to/somewhere chars=a ' ] , None )
for result in results :
self . assertEquals ( result , u ' a ' * password . DEFAULT_LENGTH )
def test_lock_been_held ( self ) :
# pretend the lock file is here
password . os . path . exists = lambda x : True
try :
with patch . object ( builtins , ' open ' , mock_open ( read_data = b ' hunter42 salt=87654321 \n ' ) ) as m :
# should timeout here
results = self . password_lookup . run ( [ u ' /path/to/somewhere chars=anything ' ] , None )
self . fail ( " Lookup didn ' t timeout when lock already been held " )
except AnsibleError :
pass
def test_lock_not_been_held ( self ) :
# pretend now there is password file but no lock
password . os . path . exists = lambda x : x == to_bytes ( ' /path/to/somewhere ' )
try :
with patch . object ( builtins , ' open ' , mock_open ( read_data = b ' hunter42 salt=87654321 \n ' ) ) as m :
# should not timeout here
results = self . password_lookup . run ( [ u ' /path/to/somewhere chars=anything ' ] , None )
except AnsibleError :
self . fail ( ' Lookup timeouts when lock is free ' )
for result in results :
self . assertEqual ( result , u ' hunter42 ' )