Move ansible.utils.hashing to ansible.module_utils.common.hashing

Signed-off-by: Abhijeet Kasurde <akasurde@redhat.com>
pull/84053/head
Abhijeet Kasurde 2 months ago
parent 17623add2f
commit 85963c55b0

@ -1719,7 +1719,7 @@ def _extract_tar_file(tar, filename, b_dest, b_temp_path, expected_hash=None):
else:
with tempfile.NamedTemporaryFile(dir=b_temp_path, delete=False) as tmpfile_obj:
actual_hash = _consume_file(tar_obj, tmpfile_obj)
actual_hash = _consume_file(tar_obj, write_to=tmpfile_obj)
if expected_hash and actual_hash != expected_hash:
raise AnsibleError("Checksum mismatch for '%s' inside collection at '%s'"

@ -87,30 +87,10 @@ from ansible.module_utils.common.text.formatters import (
SIZE_RANGES,
)
import hashlib
def _get_available_hash_algorithms():
"""Return a dictionary of available hash function names and their associated function."""
algorithms = {}
for algorithm_name in hashlib.algorithms_available:
algorithm_func = getattr(hashlib, algorithm_name, None)
if algorithm_func:
try:
# Make sure the algorithm is actually available for use.
# Not all algorithms listed as available are actually usable.
# For example, md5 is not available in FIPS mode.
algorithm_func()
except Exception:
pass
else:
algorithms[algorithm_name] = algorithm_func
return algorithms
AVAILABLE_HASH_ALGORITHMS = _get_available_hash_algorithms()
from ansible.module_utils.common.hashing import (
generate_secure_file_checksum,
AVAILABLE_HASH_ALGORITHMS
)
from ansible.module_utils.six.moves.collections_abc import (
KeysView,
Mapping, MutableMapping,
@ -1483,27 +1463,30 @@ class AnsibleModule(object):
if not os.path.exists(b_filename):
return None
if os.path.isdir(b_filename):
self.fail_json(msg="attempted to take checksum of directory: %s" % filename)
self.fail_json(msg=f"attempted to take checksum of directory: {filename}")
# preserve old behaviour where the third parameter was a hash algorithm object
if hasattr(algorithm, 'hexdigest'):
digest_method = algorithm
else:
try:
digest_method = AVAILABLE_HASH_ALGORITHMS[algorithm]()
except KeyError:
self.fail_json(msg="Could not hash file '%s' with algorithm '%s'. Available algorithms: %s" %
(filename, algorithm, ', '.join(AVAILABLE_HASH_ALGORITHMS)))
blocksize = 64 * 1024
infile = open(os.path.realpath(b_filename), 'rb')
block = infile.read(blocksize)
while block:
digest_method.update(block)
block = infile.read(blocksize)
infile.close()
return digest_method.hexdigest()
return generate_secure_file_checksum(filename, hash_func=algorithm)
except IOError as e:
self.module.fail_json(
msg=f"Could not find file {filename} to calculate hash: {e}"
)
if algorithm not in AVAILABLE_HASH_ALGORITHMS:
self.fail_json(
msg=f"Could not hash file '{filename}' with algorithm '{algorithm}'. Available algorithms: {', '.join(AVAILABLE_HASH_ALGORITHMS)}"
)
try:
return generate_secure_file_checksum(filename, hash_func=AVAILABLE_HASH_ALGORITHMS[algorithm])
except IOError as e:
self.module.fail_json(
msg=f"Could not find file {filename} to calculate hash: {e}"
)
def md5(self, filename):
""" Return MD5 hex digest of local file using digest_from_file().

@ -0,0 +1,130 @@
# Copyright: Contributors to the Ansible project
# Copyright: (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import annotations
import hashlib
import os
from hashlib import sha1
try:
from hashlib import md5 as _md5
except ImportError:
# Assume we're running in FIPS mode here
_md5 = None # type: ignore[assignment]
from ansible.module_utils.common.text.converters import to_bytes
def _get_available_hash_algorithms():
"""Return a dictionary of available hash function names and their associated function."""
algorithms = {}
for algorithm_name in hashlib.algorithms_available:
algorithm_func = getattr(hashlib, algorithm_name, None)
if algorithm_func:
try:
# Make sure the algorithm is actually available for use.
# Not all algorithms listed as available are actually usable.
# For example, md5 is not available in FIPS mode.
algorithm_func()
except Exception:
pass
else:
algorithms[algorithm_name] = algorithm_func
return algorithms
AVAILABLE_HASH_ALGORITHMS = _get_available_hash_algorithms()
def generate_secure_checksum(data, hash_func=hashlib.sha256):
"""Generates a secure checksum for the given data using the specified hash function.
Args:
data: The data to be hashed.
hash_func: The hash function to use (default: hashlib.sha256).
Returns:
A hexadecimal string representing the checksum.
"""
digest = hash_func()
data = to_bytes(data, errors='surrogate_or_strict')
digest.update(data)
return digest.hexdigest()
def generate_secure_file_checksum(filename, hash_func=hashlib.sha256, write_to=None):
"""Return a secure hash hex digest of local file, None if file is not present or a directory.
Args:
filename: The filename to be hashed.
hash_func: The hash function to use (default: hashlib.sha256).
write_to: The file handle to write to (default: None).
Returns:
A hexadecimal string representing the checksum.
"""
b_filename = to_bytes(filename, errors='surrogate_or_strict')
if not os.path.exists(b_filename) or os.path.isdir(to_bytes(filename, errors='strict')):
return None
digest = hash_func()
blocksize = 64 * 1024
try:
infile = open(os.path.realpath(b_filename), 'rb')
block = infile.read(blocksize)
while block:
if write_to is not None:
write_to.write(block)
write_to.flush()
digest.update(block)
block = infile.read(blocksize)
infile.close()
except IOError as e:
raise e
return digest.hexdigest()
#
# Backwards compat functions. Some modules include md5s in their return values
# Continue to support that for now. As of ansible-1.8, all of those modules
# should also return "checksum" (sha1 for now)
# Do not use md5 unless it is needed for:
# 1) Optional backwards compatibility
# 2) Compliance with a third party protocol
def secure_hash_s(data, hash_func=sha1):
# deprecated: description='Use generate_secure_checksum instead' core_version='2.21'
# Backward compatibility
return generate_secure_checksum(data=data, hash_func=hash_func)
def secure_hash(filename, hash_func=sha1):
# deprecated: description='Use generate_secure_file_checksum instead' core_version='2.21'
# Backward compatibility
return generate_secure_file_checksum(filename=filename, hash_func=hash_func)
#
# MD5 will not work on systems which are FIPS-140-2 compliant.
#
def md5s(data):
if not _md5: # type: ignore[truthy-function]
raise ValueError('MD5 not available. Possibly running in FIPS mode')
return secure_hash_s(data, _md5)
def md5(filename):
if not _md5: # type: ignore[truthy-function]
raise ValueError('MD5 not available. Possibly running in FIPS mode')
return secure_hash(filename, _md5)
# The checksum algorithm must match with the algorithm in ShellModule.checksum() method
checksum = secure_hash
checksum_s = secure_hash_s

@ -386,11 +386,11 @@ from ansible.errors import (
AnsibleFileNotFound,
)
from ansible.module_utils.six import PY3, text_type, binary_type
from ansible.module_utils.common.hashing import generate_secure_checksum
from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text
from ansible.plugins.connection import ConnectionBase, BUFSIZE
from ansible.plugins.shell.powershell import _parse_clixml
from ansible.utils.display import Display
from ansible.utils.hashing import generate_secure_checksum
from ansible.utils.path import unfrackpath, makedirs_safe
display = Display()

@ -14,12 +14,12 @@ from ansible.inventory.group import to_safe_group_name as original_safe
from ansible.parsing.utils.addresses import parse_address
from ansible.plugins import AnsiblePlugin
from ansible.plugins.cache import CachePluginAdjudicator as CacheObject
from ansible.module_utils.common.hashing import generate_secure_checksum
from ansible.module_utils.common.text.converters import to_bytes, to_native
from ansible.module_utils.parsing.convert_bool import boolean
from ansible.module_utils.six import string_types
from ansible.template import Templar
from ansible.utils.display import Display
from ansible.utils.hashing import generate_secure_checksum
from ansible.utils.vars import combine_vars, load_extra_vars
display = Display()

@ -131,12 +131,12 @@ import string
import time
from ansible.errors import AnsibleError, AnsibleAssertionError
from ansible.module_utils.common.hashing import generate_secure_checksum
from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text
from ansible.module_utils.six import string_types
from ansible.parsing.splitter import parse_kv
from ansible.plugins.lookup import LookupBase
from ansible.utils.encrypt import BaseHash, do_encrypt, random_password, random_salt
from ansible.utils.hashing import generate_secure_checksum
from ansible.utils.path import makedirs_safe

@ -3,100 +3,15 @@
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import annotations
import os
from hashlib import sha1, sha256
try:
from hashlib import md5 as _md5
except ImportError:
# Assume we're running in FIPS mode here
_md5 = None
from ansible.errors import AnsibleError
from ansible.module_utils.common.text.converters import to_bytes
def generate_secure_checksum(data, hash_func=sha256):
"""Generates a secure checksum for the given data using the specified hash function.
Args:
data: The data to be hashed.
hash_func: The hash function to use (default: hashlib.sha256).
Returns:
A hexadecimal string representing the checksum.
"""
digest = hash_func()
data = to_bytes(data, errors='surrogate_or_strict')
digest.update(data)
return digest.hexdigest()
def generate_secure_file_checksum(filename, hash_func=sha256):
"""Return a secure hash hex digest of local file, None if file is not present or a directory.
Args:
filename: The filename to be hashed.
hash_func: The hash function to use (default: hashlib.sha256).
Returns:
A hexadecimal string representing the checksum.
"""
if not os.path.exists(to_bytes(filename, errors='surrogate_or_strict')) or os.path.isdir(to_bytes(filename, errors='strict')):
return None
digest = hash_func()
blocksize = 64 * 1024
try:
infile = open(to_bytes(filename, errors='surrogate_or_strict'), 'rb')
block = infile.read(blocksize)
while block:
digest.update(block)
block = infile.read(blocksize)
infile.close()
except IOError as e:
raise AnsibleError(f"error while accessing the file {filename}, error was: {e}")
return digest.hexdigest()
#
# Backwards compat functions. Some modules include md5s in their return values
# Continue to support that for now. As of ansible-1.8, all of those modules
# should also return "checksum" (sha1 for now)
# Do not use md5 unless it is needed for:
# 1) Optional backwards compatibility
# 2) Compliance with a third party protocol
def secure_hash_s(data, hash_func=sha1):
# deprecated: description='Use generate_secure_checksum instead' core_version='2.21'
# Backward compatibility
return generate_secure_checksum(data=data, hash_func=hash_func)
def secure_hash(filename, hash_func=sha1):
# deprecated: description='Use generate_secure_file_checksum instead' core_version='2.21'
# Backward compatibility
return generate_secure_file_checksum(filename=filename, hash_func=hash_func)
#
# MD5 will not work on systems which are FIPS-140-2 compliant.
#
def md5s(data):
if not _md5:
raise ValueError('MD5 not available. Possibly running in FIPS mode')
return secure_hash_s(data, _md5)
def md5(filename):
if not _md5:
raise ValueError('MD5 not available. Possibly running in FIPS mode')
return secure_hash(filename, _md5)
# The checksum algorithm must match with the algorithm in ShellModule.checksum() method
checksum = secure_hash
checksum_s = secure_hash_s
# Backward compatibility
# deprecated: description='Use ansible.module_utils.common.hashing instead' core_version='2.21'
from ansible.module_utils.common.hashing import ( # pylint: disable=unused-import
checksum,
checksum_s,
generate_secure_checksum,
generate_secure_file_checksum,
md5, md5s, secure_hash,
secure_hash_s,
sha1,
_md5,
)

@ -16,12 +16,12 @@ from ansible import constants as C
from ansible.errors import AnsibleError, AnsibleParserError, AnsibleUndefinedVariable, AnsibleFileNotFound, AnsibleAssertionError
from ansible.inventory.host import Host
from ansible.inventory.helpers import sort_groups, get_group_vars
from ansible.module_utils.common.hashing import generate_secure_checksum
from ansible.module_utils.common.text.converters import to_text
from ansible.module_utils.six import text_type
from ansible.vars.fact_cache import FactCache
from ansible.template import Templar
from ansible.utils.display import Display
from ansible.utils.hashing import generate_secure_checksum
from ansible.utils.vars import combine_vars, load_extra_vars, load_options_vars
from ansible.utils.unsafe_proxy import wrap_var
from ansible.vars.clean import namespace_facts, clean_facts

@ -41,6 +41,7 @@ MODULE_UTILS_BASIC_FILES = frozenset(('ansible/__init__.py',
'ansible/module_utils/six/__init__.py',
'ansible/module_utils/_text.py',
'ansible/module_utils/common/collections.py',
'ansible/module_utils/common/hashing.py',
'ansible/module_utils/common/parameters.py',
'ansible/module_utils/common/warnings.py',
'ansible/module_utils/parsing/convert_bool.py',

@ -22,13 +22,13 @@ from ansible.cli.galaxy import GalaxyCLI
from ansible.config import manager
from ansible.errors import AnsibleError
from ansible.galaxy import api, collection, token
from ansible.module_utils.common.hashing import generate_secure_checksum
from ansible.module_utils.common.sentinel import Sentinel
from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text
from ansible.module_utils.common.file import S_IRWU_RG_RO
import builtins
from ansible.utils import context_objects as co
from ansible.utils.display import Display
from ansible.utils.hashing import generate_secure_checksum
@pytest.fixture(autouse=True)
@ -1149,24 +1149,6 @@ def test_verify_file_hash_mismatching_hash(manifest_info):
assert error_queue[0].expected == different_digest
def test_consume_file(manifest):
manifest_file, checksum = manifest
assert checksum == collection._consume_file(manifest_file)
def test_consume_file_and_write_contents(manifest, manifest_info):
manifest_file, checksum = manifest
write_to = BytesIO()
actual_hash = collection._consume_file(manifest_file, write_to)
write_to.seek(0)
assert to_bytes(json.dumps(manifest_info)) == write_to.read()
assert actual_hash == checksum
def test_get_tar_file_member(tmp_tarfile):
temp_dir, tfile, filename, checksum = tmp_tarfile

@ -2,7 +2,7 @@
from __future__ import annotations
from ansible.module_utils.basic import _get_available_hash_algorithms
from ansible.module_utils.common.hashing import _get_available_hash_algorithms
def test_unavailable_algorithm(mocker):

@ -8,7 +8,7 @@ import hashlib
import tempfile
from ansible.module_utils.common.text.converters import to_bytes
from ansible.utils import hashing
from ansible.module_utils.common import hashing
import pytest

Loading…
Cancel
Save