@ -32,353 +32,224 @@ from ansible import constants as C
from Crypto.Cipher import AES as AES_
from Crypto.Cipher import AES as AES
HAS_AES = True
HAS_AES = True
except ImportError:
except ImportError:
HAS_AES = False
HAS_AES = False
def is_encrypted(filename):
class VaultLib(object):
def __init__(self, password):
Check a file for the encrypted header and return True or False
self.password = password
self.cipher_name = None
The first line should start with the header
self.version = '1.0'
defined by the global HEADER. If true, we
assume this is a properly encrypted file.
# read first line of the file
with open(filename) as f:
head = f.next()
except StopIteration:
# empty file, so not encrypted
return False
if head.startswith(HEADER):
def is_encrypted(self, data):
if data.startswith(HEADER):
return True
return True
return False
return False
def decrypt(filename, password):
def encrypt(self, data):
if self.is_encrypted(data):
Return a decrypted string of the contents in an encrypted file
raise errors.AnsibleError("data is already encrypted")
This is used by the yaml loading code in ansible
if 'Vault' + self.cipher_name in globals() and self.cipher_name in CIPHER_WHITELIST:
to automatically determine the encryption type
cipher = globals()['Vault' + self.cipher_name]
and return a plaintext string of the unencrypted
this_cipher = cipher()
raise errors.AnsibleError("%s cipher could not be found" % self.cipher_name)
if password is None:
# combine sha + data
raise errors.AnsibleError("A vault password must be specified to decrypt %s" % filename)
this_sha = sha256(data).hexdigest()
tmp_data = this_sha + "\n" + data
# encrypt sha + data
tmp_data = this_cipher.encrypt(tmp_data, self.password)
# add header
tmp_data = self._add_headers_and_hexify_encrypted_data(tmp_data)
return tmp_data
def decrypt(self, data):
if self.password is None:
raise errors.AnsibleError("A vault password must be specified to decrypt data")
if not self.is_encrypted(data):
raise errors.AnsibleError("data is not encrypted")
# clean out header, hex and sha
data = self._split_headers_and_get_unhexified_data(data)
# create the cipher object
if 'Vault' + self.cipher_name in globals() and self.cipher_name in CIPHER_WHITELIST:
cipher = globals()['Vault' + self.cipher_name]
this_cipher = cipher()
raise errors.AnsibleError("%s cipher could not be found" % self.cipher_name)
V = Vault(filename=filename, vault_password=password)
# try to unencrypt data
return_data = V._decrypt_to_string()
data = this_cipher.decrypt(data, self.password)
if not V._verify_decryption(return_data):
# split out sha and verify decryption
split_data = data.split("\n")
this_sha = split_data[0]
this_data = '\n'.join(split_data[1:])
test_sha = sha256(this_data).hexdigest()
if this_sha != test_sha:
raise errors.AnsibleError("Decryption of %s failed" % filename)
raise errors.AnsibleError("Decryption of %s failed" % filename)
this_sha, return_data = V._strip_sha(return_data)
return this_data
return return_data.strip()
class Vault(object):
def __init__(self, filename=None, cipher=None, vault_password=None):
self.filename = filename
self.vault_password = vault_password
self.cipher = cipher
self.version = '1.0'
def eval_header(self):
""" Read first line of the file and parse header """
# read first line
with open(self.filename) as f:
#head=[f.next() for x in xrange(1)]
head = f.next()
this_version = None
def _add_headers_and_hexify_encrypted_data(self, data):
this_cipher = None
# combine header and hexlified encrypted data in 80 char columns
# split segments
tmpdata = hexlify(data)
if len(head.split(';')) == 3:
tmpdata = [tmpdata[i:i+80] for i in range(0, len(tmpdata), 80)]
this_version = head.split(';')[1].strip()
this_cipher = head.split(';')[2].strip()
raise errors.AnsibleError("%s has an invalid header" % self.filename)
# validate acceptable version
this_version = float(this_version)
if this_version < C.VAULT_VERSION_MIN or this_version > C.VAULT_VERSION_MAX:
raise errors.AnsibleError("%s must have a version between %s and %s " % (self.filename,
# set properties
self.cipher = this_cipher
self.version = this_version
def create(self):
""" create a new encrypted file """
if os.path.isfile(self.filename):
if not self.cipher_name:
raise errors.AnsibleError("%s exists, please use 'edit' instead" % self.filename)
raise errors.AnsibleError("the cipher must be set before adding a header")
# drop the user into vim on file
dirty_data = HEADER + ";" + str(self.version) + ";" + self.cipher_name + "\n"
EDITOR = os.environ.get('EDITOR','vim')
for l in tmpdata:
call([EDITOR, self.filename])
dirty_data += l + '\n'
return dirty_data
def decrypt(self):
""" unencrypt a file inplace """
if not is_encrypted(self.filename):
def _split_headers_and_get_unhexified_data(self, data):
raise errors.AnsibleError("%s is not encrypted" % self.filename)
# used by decrypt
# set cipher based on file header
tmpdata = data.split('\n')
tmpheader = tmpdata[0].strip().split(';')
# decrypt it
self.version = str(tmpheader[1].strip())
data = self._decrypt_to_string()
self.cipher_name = str(tmpheader[2].strip())
clean_data = ''.join(tmpdata[1:])
# verify sha and then strip it out
# strip out newline, join, unhex
if not self._verify_decryption(data):
clean_data = [ x.strip() for x in clean_data ]
raise errors.AnsibleError("decryption of %s failed" % self.filename)
clean_data = unhexlify(''.join(clean_data))
this_sha, clean_data = self._strip_sha(data)
# write back to original file
return clean_data
f = open(self.filename, "wb")
def edit(self, filename=None, password=None, cipher=None, version=None):
class VaultEditor(object):
# uses helper methods for write_file(self, filename, data)
# to write a file so that code isn't duplicated for simple
# file I/O, ditto read_file(self, filename) and launch_editor(self, filename)
# ... "Don't Repeat Yourself", etc.
if not is_encrypted(self.filename):
def __init__(self, cipher_name, password, filename):
raise errors.AnsibleError("%s is not encrypted" % self.filename)
# instantiates a member variable for VaultLib
self.cipher_name = cipher_name
#decrypt to string
self.password = password
data = self._decrypt_to_string()
self.filename = filename
# verify sha and then strip it out
def create_file(self):
if not self._verify_decryption(data):
""" create a new encrypted file """
raise errors.AnsibleError("decryption of %s failed" % self.filename)
this_sha, clean_data = self._strip_sha(data)
# rewrite file without sha
if os.path.isfile(self.filename):
_, in_path = tempfile.mkstemp()
raise errors.AnsibleError("%s exists, please use 'edit' instead" % self.filename)
f = open(in_path, "wb")
tmpdata = f.write(clean_data)
# drop the user into vim on the unencrypted tmp file
# drop the user into vim on file
EDITOR = os.environ.get('EDITOR','vim')
EDITOR = os.environ.get('EDITOR','vim')
call([EDITOR, in_path])
call([EDITOR, self.filename])
tmpdata = self.read_data(self.filename)
f = open(in_path, "rb")
this_vault = VaultLib(self.password)
tmpdata = f.read()
this_vault.cipher_name = self.cipher_name
enc_data = this_vault.encrypt(tmpdata)
self.write_data(enc_data, self.filename)
self._string_to_encrypted_file(tmpdata, self.filename)
def decrypt_file(self):
if not os.path.isfile(self.filename):
def encrypt(self):
raise errors.AnsibleError("%s does not exist" % self.filename)
""" encrypt a file inplace """
tmpdata = self.read_data(self.filename)
if is_encrypted(self.filename):
this_vault = VaultLib(self.password)
raise errors.AnsibleError("%s is already encrypted" % self.filename)
if this_vault.is_encrypted(tmpdata):
dec_data = this_vault.decrypt(tmpdata)
self.write_data(dec_data, self.filename)
# read data
f = open(self.filename, "rb")
tmpdata = f.read()
self._string_to_encrypted_file(tmpdata, self.filename)
def rekey(self, newpassword):
""" unencrypt file then encrypt with new password """
if not is_encrypted(self.filename):
raise errors.AnsibleError("%s is not encrypted" % self.filename)
# unencrypt to string with old password
data = self._decrypt_to_string()
# verify sha and then strip it out
if not self._verify_decryption(data):
raise errors.AnsibleError("decryption of %s failed" % self.filename)
this_sha, clean_data = self._strip_sha(data)
# set password
self.vault_password = newpassword
self._string_to_encrypted_file(clean_data, self.filename)
def __load_cipher(self):
Load a cipher class by it's name
This is a lightweight "plugin" implementation to allow
for future support of other cipher types
whitelist = ['AES']
if self.cipher in whitelist:
self.cipher_obj = None
if self.cipher in globals():
this_cipher = globals()[self.cipher]
self.cipher_obj = this_cipher()
raise errors.AnsibleError("%s cipher could not be loaded" % self.cipher)
raise errors.AnsibleError("%s is not an allowed encryption cipher" % self.cipher)
def _decrypt_to_string(self):
""" decrypt file to string """
if not is_encrypted(self.filename):
raise errors.AnsibleError("%s is not encrypted" % self.filename)
raise errors.AnsibleError("%s is not encrypted" % self.filename)
# figure out what this is
def edit_file(self):
# strip out header and unhex the file
clean_stream = self._dirty_file_to_clean_file(self.filename)
# reset pointer
# decrypt to tmpfile
tmpdata = self.read_data(self.filename)
this_vault = VaultLib(self.password)
dec_data = this_vault.decrypt(tmpdata)
_, tmp_path = tempfile.mkstemp()
self.write_data(dec_data, tmp_path)
# create a byte stream to hold unencrypted
# drop the user into vim on the tmp file
dst = BytesIO()
EDITOR = os.environ.get('EDITOR','vim')
call([EDITOR, tmp_path])
# decrypt from src stream to dst stream
new_data = self.read_data(tmp_path)
self.cipher_obj.decrypt(clean_stream, dst, self.vault_password)
# create new vault and set cipher to old
# read data from the unencrypted stream
new_vault = VaultLib(self.password)
data = dst.read()
new_vault.cipher_name = this_vault.cipher_name
return data
# encrypt new data a write out to tmp
enc_data = new_vault.encrypt(new_data)
self.write_data(enc_data, tmp_path)
# shuffle tmp file into place
self.shuffle_files(tmp_path, self.filename)
def encrypt_file(self):
if not os.path.isfile(self.filename):
raise errors.AnsibleError("%s does not exist" % self.filename)
tmpdata = self.read_data(self.filename)
this_vault = VaultLib(self.password)
this_vault.cipher_name = self.cipher_name
if not this_vault.is_encrypted(tmpdata):
enc_data = this_vault.encrypt(tmpdata)
self.write_data(enc_data, self.filename)
raise errors.AnsibleError("%s is already encrypted" % self.filename)
def _dirty_file_to_clean_file(self, dirty_filename):
def rekey_file(self, new_password):
""" Strip out headers from a file, unhex and write to new file"""
# decrypt
tmpdata = self.read_data(self.filename)
this_vault = VaultLib(self.password)
dec_data = this_vault.decrypt(tmpdata)
# create new vault, set cipher to old and password to new
new_vault = VaultLib(new_password)
new_vault.cipher_name = this_vault.cipher_name
_, in_path = tempfile.mkstemp()
# re-encrypt data and re-write file
#_, out_path = tempfile.mkstemp()
enc_data = new_vault.encrypt(dec_data)
self.write_data(enc_data, self.filename)
# strip header from data, write rest to tmp file
def read_data(self, filename):
f = open(dirty_filename, "rb")
f = open(filename, "rb")
tmpdata = f.readlines()
tmpdata = f.read()
return tmpdata
tmpheader = tmpdata[0].strip()
def write_data(self, data, filename):
tmpdata = ''.join(tmpdata[1:])
# strip out newline, join, unhex
tmpdata = [ x.strip() for x in tmpdata ]
tmpdata = unhexlify(''.join(tmpdata))
# create and return stream
clean_stream = BytesIO(tmpdata)
return clean_stream
def _clean_stream_to_dirty_stream(self, clean_stream):
# combine header and hexlified encrypted data in 80 char columns
tmpdata = clean_stream.read()
tmpdata = hexlify(tmpdata)
tmpdata = [tmpdata[i:i+80] for i in range(0, len(tmpdata), 80)]
dirty_data = HEADER + ";" + str(self.version) + ";" + self.cipher + "\n"
for l in tmpdata:
dirty_data += l + '\n'
dirty_stream = BytesIO(dirty_data)
return dirty_stream
def _string_to_encrypted_file(self, tmpdata, filename):
""" Write a string of data to a file with the format ...
# sha256 the data
this_sha = sha256(tmpdata).hexdigest()
# combine sha + data to tmpfile
tmpdata = this_sha + "\n" + tmpdata
src_stream = BytesIO(tmpdata)
dst_stream = BytesIO()
# encrypt tmpfile
self.cipher_obj.encrypt(src_stream, dst_stream, self.password)
# hexlify tmpfile and combine with header
dirty_stream = self._clean_stream_to_dirty_stream(dst_stream)
if os.path.isfile(filename):
if os.path.isfile(filename):
# write back to original file
f = open(filename, "wb")
f = open(filename, "wb")
def shuffle_files(self, src, dest):
# overwrite dest with src
if os.path.isfile(dest):
shutil.move(src, dest)
def _verify_decryption(self, data):
""" Split data to sha/data and check the sha """
# split the sha and other data
this_sha, clean_data = self._strip_sha(data)
# does the decrypted data match the sha ?
class VaultAES(object):
clean_sha = sha256(clean_data).hexdigest()
# compare, return result
if this_sha == clean_sha:
return True
return False
def _strip_sha(self, data):
# is the first line a sha?
lines = data.split("\n")
this_sha = lines[0]
clean_data = '\n'.join(lines[1:])
return this_sha, clean_data
class AES(object):
# http://stackoverflow.com/a/16761459
# http://stackoverflow.com/a/16761459
@ -400,18 +271,22 @@ class AES(object):
return key, iv
return key, iv
def encrypt(self, in_file, out_file, password, key_length=32):
def encrypt(self, data, password, key_length=32):
""" Read plaintext data from in_file and write encrypted to out_file """
""" Read plaintext data from in_file and write encrypted to out_file """
bs = AES_.block_size
in_file = BytesIO(data)
out_file = BytesIO()
bs = AES.block_size
# Get a block of random data. EL does not have Crypto.Random.new()
# Get a block of random data. EL does not have Crypto.Random.new()
# so os.urandom is used for cross platform purposes
# so os.urandom is used for cross platform purposes
salt = os.urandom(bs - len('Salted__'))
salt = os.urandom(bs - len('Salted__'))
key, iv = self.aes_derive_key_and_iv(password, salt, key_length, bs)
key, iv = self.aes_derive_key_and_iv(password, salt, key_length, bs)
cipher = AES_.new(key, AES_.MODE_CBC, iv)
cipher = AES.new(key, AES.MODE_CBC, iv)
out_file.write('Salted__' + salt)
out_file.write('Salted__' + salt)
finished = False
finished = False
while not finished:
while not finished:
@ -422,16 +297,23 @@ class AES(object):
finished = True
finished = True
def decrypt(self, in_file, out_file, password, key_length=32):
return out_file.read()
def decrypt(self, data, password, key_length=32):
""" Read encrypted data from in_file and write decrypted to out_file """
""" Read encrypted data from in_file and write decrypted to out_file """
# http://stackoverflow.com/a/14989032
# http://stackoverflow.com/a/14989032
bs = AES_.block_size
in_file = BytesIO(data)
out_file = BytesIO()
bs = AES.block_size
salt = in_file.read(bs)[len('Salted__'):]
salt = in_file.read(bs)[len('Salted__'):]
key, iv = self.aes_derive_key_and_iv(password, salt, key_length, bs)
key, iv = self.aes_derive_key_and_iv(password, salt, key_length, bs)
cipher = AES_.new(key, AES_.MODE_CBC, iv)
cipher = AES.new(key, AES.MODE_CBC, iv)
next_chunk = ''
next_chunk = ''
finished = False
finished = False
@ -444,5 +326,7 @@ class AES(object):
# reset the stream pointer to the beginning
# reset the stream pointer to the beginning
if hasattr(out_file, 'seek'):
return out_file.read()