|
|
|
@ -33,7 +33,6 @@ from ansible import errors
|
|
|
|
|
from ansible.parsing import vault
|
|
|
|
|
from ansible.parsing.vault import VaultLib, VaultEditor, match_encrypt_secret
|
|
|
|
|
|
|
|
|
|
from ansible.module_utils.six import PY3
|
|
|
|
|
from ansible.module_utils.common.text.converters import to_bytes, to_text
|
|
|
|
|
|
|
|
|
|
from units.mock.vault_helper import TextVaultSecret
|
|
|
|
@ -88,11 +87,10 @@ class TestVaultEditor(unittest.TestCase):
|
|
|
|
|
suffix = '_ansible_unit_test_%s_' % (self.__class__.__name__)
|
|
|
|
|
return tempfile.mkdtemp(suffix=suffix)
|
|
|
|
|
|
|
|
|
|
def _create_file(self, test_dir, name, content=None, symlink=False):
|
|
|
|
|
def _create_file(self, test_dir, name, content, symlink=False):
|
|
|
|
|
file_path = os.path.join(test_dir, name)
|
|
|
|
|
with open(file_path, 'wb') as opened_file:
|
|
|
|
|
if content:
|
|
|
|
|
opened_file.write(content)
|
|
|
|
|
opened_file.write(content)
|
|
|
|
|
return file_path
|
|
|
|
|
|
|
|
|
|
def _vault_editor(self, vault_secrets=None):
|
|
|
|
@ -117,11 +115,8 @@ class TestVaultEditor(unittest.TestCase):
|
|
|
|
|
def test_stdin_binary(self):
|
|
|
|
|
stdin_data = '\0'
|
|
|
|
|
|
|
|
|
|
if PY3:
|
|
|
|
|
fake_stream = StringIO(stdin_data)
|
|
|
|
|
fake_stream.buffer = BytesIO(to_bytes(stdin_data))
|
|
|
|
|
else:
|
|
|
|
|
fake_stream = BytesIO(to_bytes(stdin_data))
|
|
|
|
|
fake_stream = StringIO(stdin_data)
|
|
|
|
|
fake_stream.buffer = BytesIO(to_bytes(stdin_data))
|
|
|
|
|
|
|
|
|
|
with patch('sys.stdin', fake_stream):
|
|
|
|
|
ve = self._vault_editor()
|
|
|
|
@ -166,7 +161,7 @@ class TestVaultEditor(unittest.TestCase):
|
|
|
|
|
self.assertNotEqual(src_file_contents, b_ciphertext,
|
|
|
|
|
'b_ciphertext should be encrypted and not equal to src_contents')
|
|
|
|
|
|
|
|
|
|
def _faux_editor(self, editor_args, new_src_contents=None):
|
|
|
|
|
def _faux_editor(self, editor_args, new_src_contents):
|
|
|
|
|
if editor_args[0] == 'shred':
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
@ -174,8 +169,7 @@ class TestVaultEditor(unittest.TestCase):
|
|
|
|
|
|
|
|
|
|
# simulate the tmp file being editted
|
|
|
|
|
with open(tmp_path, 'wb') as tmp_file:
|
|
|
|
|
if new_src_contents:
|
|
|
|
|
tmp_file.write(new_src_contents)
|
|
|
|
|
tmp_file.write(new_src_contents)
|
|
|
|
|
|
|
|
|
|
def _faux_command(self, tmp_path):
|
|
|
|
|
pass
|
|
|
|
@ -416,13 +410,6 @@ class TestVaultEditor(unittest.TestCase):
|
|
|
|
|
|
|
|
|
|
src_file_path = self._create_file(self._test_dir, 'src_file', content=src_contents)
|
|
|
|
|
|
|
|
|
|
new_src_contents = to_bytes("The info is different now.")
|
|
|
|
|
|
|
|
|
|
def faux_editor(editor_args):
|
|
|
|
|
self._faux_editor(editor_args, new_src_contents)
|
|
|
|
|
|
|
|
|
|
mock_sp_call.side_effect = faux_editor
|
|
|
|
|
|
|
|
|
|
ve = self._vault_editor()
|
|
|
|
|
self.assertRaisesRegex(errors.AnsibleError,
|
|
|
|
|
'input is not vault encrypted data',
|
|
|
|
@ -476,11 +463,7 @@ class TestVaultEditor(unittest.TestCase):
|
|
|
|
|
ve = self._vault_editor(self._secrets("ansible"))
|
|
|
|
|
|
|
|
|
|
# make sure the password functions for the cipher
|
|
|
|
|
error_hit = False
|
|
|
|
|
try:
|
|
|
|
|
ve.decrypt_file(v11_file.name)
|
|
|
|
|
except errors.AnsibleError:
|
|
|
|
|
error_hit = True
|
|
|
|
|
ve.decrypt_file(v11_file.name)
|
|
|
|
|
|
|
|
|
|
# verify decrypted content
|
|
|
|
|
with open(v11_file.name, "rb") as f:
|
|
|
|
@ -488,7 +471,6 @@ class TestVaultEditor(unittest.TestCase):
|
|
|
|
|
|
|
|
|
|
os.unlink(v11_file.name)
|
|
|
|
|
|
|
|
|
|
assert error_hit is False, "error decrypting 1.1 file"
|
|
|
|
|
assert fdata.strip() == "foo", "incorrect decryption of 1.1 file: %s" % fdata.strip()
|
|
|
|
|
|
|
|
|
|
def test_real_path_dash(self):
|
|
|
|
|