From 83d2ce771ca674abeb9f6d8bec4f4422a8af924d Mon Sep 17 00:00:00 2001 From: David Shrewsbury Date: Tue, 11 Nov 2025 17:33:45 -0500 Subject: [PATCH] Modify crypt library import pattern (#86120) Hide the functionality of the _internal/_encryption/_crypt.py module behind an object so that we don't have code executed at import time. --- lib/ansible/_internal/_encryption/_crypt.py | 251 +++++++++--------- lib/ansible/utils/encrypt.py | 11 +- test/units/_internal/_encryption/__init__.py | 0 .../units/_internal/_encryption/test_crypt.py | 149 +++++++++++ 4 files changed, 286 insertions(+), 125 deletions(-) create mode 100644 test/units/_internal/_encryption/__init__.py create mode 100644 test/units/_internal/_encryption/test_crypt.py diff --git a/lib/ansible/_internal/_encryption/_crypt.py b/lib/ansible/_internal/_encryption/_crypt.py index 0e23ff142f1..3326a4350d8 100644 --- a/lib/ansible/_internal/_encryption/_crypt.py +++ b/lib/ansible/_internal/_encryption/_crypt.py @@ -7,9 +7,10 @@ import ctypes import ctypes.util import os import sys +import typing as t from dataclasses import dataclass -__all__ = ['CRYPT_NAME', 'crypt', 'crypt_gensalt', 'HAS_CRYPT_GENSALT'] +__all__ = ['CryptFacade'] _FAILURE_TOKENS = frozenset({b'*0', b'*1'}) @@ -37,123 +38,133 @@ _CRYPT_LIBS = ( ), ) -for _lib_config in _CRYPT_LIBS: - if sys.platform in _lib_config.exclude_platforms: - continue - if _lib_config.include_platforms and sys.platform not in _lib_config.include_platforms: - continue - - if _lib_config.name is None: - _lib_so = None - elif _lib_config.is_path: - if os.path.exists(_lib_config.name): - _lib_so = _lib_config.name + +class CryptFacade: + """ + Provide an interface for various crypt libraries that might be available. + """ + + def __init__(self) -> None: + self._crypt_impl: t.Callable | None = None + self._crypt_gensalt_impl: t.Callable | None = None + self._use_crypt_r = False + self._use_crypt_gensalt_rn = False + self._crypt_name = "" + + self._setup() + + class _CryptData(ctypes.Structure): + _fields_ = [('_opaque', ctypes.c_char * 131072)] + + @property + def has_crypt_gensalt(self) -> bool: + return self._crypt_gensalt_impl is not None + + def _setup(self) -> None: + """Setup crypt implementation""" + for lib_config in _CRYPT_LIBS: + if sys.platform in lib_config.exclude_platforms: + continue + if lib_config.include_platforms and sys.platform not in lib_config.include_platforms: + continue + + if lib_config.name is None: + lib_so = None + elif lib_config.is_path: + if os.path.exists(lib_config.name): + lib_so = lib_config.name + else: + continue + else: + lib_so = ctypes.util.find_library(lib_config.name) + if not lib_so: + continue + + loaded_lib = ctypes.cdll.LoadLibrary(lib_so) + + try: + self._crypt_impl = loaded_lib.crypt_r + self._use_crypt_r = True + except AttributeError: + try: + self._crypt_impl = loaded_lib.crypt + except AttributeError: + continue + + if self._use_crypt_r: + self._crypt_impl.argtypes = [ctypes.c_char_p, ctypes.c_char_p, ctypes.POINTER(self._CryptData)] + self._crypt_impl.restype = ctypes.c_char_p + else: + self._crypt_impl.argtypes = [ctypes.c_char_p, ctypes.c_char_p] + self._crypt_impl.restype = ctypes.c_char_p + + # Try to load crypt_gensalt (available in libxcrypt) + try: + self._crypt_gensalt_impl = loaded_lib.crypt_gensalt_rn + self._crypt_gensalt_impl.argtypes = [ctypes.c_char_p, ctypes.c_ulong, ctypes.c_char_p, ctypes.c_int, ctypes.c_char_p, ctypes.c_int] + self._crypt_gensalt_impl.restype = ctypes.c_char_p + self._use_crypt_gensalt_rn = True + except AttributeError: + try: + self._crypt_gensalt_impl = loaded_lib.crypt_gensalt + self._crypt_gensalt_impl.argtypes = [ctypes.c_char_p, ctypes.c_ulong, ctypes.c_char_p, ctypes.c_int] + self._crypt_gensalt_impl.restype = ctypes.c_char_p + except AttributeError: + self._crypt_gensalt_impl = None + + self._crypt_name = lib_config.name + break + else: + raise ImportError('Cannot find crypt implementation') + + def crypt(self, word: bytes, salt: bytes) -> bytes: + """Hash a password using the system's crypt function.""" + ctypes.set_errno(0) + + if self._use_crypt_r: + data = self._CryptData() + ctypes.memset(ctypes.byref(data), 0, ctypes.sizeof(data)) + result = self._crypt_impl(word, salt, ctypes.byref(data)) + else: + result = self._crypt_impl(word, salt) + + errno = ctypes.get_errno() + if errno: + error_msg = os.strerror(errno) + raise OSError(errno, f'crypt failed: {error_msg}') + + if result is None: + raise ValueError('crypt failed: invalid salt or unsupported algorithm') + + if result in _FAILURE_TOKENS: + raise ValueError('crypt failed: invalid salt or unsupported algorithm') + + return result + + def crypt_gensalt(self, prefix: bytes, count: int, rbytes: bytes) -> bytes: + """Generate a salt string for use with crypt.""" + if not self.has_crypt_gensalt: + raise NotImplementedError('crypt_gensalt not available (requires libxcrypt)') + + ctypes.set_errno(0) + + if self._use_crypt_gensalt_rn: + output = ctypes.create_string_buffer(256) + result = self._crypt_gensalt_impl(prefix, count, rbytes, len(rbytes), output, len(output)) + if result is not None: + result = output.value else: - continue - else: - _lib_so = ctypes.util.find_library(_lib_config.name) - if not _lib_so: - continue - - _lib = ctypes.cdll.LoadLibrary(_lib_so) - - _use_crypt_r = False - try: - _crypt_impl = _lib.crypt_r - _use_crypt_r = True - except AttributeError: - try: - _crypt_impl = _lib.crypt - except AttributeError: - continue - - if _use_crypt_r: - - class _crypt_data(ctypes.Structure): - _fields_ = [('_opaque', ctypes.c_char * 131072)] - - _crypt_impl.argtypes = [ctypes.c_char_p, ctypes.c_char_p, ctypes.POINTER(_crypt_data)] - _crypt_impl.restype = ctypes.c_char_p - else: - _crypt_impl.argtypes = [ctypes.c_char_p, ctypes.c_char_p] - _crypt_impl.restype = ctypes.c_char_p - - # Try to load crypt_gensalt (available in libxcrypt) - _use_crypt_gensalt_rn = False - HAS_CRYPT_GENSALT = False - try: - _crypt_gensalt_impl = _lib.crypt_gensalt_rn - _crypt_gensalt_impl.argtypes = [ctypes.c_char_p, ctypes.c_ulong, ctypes.c_char_p, ctypes.c_int, ctypes.c_char_p, ctypes.c_int] - _crypt_gensalt_impl.restype = ctypes.c_char_p - _use_crypt_gensalt_rn = True - HAS_CRYPT_GENSALT = True - except AttributeError: - try: - _crypt_gensalt_impl = _lib.crypt_gensalt - _crypt_gensalt_impl.argtypes = [ctypes.c_char_p, ctypes.c_ulong, ctypes.c_char_p, ctypes.c_int] - _crypt_gensalt_impl.restype = ctypes.c_char_p - HAS_CRYPT_GENSALT = True - except AttributeError: - _crypt_gensalt_impl = None - - CRYPT_NAME = _lib_config.name - break -else: - raise ImportError('Cannot find crypt implementation') - - -def crypt(word: bytes, salt: bytes) -> bytes: - """Hash a password using the system's crypt function.""" - ctypes.set_errno(0) - - if _use_crypt_r: - data = _crypt_data() - ctypes.memset(ctypes.byref(data), 0, ctypes.sizeof(data)) - result = _crypt_impl(word, salt, ctypes.byref(data)) - else: - result = _crypt_impl(word, salt) - - errno = ctypes.get_errno() - if errno: - error_msg = os.strerror(errno) - raise OSError(errno, f'crypt failed: {error_msg}') - - if result is None: - raise ValueError('crypt failed: invalid salt or unsupported algorithm') - - if result in _FAILURE_TOKENS: - raise ValueError('crypt failed: invalid salt or unsupported algorithm') - - return result - - -def crypt_gensalt(prefix: bytes, count: int, rbytes: bytes) -> bytes: - """Generate a salt string for use with crypt.""" - if not HAS_CRYPT_GENSALT: - raise NotImplementedError('crypt_gensalt not available (requires libxcrypt)') - - ctypes.set_errno(0) - - if _use_crypt_gensalt_rn: - output = ctypes.create_string_buffer(256) - result = _crypt_gensalt_impl(prefix, count, rbytes, len(rbytes), output, len(output)) - if result is not None: - result = output.value - else: - result = _crypt_gensalt_impl(prefix, count, rbytes, len(rbytes)) - - errno = ctypes.get_errno() - if errno: - error_msg = os.strerror(errno) - raise OSError(errno, f'crypt_gensalt failed: {error_msg}') - - if result is None: - raise ValueError('crypt_gensalt failed: unable to generate salt') - - if result in _FAILURE_TOKENS: - raise ValueError('crypt_gensalt failed: invalid prefix or unsupported algorithm') - - return result - - -del _lib_config + result = self._crypt_gensalt_impl(prefix, count, rbytes, len(rbytes)) + + errno = ctypes.get_errno() + if errno: + error_msg = os.strerror(errno) + raise OSError(errno, f'crypt_gensalt failed: {error_msg}') + + if result is None: + raise ValueError('crypt_gensalt failed: unable to generate salt') + + if result in _FAILURE_TOKENS: + raise ValueError('crypt_gensalt failed: invalid prefix or unsupported algorithm') + + return result diff --git a/lib/ansible/utils/encrypt.py b/lib/ansible/utils/encrypt.py index 24bfd052bc2..68df755f0c5 100644 --- a/lib/ansible/utils/encrypt.py +++ b/lib/ansible/utils/encrypt.py @@ -38,7 +38,8 @@ except Exception as e: CRYPT_E = None HAS_CRYPT = False try: - from ansible._internal._encryption import _crypt + from ansible._internal._encryption._crypt import CryptFacade + _crypt_facade = CryptFacade() HAS_CRYPT = True except Exception as e: CRYPT_E = e @@ -121,14 +122,14 @@ class CryptHash(BaseHash): self.algo_data = self.algorithms[algorithm] - if self.algo_data.requires_gensalt and not _crypt.HAS_CRYPT_GENSALT: + if self.algo_data.requires_gensalt and not _crypt_facade.has_crypt_gensalt: raise AnsibleError(f"{self.algorithm!r} algorithm requires libxcrypt") def hash(self, secret: str, salt: str | None = None, salt_size: int | None = None, rounds: int | None = None, ident: str | None = None) -> str: rounds = self._rounds(rounds) ident = self._ident(ident) - if _crypt.HAS_CRYPT_GENSALT: + if _crypt_facade.has_crypt_gensalt: saltstring = self._gensalt(ident, rounds, salt, salt_size) else: saltstring = self._build_saltstring(ident, rounds, salt, salt_size) @@ -174,7 +175,7 @@ class CryptHash(BaseHash): count = rounds or 0 try: - salt_bytes = _crypt.crypt_gensalt(to_bytes(prefix), count, rbytes) + salt_bytes = _crypt_facade.crypt_gensalt(to_bytes(prefix), count, rbytes) return to_text(salt_bytes, errors='strict') except (NotImplementedError, ValueError) as e: raise AnsibleError(f"Failed to generate salt for {self.algorithm!r} algorithm") from e @@ -192,7 +193,7 @@ class CryptHash(BaseHash): def _hash(self, secret: str, saltstring: str) -> str: try: - result = _crypt.crypt(to_bytes(secret), to_bytes(saltstring)) + result = _crypt_facade.crypt(to_bytes(secret), to_bytes(saltstring)) except (OSError, ValueError) as e: raise AnsibleError(f"crypt does not support {self.algorithm!r} algorithm") from e diff --git a/test/units/_internal/_encryption/__init__.py b/test/units/_internal/_encryption/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/test/units/_internal/_encryption/test_crypt.py b/test/units/_internal/_encryption/test_crypt.py new file mode 100644 index 00000000000..661e2c66a66 --- /dev/null +++ b/test/units/_internal/_encryption/test_crypt.py @@ -0,0 +1,149 @@ +from __future__ import annotations + +import errno +import pytest +from pytest_mock import MockerFixture + +from ansible._internal._encryption._crypt import _CryptLib, CryptFacade, _FAILURE_TOKENS + + +class TestCryptFacade: + + def test_unsupported_platform(self, mocker: MockerFixture) -> None: + """Test that unsupported platforms are skipped.""" + mock_libs = ( + _CryptLib('foo', include_platforms=frozenset({'fake_platform'})), + ) + mocker.patch('ansible._internal._encryption._crypt._CRYPT_LIBS', mock_libs) + + with pytest.raises(ImportError, match=r'Cannot find crypt implementation'): + CryptFacade() + + def test_libc_fallback(self, mocker: MockerFixture) -> None: + """Test that a library name of None will load the libc library.""" + mock_libs = ( + _CryptLib(None), + ) + mocker.patch('ansible._internal._encryption._crypt._CRYPT_LIBS', mock_libs) + load_lib_mock = mocker.patch('ctypes.cdll.LoadLibrary') + + crypt_facade = CryptFacade() + + load_lib_mock.assert_called_once_with(None) + assert crypt_facade._crypt_name is None + + def test_library_with_no_crypt_methods(self, mocker: MockerFixture) -> None: + """Test that a library without crypt() and crypt_r() is skipped.""" + mock_libs = ( + _CryptLib(None), + ) + + class MockCDLL: + pass + + mocker.patch('ansible._internal._encryption._crypt._CRYPT_LIBS', mock_libs) + mocker.patch('ctypes.cdll.LoadLibrary', return_value=MockCDLL()) + + with pytest.raises(ImportError, match=r'Cannot find crypt implementation'): + CryptFacade() + + def test_library_with_no_crypt_r_or_crypt_gensalt_rn(self, mocker: MockerFixture) -> None: + """Test that a library without crypt_r() or crypt_gensalt_rn() is prepped correctly.""" + mock_libs = ( + _CryptLib(None), + ) + + class MockCDLL: + + class MockCrypt: + def __init__(self): + self.argtypes = None + self.restype = None + + def __init__(self): + self.crypt = self.MockCrypt() + self.crypt_gensalt = self.MockCrypt() + + mocker.patch('ansible._internal._encryption._crypt._CRYPT_LIBS', mock_libs) + mocker.patch('ctypes.cdll.LoadLibrary', return_value=MockCDLL()) + + crypt_facade = CryptFacade() + + assert crypt_facade._crypt_impl is not None + assert crypt_facade._crypt_impl.argtypes is not None + assert crypt_facade._crypt_impl.restype is not None + assert crypt_facade._use_crypt_r is False + + assert crypt_facade._crypt_gensalt_impl is not None + assert crypt_facade._crypt_gensalt_impl.argtypes is not None + assert crypt_facade._crypt_gensalt_impl.restype is not None + assert crypt_facade._use_crypt_gensalt_rn is False + assert crypt_facade.has_crypt_gensalt + + def test_crypt_fail_errno(self, mocker: MockerFixture) -> None: + """Test crypt() setting failure errno raises OSError.""" + mocker.patch('ctypes.get_errno', return_value=errno.EBADFD) + crypt_facade = CryptFacade() + + with pytest.raises(OSError, match=r'crypt failed:'): + crypt_facade.crypt(b"test", b"123") + + def test_crypt_result_none(self, mocker: MockerFixture) -> None: + """Test crypt() implementation returning None raises ValueError.""" + crypt_facade = CryptFacade() + mocker.patch.object(crypt_facade, '_crypt_impl', return_value=None) + + with pytest.raises(ValueError, match=r'crypt failed: invalid salt or unsupported algorithm'): + crypt_facade.crypt(b"test", b"123") + + def test_crypt_result_failure(self, mocker: MockerFixture) -> None: + """Test crypt() implementation returning failure token raises ValueError.""" + crypt_facade = CryptFacade() + mocker.patch.object(crypt_facade, '_crypt_impl', return_value=list(_FAILURE_TOKENS)[0]) + + with pytest.raises(ValueError, match=r'crypt failed: invalid salt or unsupported algorithm'): + crypt_facade.crypt(b"test", b"123") + + def test_crypt_gensalt_called_with_no_impl(self, mocker: MockerFixture) -> None: + """Calling crypt_gensalt() without impl should raise NotImplementedError.""" + crypt_facade = CryptFacade() + mock_prop = mocker.patch('ansible._internal._encryption._crypt.CryptFacade.has_crypt_gensalt', new_callable=mocker.PropertyMock) + mock_prop.return_value = False + + with pytest.raises(NotImplementedError, match=r'crypt_gensalt not available \(requires libxcrypt\)'): + crypt_facade.crypt_gensalt(b"", 1, b"") + + def test_crypt_gensalt(self, mocker: MockerFixture) -> None: + """Test the NOT _use_crypt_gensalt_rn code path of crypt_gensalt().""" + crypt_facade = CryptFacade() + crypt_facade._use_crypt_gensalt_rn = False + mock_impl = mocker.patch.object(crypt_facade, '_crypt_gensalt_impl', return_value='') + + crypt_facade.crypt_gensalt(b'', 1, b'') + mock_impl.assert_called_once_with(b'', 1, b'', 0) + + def test_crypt_gensalt_fail_errno(self, mocker: MockerFixture) -> None: + """Test crypt_gensalt() setting failure errno raises OSError.""" + mocker.patch('ctypes.get_errno', return_value=errno.EBADFD) + crypt_facade = CryptFacade() + + with pytest.raises(OSError, match=r'crypt_gensalt failed:'): + crypt_facade.crypt_gensalt(b'', 1, b'') + + def test_crypt_gensalt_result_none(self, mocker: MockerFixture) -> None: + """Test crypt_gensalt() implementation returning None raises ValueError.""" + crypt_facade = CryptFacade() + mocker.patch.object(crypt_facade, '_crypt_gensalt_impl', return_value=None) + + with pytest.raises(ValueError, match=r'crypt_gensalt failed: unable to generate salt'): + crypt_facade.crypt_gensalt(b'', 1, b'') + + def test_crypt_gensalt_result_failure(self, mocker: MockerFixture) -> None: + """Test crypt_gensalt() implementation returning failure token raises ValueError.""" + crypt_facade = CryptFacade() + # Skip the _rn version as it modifies impl return value + crypt_facade._use_crypt_gensalt_rn = False + mocker.patch.object(crypt_facade, '_crypt_gensalt_impl', return_value=list(_FAILURE_TOKENS)[0]) + + with pytest.raises(ValueError, match=r'crypt_gensalt failed: invalid prefix or unsupported algorithm'): + crypt_facade.crypt_gensalt(b'', 1, b'')