@ -20,7 +20,6 @@
from __future__ import ( absolute_import , division , print_function )
from __future__ import ( absolute_import , division , print_function )
__metaclass__ = type
__metaclass__ = type
import sys
import os
import os
import tempfile
import tempfile
from nose . plugins . skip import SkipTest
from nose . plugins . skip import SkipTest
@ -29,8 +28,7 @@ from ansible.compat.tests import unittest
from ansible . compat . tests . mock import patch
from ansible . compat . tests . mock import patch
from ansible import errors
from ansible import errors
from ansible . parsing . vault import VaultLib
from ansible . parsing import vault
from ansible . parsing . vault import VaultEditor
from ansible . module_utils . _text import to_bytes , to_text
from ansible . module_utils . _text import to_bytes , to_text
@ -71,25 +69,344 @@ v11_data = """$ANSIBLE_VAULT;1.1;AES256
class TestVaultEditor ( unittest . TestCase ) :
class TestVaultEditor ( unittest . TestCase ) :
def setUp ( self ) :
def setUp ( self ) :
pass
self . _test_dir = None
def tearDown ( self ) :
def tearDown ( self ) :
pass
if self . _test_dir :
pass
#shutil.rmtree(self._test_dir)
self . _test_dir = None
def test_methods_exist ( self ) :
def test_methods_exist ( self ) :
v = VaultEditor ( None )
v = vault. VaultEditor( None )
slots = [ ' create_file ' ,
slots = [ ' create_file ' ,
' decrypt_file ' ,
' decrypt_file ' ,
' edit_file ' ,
' edit_file ' ,
' encrypt_file ' ,
' encrypt_file ' ,
' rekey_file ' ,
' rekey_file ' ,
' read_data ' ,
' read_data ' ,
' write_data ' ,
' write_data ' ]
' shuffle_files ' ]
for slot in slots :
for slot in slots :
assert hasattr ( v , slot ) , " VaultLib is missing the %s method " % slot
assert hasattr ( v , slot ) , " VaultLib is missing the %s method " % slot
@patch.object ( VaultEditor , ' _editor_shell_command ' )
def _create_test_dir ( self ) :
suffix = ' _ansible_unit_test_ %s _ ' % ( self . __class__ . __name__ )
return tempfile . mkdtemp ( suffix = suffix )
def _create_file ( self , test_dir , name , content = None , symlink = False ) :
file_path = os . path . join ( test_dir , name )
opened_file = open ( file_path , ' wb ' )
if content :
opened_file . write ( content )
opened_file . close ( )
return file_path
@patch ( ' ansible.parsing.vault.call ' )
def test_edit_file_helper_empty_target ( self , mock_sp_call ) :
self . _test_dir = self . _create_test_dir ( )
src_contents = to_bytes ( " some info in a file \n yup. " )
src_file_path = self . _create_file ( self . _test_dir , ' src_file ' , content = src_contents )
mock_sp_call . side_effect = self . _faux_command
ve = vault . VaultEditor ( ' password ' )
b_ciphertext = ve . _edit_file_helper ( src_file_path )
self . assertNotEqual ( src_contents , b_ciphertext )
@patch ( ' ansible.parsing.vault.call ' )
def test_edit_file_helper_call_exception ( self , mock_sp_call ) :
self . _test_dir = self . _create_test_dir ( )
src_contents = to_bytes ( " some info in a file \n yup. " )
src_file_path = self . _create_file ( self . _test_dir , ' src_file ' , content = src_contents )
error_txt = ' calling editor raised an exception '
mock_sp_call . side_effect = errors . AnsibleError ( error_txt )
ve = vault . VaultEditor ( ' password ' )
self . assertRaisesRegexp ( errors . AnsibleError ,
error_txt ,
ve . _edit_file_helper ,
src_file_path )
@patch ( ' ansible.parsing.vault.call ' )
def test_edit_file_helper_symlink_target ( self , mock_sp_call ) :
self . _test_dir = self . _create_test_dir ( )
src_file_contents = to_bytes ( " some info in a file \n yup. " )
src_file_path = self . _create_file ( self . _test_dir , ' src_file ' , content = src_file_contents )
src_file_link_path = os . path . join ( self . _test_dir , ' a_link_to_dest_file ' )
os . symlink ( src_file_path , src_file_link_path )
mock_sp_call . side_effect = self . _faux_command
ve = vault . VaultEditor ( ' password ' )
b_ciphertext = ve . _edit_file_helper ( src_file_link_path )
self . assertNotEqual ( src_file_contents , b_ciphertext ,
' b_ciphertext should be encrypted and not equal to src_contents ' )
def _faux_editor ( self , editor_args , new_src_contents = None ) :
if editor_args [ 0 ] == ' shred ' :
return
tmp_path = editor_args [ - 1 ]
# simulate the tmp file being editted
tmp_file = open ( tmp_path , ' wb ' )
if new_src_contents :
tmp_file . write ( new_src_contents )
tmp_file . close ( )
def _faux_command ( self , tmp_path ) :
pass
@patch ( ' ansible.parsing.vault.call ' )
def test_edit_file_helper_no_change ( self , mock_sp_call ) :
self . _test_dir = self . _create_test_dir ( )
src_file_contents = to_bytes ( " some info in a file \n yup. " )
src_file_path = self . _create_file ( self . _test_dir , ' src_file ' , content = src_file_contents )
# editor invocation doesnt change anything
def faux_editor ( editor_args ) :
self . _faux_editor ( editor_args , src_file_contents )
mock_sp_call . side_effect = faux_editor
ve = vault . VaultEditor ( ' password ' )
ve . _edit_file_helper ( src_file_path , existing_data = src_file_contents )
new_target_file = open ( src_file_path , ' rb ' )
new_target_file_contents = new_target_file . read ( )
self . assertEqual ( src_file_contents , new_target_file_contents )
def _assert_file_is_encrypted ( self , vault_editor , src_file_path , src_contents ) :
new_src_file = open ( src_file_path , ' rb ' )
new_src_file_contents = new_src_file . read ( )
# TODO: assert that it is encrypted
self . assertTrue ( vault . is_encrypted ( new_src_file_contents ) )
src_file_plaintext = vault_editor . vault . decrypt ( new_src_file_contents )
# the plaintext should not be encrypted
self . assertFalse ( vault . is_encrypted ( src_file_plaintext ) )
# and the new plaintext should match the original
self . assertEqual ( src_file_plaintext , src_contents )
def _assert_file_is_link ( self , src_file_link_path , src_file_path ) :
self . assertTrue ( os . path . islink ( src_file_link_path ) ,
' The dest path ( %s ) should be a symlink to ( %s ) but is not ' % ( src_file_link_path , src_file_path ) )
def test_rekey_file ( self ) :
self . _test_dir = self . _create_test_dir ( )
src_file_contents = to_bytes ( " some info in a file \n yup. " )
src_file_path = self . _create_file ( self . _test_dir , ' src_file ' , content = src_file_contents )
ve = vault . VaultEditor ( ' password ' )
ve . encrypt_file ( src_file_path )
new_password = ' password2:electricbugaloo '
ve . rekey_file ( src_file_path , new_password )
new_ve = vault . VaultEditor ( new_password )
self . _assert_file_is_encrypted ( new_ve , src_file_path , src_file_contents )
def test_rekey_file_no_new_password ( self ) :
self . _test_dir = self . _create_test_dir ( )
src_file_contents = to_bytes ( " some info in a file \n yup. " )
src_file_path = self . _create_file ( self . _test_dir , ' src_file ' , content = src_file_contents )
ve = vault . VaultEditor ( ' password ' )
ve . encrypt_file ( src_file_path )
self . assertRaisesRegexp ( errors . AnsibleError ,
' The value for the new_password to rekey ' ,
ve . rekey_file ,
src_file_path ,
None )
def test_rekey_file_not_encrypted ( self ) :
self . _test_dir = self . _create_test_dir ( )
src_file_contents = to_bytes ( " some info in a file \n yup. " )
src_file_path = self . _create_file ( self . _test_dir , ' src_file ' , content = src_file_contents )
ve = vault . VaultEditor ( ' password ' )
new_password = ' password2:electricbugaloo '
self . assertRaisesRegexp ( errors . AnsibleError ,
' input is not vault encrypted data ' ,
ve . rekey_file ,
src_file_path , new_password )
def test_plaintext ( self ) :
self . _test_dir = self . _create_test_dir ( )
src_file_contents = to_bytes ( " some info in a file \n yup. " )
src_file_path = self . _create_file ( self . _test_dir , ' src_file ' , content = src_file_contents )
ve = vault . VaultEditor ( ' password ' )
ve . encrypt_file ( src_file_path )
res = ve . plaintext ( src_file_path )
self . assertEquals ( src_file_contents , res )
def test_plaintext_not_encrypted ( self ) :
self . _test_dir = self . _create_test_dir ( )
src_file_contents = to_bytes ( " some info in a file \n yup. " )
src_file_path = self . _create_file ( self . _test_dir , ' src_file ' , content = src_file_contents )
ve = vault . VaultEditor ( ' password ' )
self . assertRaisesRegexp ( errors . AnsibleError ,
' input is not vault encrypted data ' ,
ve . plaintext ,
src_file_path )
def test_encrypt_file ( self ) :
self . _test_dir = self . _create_test_dir ( )
src_file_contents = to_bytes ( " some info in a file \n yup. " )
src_file_path = self . _create_file ( self . _test_dir , ' src_file ' , content = src_file_contents )
ve = vault . VaultEditor ( ' password ' )
ve . encrypt_file ( src_file_path )
self . _assert_file_is_encrypted ( ve , src_file_path , src_file_contents )
def test_encrypt_file_symlink ( self ) :
self . _test_dir = self . _create_test_dir ( )
src_file_contents = to_bytes ( " some info in a file \n yup. " )
src_file_path = self . _create_file ( self . _test_dir , ' src_file ' , content = src_file_contents )
src_file_link_path = os . path . join ( self . _test_dir , ' a_link_to_dest_file ' )
os . symlink ( src_file_path , src_file_link_path )
ve = vault . VaultEditor ( ' password ' )
ve . encrypt_file ( src_file_link_path )
self . _assert_file_is_encrypted ( ve , src_file_path , src_file_contents )
self . _assert_file_is_encrypted ( ve , src_file_link_path , src_file_contents )
self . _assert_file_is_link ( src_file_link_path , src_file_path )
@patch ( ' ansible.parsing.vault.call ' )
def test_edit_file ( self , mock_sp_call ) :
self . _test_dir = self . _create_test_dir ( )
src_contents = to_bytes ( " some info in a file \n yup. " )
src_file_path = self . _create_file ( self . _test_dir , ' src_file ' , content = src_contents )
new_src_contents = to_bytes ( " The info is different now. " )
def faux_editor ( editor_args ) :
self . _faux_editor ( editor_args , new_src_contents )
mock_sp_call . side_effect = faux_editor
ve = vault . VaultEditor ( ' password ' )
ve . encrypt_file ( src_file_path )
ve . edit_file ( src_file_path )
new_src_file = open ( src_file_path , ' rb ' )
new_src_file_contents = new_src_file . read ( )
src_file_plaintext = ve . vault . decrypt ( new_src_file_contents )
self . assertEqual ( src_file_plaintext , new_src_contents )
new_stat = os . stat ( src_file_path )
print ( new_stat )
@patch ( ' ansible.parsing.vault.call ' )
def test_edit_file_symlink ( self , mock_sp_call ) :
self . _test_dir = self . _create_test_dir ( )
src_contents = to_bytes ( " some info in a file \n yup. " )
src_file_path = self . _create_file ( self . _test_dir , ' src_file ' , content = src_contents )
new_src_contents = to_bytes ( " The info is different now. " )
def faux_editor ( editor_args ) :
self . _faux_editor ( editor_args , new_src_contents )
mock_sp_call . side_effect = faux_editor
ve = vault . VaultEditor ( ' password ' )
ve . encrypt_file ( src_file_path )
src_file_link_path = os . path . join ( self . _test_dir , ' a_link_to_dest_file ' )
os . symlink ( src_file_path , src_file_link_path )
ve . edit_file ( src_file_link_path )
new_src_file = open ( src_file_path , ' rb ' )
new_src_file_contents = new_src_file . read ( )
src_file_plaintext = ve . vault . decrypt ( new_src_file_contents )
self . _assert_file_is_link ( src_file_link_path , src_file_path )
self . assertEqual ( src_file_plaintext , new_src_contents )
#self.assertEqual(src_file_plaintext, new_src_contents,
# 'The decrypted plaintext of the editted file is not the expected contents.')
@patch ( ' ansible.parsing.vault.call ' )
def test_edit_file_not_encrypted ( self , mock_sp_call ) :
self . _test_dir = self . _create_test_dir ( )
src_contents = to_bytes ( " some info in a file \n yup. " )
src_file_path = self . _create_file ( self . _test_dir , ' src_file ' , content = src_contents )
new_src_contents = to_bytes ( " The info is different now. " )
def faux_editor ( editor_args ) :
self . _faux_editor ( editor_args , new_src_contents )
mock_sp_call . side_effect = faux_editor
ve = vault . VaultEditor ( ' password ' )
self . assertRaisesRegexp ( errors . AnsibleError ,
' input is not vault encrypted data ' ,
ve . edit_file ,
src_file_path )
def test_create_file_exists ( self ) :
self . _test_dir = self . _create_test_dir ( )
src_contents = to_bytes ( " some info in a file \n yup. " )
src_file_path = self . _create_file ( self . _test_dir , ' src_file ' , content = src_contents )
ve = vault . VaultEditor ( ' password ' )
self . assertRaisesRegexp ( errors . AnsibleError ,
' please use .edit. instead ' ,
ve . create_file ,
src_file_path )
def test_decrypt_file_exception ( self ) :
self . _test_dir = self . _create_test_dir ( )
src_contents = to_bytes ( " some info in a file \n yup. " )
src_file_path = self . _create_file ( self . _test_dir , ' src_file ' , content = src_contents )
ve = vault . VaultEditor ( ' password ' )
self . assertRaisesRegexp ( errors . AnsibleError ,
' input is not vault encrypted data ' ,
ve . decrypt_file ,
src_file_path )
@patch.object ( vault . VaultEditor , ' _editor_shell_command ' )
def test_create_file ( self , mock_editor_shell_command ) :
def test_create_file ( self , mock_editor_shell_command ) :
def sc_side_effect ( filename ) :
def sc_side_effect ( filename ) :
@ -99,7 +416,7 @@ class TestVaultEditor(unittest.TestCase):
tmp_file = tempfile . NamedTemporaryFile ( )
tmp_file = tempfile . NamedTemporaryFile ( )
os . unlink ( tmp_file . name )
os . unlink ( tmp_file . name )
ve = VaultEditor( " ansible " )
ve = vault. VaultEditor( " ansible " )
ve . create_file ( tmp_file . name )
ve . create_file ( tmp_file . name )
self . assertTrue ( os . path . exists ( tmp_file . name ) )
self . assertTrue ( os . path . exists ( tmp_file . name ) )
@ -113,7 +430,7 @@ class TestVaultEditor(unittest.TestCase):
with v10_file as f :
with v10_file as f :
f . write ( to_bytes ( v10_data ) )
f . write ( to_bytes ( v10_data ) )
ve = VaultEditor( " ansible " )
ve = vault. VaultEditor( " ansible " )
# make sure the password functions for the cipher
# make sure the password functions for the cipher
error_hit = False
error_hit = False
@ -130,6 +447,7 @@ class TestVaultEditor(unittest.TestCase):
os . unlink ( v10_file . name )
os . unlink ( v10_file . name )
assert error_hit is False , " error decrypting 1.0 file "
assert error_hit is False , " error decrypting 1.0 file "
self . assertEquals ( fdata . strip ( ) , " foo " )
assert fdata . strip ( ) == " foo " , " incorrect decryption of 1.0 file: %s " % fdata . strip ( )
assert fdata . strip ( ) == " foo " , " incorrect decryption of 1.0 file: %s " % fdata . strip ( )
def test_decrypt_1_1 ( self ) :
def test_decrypt_1_1 ( self ) :
@ -140,7 +458,7 @@ class TestVaultEditor(unittest.TestCase):
with v11_file as f :
with v11_file as f :
f . write ( to_bytes ( v11_data ) )
f . write ( to_bytes ( v11_data ) )
ve = VaultEditor( " ansible " )
ve = vault. VaultEditor( " ansible " )
# make sure the password functions for the cipher
# make sure the password functions for the cipher
error_hit = False
error_hit = False
@ -168,7 +486,7 @@ class TestVaultEditor(unittest.TestCase):
with v10_file as f :
with v10_file as f :
f . write ( to_bytes ( v10_data ) )
f . write ( to_bytes ( v10_data ) )
ve = VaultEditor( " ansible " )
ve = vault. VaultEditor( " ansible " )
# make sure the password functions for the cipher
# make sure the password functions for the cipher
error_hit = False
error_hit = False
@ -185,7 +503,7 @@ class TestVaultEditor(unittest.TestCase):
assert error_hit is False , " error rekeying 1.0 file to 1.1 "
assert error_hit is False , " error rekeying 1.0 file to 1.1 "
# ensure filedata can be decrypted, is 1.1 and is AES256
# ensure filedata can be decrypted, is 1.1 and is AES256
vl = VaultLib( " ansible2 " )
vl = vault. VaultLib( " ansible2 " )
dec_data = None
dec_data = None
error_hit = False
error_hit = False
try :
try :