@ -21,6 +21,7 @@ from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
#!/usr/bin/env python
import sys
import getpass
import os
import shutil
@ -32,6 +33,7 @@ from nose.plugins.skip import SkipTest
from ansible . compat . tests import unittest
from ansible . compat . tests . mock import patch
from ansible . utils . unicode import to_bytes , to_unicode
from ansible import errors
from ansible . parsing . vault import VaultLib
@ -88,12 +90,12 @@ class TestVaultEditor(unittest.TestCase):
' read_data ' ,
' write_data ' ,
' shuffle_files ' ]
for slot in slots :
for slot in slots :
assert hasattr ( v , slot ) , " VaultLib is missing the %s method " % slot
@patch.object ( VaultEditor , ' _editor_shell_command ' )
def test_create_file ( self , mock_editor_shell_command ) :
def sc_side_effect ( filename ) :
return [ ' touch ' , filename ]
mock_editor_shell_command . side_effect = sc_side_effect
@ -107,12 +109,16 @@ class TestVaultEditor(unittest.TestCase):
self . assertTrue ( os . path . exists ( tmp_file . name ) )
def test_decrypt_1_0 ( self ) :
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2 :
"""
Skip testing decrypting 1.0 files if we don ' t have access to AES, KDF or
Counter , or we are running on python3 since VaultAES hasn ' t been backported.
"""
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2 or sys . version > ' 3 ' :
raise SkipTest
v10_file = tempfile . NamedTemporaryFile ( delete = False )
with v10_file as f :
f . write ( v10_data)
f . write ( to_bytes( v10_data) )
ve = VaultEditor ( None , " ansible " , v10_file . name )
@ -125,13 +131,13 @@ class TestVaultEditor(unittest.TestCase):
# verify decrypted content
f = open ( v10_file . name , " rb " )
fdata = f. read ( )
f . clo s e( )
fdata = to_unicode( f. read ( ) )
f . clo es ( )
os . unlink ( v10_file . name )
assert error_hit == False , " error decrypting 1.0 file "
assert fdata . strip ( ) == " foo " , " incorrect decryption of 1.0 file: %s " % fdata . strip ( )
assert error_hit == False , " error decrypting 1.0 file "
assert fdata . strip ( ) == " foo " , " incorrect decryption of 1.0 file: %s " % fdata . strip ( )
def test_decrypt_1_1 ( self ) :
@ -140,7 +146,7 @@ class TestVaultEditor(unittest.TestCase):
v11_file = tempfile . NamedTemporaryFile ( delete = False )
with v11_file as f :
f . write ( v11_data)
f . write ( to_bytes( v11_data) )
ve = VaultEditor ( None , " ansible " , v11_file . name )
@ -153,28 +159,32 @@ class TestVaultEditor(unittest.TestCase):
# verify decrypted content
f = open ( v11_file . name , " rb " )
fdata = f. read ( )
fdata = to_unicode( f. read ( ) )
f . close ( )
os . unlink ( v11_file . name )
assert error_hit == False , " error decrypting 1.0 file "
assert fdata . strip ( ) == " foo " , " incorrect decryption of 1.0 file: %s " % fdata . strip ( )
assert error_hit == False , " error decrypting 1.0 file "
assert fdata . strip ( ) == " foo " , " incorrect decryption of 1.0 file: %s " % fdata . strip ( )
def test_rekey_migration ( self ) :
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2 :
"""
Skip testing rekeying files if we don ' t have access to AES, KDF or
Counter , or we are running on python3 since VaultAES hasn ' t been backported.
"""
if not HAS_AES or not HAS_COUNTER or not HAS_PBKDF2 or sys . version > ' 3 ' :
raise SkipTest
v10_file = tempfile . NamedTemporaryFile ( delete = False )
with v10_file as f :
f . write ( v10_data)
f . write ( to_bytes( v10_data) )
ve = VaultEditor ( None , " ansible " , v10_file . name )
# make sure the password functions for the cipher
error_hit = False
try :
try :
ve . rekey_file ( ' ansible2 ' )
except errors . AnsibleError as e :
error_hit = True
@ -184,7 +194,7 @@ class TestVaultEditor(unittest.TestCase):
fdata = f . read ( )
f . close ( )
assert error_hit == False , " error rekeying 1.0 file to 1.1 "
assert error_hit == False , " error rekeying 1.0 file to 1.1 "
# ensure filedata can be decrypted, is 1.1 and is AES256
vl = VaultLib ( " ansible2 " )
@ -198,7 +208,7 @@ class TestVaultEditor(unittest.TestCase):
os . unlink ( v10_file . name )
assert vl . cipher_name == " AES256 " , " wrong cipher name set after rekey: %s " % vl . cipher_name
assert error_hit == False , " error decrypting migrated 1.0 file "
assert error_hit == False , " error decrypting migrated 1.0 file "
assert dec_data . strip ( ) == " foo " , " incorrect decryption of rekeyed/migrated file: %s " % dec_data