password lookup, handle ident properly when saved (#80251) (#80312)

* password lookup, handle ident properly when saved (#80251)

* password lookup, handle ident properly when saved

  Currently we format and save ident when present but we didn't account for this when reading the saved file
  Also added some more robust error handling.

(cherry picked from commit 0fd88717c9)

* fix try indent

* clog

* fix bad merge indentation
pull/80535/head
Brian Coca 3 years ago committed by GitHub
parent 261e5b74cc
commit ae02b5353d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,2 @@
bugfixes:
- password lookup now correctly reads stored ident fields.

@ -197,18 +197,31 @@ def _parse_content(content):
''' '''
password = content password = content
salt = None salt = None
ident = None
salt_slug = u' salt=' salt_slug = u' salt='
ident_slug = u' ident='
rem = u''
try: try:
sep = content.rindex(salt_slug) sep = content.rindex(salt_slug)
except ValueError: except ValueError:
# No salt # No salt
pass pass
else: else:
salt = password[sep + len(salt_slug):] rem = content[sep + len(salt_slug):]
password = content[:sep] password = content[:sep]
return password, salt if rem:
try:
sep = rem.rindex(ident_slug)
except ValueError:
# no ident
salt = rem
else:
ident = rem[sep + len(ident_slug):]
salt = rem[:sep]
return password, salt, ident
def _format_content(password, salt, encrypt=None, ident=None): def _format_content(password, salt, encrypt=None, ident=None):
@ -338,47 +351,74 @@ class LookupModule(LookupBase):
self.set_options(var_options=variables, direct=kwargs) self.set_options(var_options=variables, direct=kwargs)
for term in terms: for term in terms:
changed = None
relpath, params = self._parse_parameters(term) relpath, params = self._parse_parameters(term)
path = self._loader.path_dwim(relpath) path = self._loader.path_dwim(relpath)
b_path = to_bytes(path, errors='surrogate_or_strict') b_path = to_bytes(path, errors='surrogate_or_strict')
chars = _gen_candidate_chars(params['chars']) chars = _gen_candidate_chars(params['chars'])
ident = None
changed = None first_process = None
# make sure only one process finishes all the job first lockfile = None
first_process, lockfile = _get_lock(b_path)
try:
content = _read_password_file(b_path) # make sure only one process finishes all the job first
first_process, lockfile = _get_lock(b_path)
if content is None or b_path == to_bytes('/dev/null'):
plaintext_password = random_password(params['length'], chars, params['seed']) content = _read_password_file(b_path)
salt = None
changed = True if content is None or b_path == to_bytes('/dev/null'):
else: plaintext_password = random_password(params['length'], chars, params['seed'])
plaintext_password, salt = _parse_content(content) salt = None
changed = True
encrypt = params['encrypt'] else:
if encrypt and not salt: plaintext_password, salt, ident = _parse_content(content)
changed = True
try: encrypt = params['encrypt']
salt = random_salt(BaseHash.algorithms[encrypt].salt_size) if encrypt and not salt:
except KeyError: changed = True
salt = random_salt() try:
salt = random_salt(BaseHash.algorithms[encrypt].salt_size)
ident = params['ident'] except KeyError:
if encrypt and not ident: salt = random_salt()
changed = True
try: ident = params['ident']
ident = BaseHash.algorithms[encrypt].implicit_ident if encrypt and not ident:
except KeyError: changed = True
ident = None try:
ident = BaseHash.algorithms[encrypt].implicit_ident
if changed and b_path != to_bytes('/dev/null'): except KeyError:
content = _format_content(plaintext_password, salt, encrypt=encrypt, ident=ident) ident = None
_write_password_file(b_path, content)
encrypt = params['encrypt']
if first_process: if encrypt and not salt:
# let other processes continue changed = True
_release_lock(lockfile) try:
salt = random_salt(BaseHash.algorithms[encrypt].salt_size)
except KeyError:
salt = random_salt()
if not ident:
ident = params['ident']
elif params['ident'] and ident != params['ident']:
raise AnsibleError('The ident parameter provided (%s) does not match the stored one (%s).' % (ident, params['ident']))
if encrypt and not ident:
try:
ident = BaseHash.algorithms[encrypt].implicit_ident
except KeyError:
ident = None
if ident:
changed = True
if changed and b_path != to_bytes('/dev/null'):
content = _format_content(plaintext_password, salt, encrypt=encrypt, ident=ident)
_write_password_file(b_path, content)
finally:
if first_process:
# let other processes continue
_release_lock(lockfile)
if encrypt: if encrypt:
password = do_encrypt(plaintext_password, encrypt, salt=salt, ident=ident) password = do_encrypt(plaintext_password, encrypt, salt=salt, ident=ident)

@ -231,12 +231,15 @@ class PasslibHash(BaseHash):
settings['ident'] = ident settings['ident'] = ident
# starting with passlib 1.7 'using' and 'hash' should be used instead of 'encrypt' # starting with passlib 1.7 'using' and 'hash' should be used instead of 'encrypt'
if hasattr(self.crypt_algo, 'hash'): try:
result = self.crypt_algo.using(**settings).hash(secret) if hasattr(self.crypt_algo, 'hash'):
elif hasattr(self.crypt_algo, 'encrypt'): result = self.crypt_algo.using(**settings).hash(secret)
result = self.crypt_algo.encrypt(secret, **settings) elif hasattr(self.crypt_algo, 'encrypt'):
else: result = self.crypt_algo.encrypt(secret, **settings)
raise AnsibleError("installed passlib version %s not supported" % passlib.__version__) else:
raise AnsibleError("installed passlib version %s not supported" % passlib.__version__)
except ValueError as e:
raise AnsibleError("Could not hash the secret.", orig_exc=e)
# passlib.hash should always return something or raise an exception. # passlib.hash should always return something or raise an exception.
# Still ensure that there is always a result. # Still ensure that there is always a result.

@ -330,23 +330,34 @@ class TestRandomPassword(unittest.TestCase):
class TestParseContent(unittest.TestCase): class TestParseContent(unittest.TestCase):
def test_empty_password_file(self): def test_empty_password_file(self):
plaintext_password, salt = password._parse_content(u'') plaintext_password, salt, ident = password._parse_content(u'')
self.assertEqual(plaintext_password, u'') self.assertEqual(plaintext_password, u'')
self.assertEqual(salt, None) self.assertEqual(salt, None)
self.assertEqual(ident, None)
def test(self): def test(self):
expected_content = u'12345678' expected_content = u'12345678'
file_content = expected_content file_content = expected_content
plaintext_password, salt = password._parse_content(file_content) plaintext_password, salt, ident = password._parse_content(file_content)
self.assertEqual(plaintext_password, expected_content) self.assertEqual(plaintext_password, expected_content)
self.assertEqual(salt, None) self.assertEqual(salt, None)
self.assertEqual(ident, None)
def test_with_salt(self): def test_with_salt(self):
expected_content = u'12345678 salt=87654321' expected_content = u'12345678 salt=87654321'
file_content = expected_content file_content = expected_content
plaintext_password, salt = password._parse_content(file_content) plaintext_password, salt, ident = password._parse_content(file_content)
self.assertEqual(plaintext_password, u'12345678') self.assertEqual(plaintext_password, u'12345678')
self.assertEqual(salt, u'87654321') self.assertEqual(salt, u'87654321')
self.assertEqual(ident, None)
def test_with_salt_and_ident(self):
expected_content = u'12345678 salt=87654321 ident=2a'
file_content = expected_content
plaintext_password, salt, ident = password._parse_content(file_content)
self.assertEqual(plaintext_password, u'12345678')
self.assertEqual(salt, u'87654321')
self.assertEqual(ident, u'2a')
class TestFormatContent(unittest.TestCase): class TestFormatContent(unittest.TestCase):

Loading…
Cancel
Save