diff --git a/changelogs/fragments/60388-openssl_privatekey-format.yml b/changelogs/fragments/60388-openssl_privatekey-format.yml new file mode 100644 index 00000000000..ad3eff915ba --- /dev/null +++ b/changelogs/fragments/60388-openssl_privatekey-format.yml @@ -0,0 +1,2 @@ +minor_changes: +- "openssl_privatekey - add ``format`` and ``format_mismatch`` options." diff --git a/lib/ansible/module_utils/crypto.py b/lib/ansible/module_utils/crypto.py index be91041f4c1..88774c676af 100644 --- a/lib/ansible/module_utils/crypto.py +++ b/lib/ansible/module_utils/crypto.py @@ -1903,3 +1903,29 @@ else: no >>= 1 count += 1 return count + + +PEM_START = '-----BEGIN ' +PEM_END = '-----' +PKCS8_PRIVATEKEY_NAMES = ('PRIVATE KEY', 'ENCRYPTED PRIVATE KEY') +PKCS1_PRIVATEKEY_SUFFIX = ' PRIVATE KEY' + + +def identify_private_key_format(content): + '''Given the contents of a private key file, identifies its format.''' + # See https://github.com/openssl/openssl/blob/master/crypto/pem/pem_pkey.c#L40-L85 + # (PEM_read_bio_PrivateKey) + # and https://github.com/openssl/openssl/blob/master/include/openssl/pem.h#L46-L47 + # (PEM_STRING_PKCS8, PEM_STRING_PKCS8INF) + try: + lines = content.decode('utf-8').splitlines(False) + if lines[0].startswith(PEM_START) and lines[0].endswith(PEM_END) and len(lines[0]) > len(PEM_START) + len(PEM_END): + name = lines[0][len(PEM_START):-len(PEM_END)] + if name in PKCS8_PRIVATEKEY_NAMES: + return 'pkcs8' + if len(name) > len(PKCS1_PRIVATEKEY_SUFFIX) and name.endswith(PKCS1_PRIVATEKEY_SUFFIX): + return 'pkcs1' + return 'unknown-pem' + except UnicodeDecodeError: + pass + return 'raw' diff --git a/lib/ansible/modules/crypto/openssl_privatekey.py b/lib/ansible/modules/crypto/openssl_privatekey.py index acd04ad180a..06d03f54155 100644 --- a/lib/ansible/modules/crypto/openssl_privatekey.py +++ b/lib/ansible/modules/crypto/openssl_privatekey.py @@ -123,6 +123,32 @@ options: default: auto choices: [ auto, cryptography, pyopenssl ] version_added: "2.8" + format: + description: + - Determines which format the private key is written in. By default, PKCS1 (traditional OpenSSL format) + is used for all keys which support it. Please note that not every key can be exported in any format. + - The value C(auto) selects a fromat based on the key format. The value C(auto_ignore) does the same, + but for existing private key files, it will not force a regenerate when its format is not the automatically + selected one for generation. + - Note that if the format for an existing private key mismatches, the key is *regenerated* by default. + To change this behavior, use the I(format_mismatch) option. + - The I(format) option is only supported by the C(cryptography) backend. The C(pyopenssl) backend will + fail if a value different from C(auto_ignore) is used. + type: str + default: auto_ignore + choices: [ pkcs1, pkcs8, raw, auto, auto_ignore ] + version_added: "2.10" + format_mismatch: + description: + - Determines behavior of the module if the format of a private key does not match the expected format, but all + other parameters are as expected. + - If set to C(regenerate) (default), generates a new private key. + - If set to C(convert), the key will be converted to the new format instead. + - Only supported by the C(cryptography) backend. + type: str + default: regenerate + choices: [ regenerate, convert ] + version_added: "2.10" backup: description: - Create a backup file including a timestamp so you can get @@ -293,6 +319,8 @@ class PrivateKeyBase(crypto_utils.OpenSSLObject): self.cipher = module.params['cipher'] self.privatekey = None self.fingerprint = {} + self.format = module.params['format'] + self.format_mismatch = module.params['format_mismatch'] self.backup = module.params['backup'] self.backup_file = None @@ -301,7 +329,13 @@ class PrivateKeyBase(crypto_utils.OpenSSLObject): module.params['mode'] = '0600' @abc.abstractmethod - def _generate_private_key_data(self): + def _generate_private_key(self): + """(Re-)Generate private key.""" + pass + + @abc.abstractmethod + def _get_private_key_data(self): + """Return bytes for self.privatekey""" pass @abc.abstractmethod @@ -311,10 +345,19 @@ class PrivateKeyBase(crypto_utils.OpenSSLObject): def generate(self, module): """Generate a keypair.""" - if not self.check(module, perms_required=False) or self.force: + if not self.check(module, perms_required=False, ignore_conversion=True) or self.force: + # Regenerate + if self.backup: + self.backup_file = module.backup_local(self.path) + self._generate_private_key() + privatekey_data = self._get_private_key_data() + crypto_utils.write_file(module, privatekey_data, 0o600) + self.changed = True + elif not self.check(module, perms_required=False, ignore_conversion=False): + # Convert if self.backup: self.backup_file = module.backup_local(self.path) - privatekey_data = self._generate_private_key_data() + privatekey_data = self._get_private_key_data() crypto_utils.write_file(module, privatekey_data, 0o600) self.changed = True @@ -336,7 +379,11 @@ class PrivateKeyBase(crypto_utils.OpenSSLObject): def _check_size_and_type(self): pass - def check(self, module, perms_required=True): + @abc.abstractmethod + def _check_format(self): + pass + + def check(self, module, perms_required=True, ignore_conversion=True): """Ensure the resource is in its desired state.""" state_and_perms = super(PrivateKeyBase, self).check(module, perms_required) @@ -344,7 +391,14 @@ class PrivateKeyBase(crypto_utils.OpenSSLObject): if not state_and_perms or not self._check_passphrase(): return False - return self._check_size_and_type() + if not self._check_size_and_type(): + return False + + if not self._check_format(): + if ignore_conversion or self.format_mismatch != 'convert': + return False + + return True def dump(self): """Serialize the object into a dictionary.""" @@ -374,14 +428,19 @@ class PrivateKeyPyOpenSSL(PrivateKeyBase): else: module.fail_json(msg="PyOpenSSL backend only supports RSA and DSA keys.") - def _generate_private_key_data(self): - self.privatekey = crypto.PKey() + if self.format != 'auto_ignore': + module.fail_json(msg="PyOpenSSL backend only supports auto_ignore format.") + def _generate_private_key(self): + """(Re-)Generate private key.""" + self.privatekey = crypto.PKey() try: self.privatekey.generate_key(self.type, self.size) except (TypeError, ValueError) as exc: raise PrivateKeyError(exc) + def _get_private_key_data(self): + """Return bytes for self.privatekey""" if self.cipher and self.passphrase: return crypto.dump_privatekey(crypto.FILETYPE_PEM, self.privatekey, self.cipher, to_bytes(self.passphrase)) @@ -412,6 +471,10 @@ class PrivateKeyPyOpenSSL(PrivateKeyBase): return _check_size(privatekey) and _check_type(privatekey) + def _check_format(self): + # Not supported by this backend + return True + def dump(self): """Serialize the object into a dictionary.""" @@ -489,8 +552,16 @@ class PrivateKeyCryptography(PrivateKeyBase): if not CRYPTOGRAPHY_HAS_ED448 and self.type == 'Ed448': self.module.fail_json(msg='Your cryptography version does not support Ed448') - def _generate_private_key_data(self): - export_format = cryptography.hazmat.primitives.serialization.PrivateFormat.TraditionalOpenSSL + def _get_wanted_format(self): + if self.format not in ('auto', 'auto_ignore'): + return self.format + if self.type in ('X25519', 'X448', 'Ed25519', 'Ed448'): + return 'pkcs8' + else: + return 'pkcs1' + + def _generate_private_key(self): + """(Re-)Generate private key.""" try: if self.type == 'RSA': self.privatekey = cryptography.hazmat.primitives.asymmetric.rsa.generate_private_key( @@ -505,16 +576,12 @@ class PrivateKeyCryptography(PrivateKeyBase): ) if CRYPTOGRAPHY_HAS_X25519_FULL and self.type == 'X25519': self.privatekey = cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey.generate() - export_format = cryptography.hazmat.primitives.serialization.PrivateFormat.PKCS8 if CRYPTOGRAPHY_HAS_X448 and self.type == 'X448': self.privatekey = cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey.generate() - export_format = cryptography.hazmat.primitives.serialization.PrivateFormat.PKCS8 if CRYPTOGRAPHY_HAS_ED25519 and self.type == 'Ed25519': self.privatekey = cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey.generate() - export_format = cryptography.hazmat.primitives.serialization.PrivateFormat.PKCS8 if CRYPTOGRAPHY_HAS_ED448 and self.type == 'Ed448': self.privatekey = cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey.generate() - export_format = cryptography.hazmat.primitives.serialization.PrivateFormat.PKCS8 if self.type == 'ECC' and self.curve in self.curves: if self.curves[self.curve]['deprecated']: self.module.warn('Elliptic curves of type {0} should not be used for new keys!'.format(self.curve)) @@ -525,6 +592,23 @@ class PrivateKeyCryptography(PrivateKeyBase): except cryptography.exceptions.UnsupportedAlgorithm as dummy: self.module.fail_json(msg='Cryptography backend does not support the algorithm required for {0}'.format(self.type)) + def _get_private_key_data(self): + """Return bytes for self.privatekey""" + # Select export format and encoding + try: + export_format = self._get_wanted_format() + export_encoding = cryptography.hazmat.primitives.serialization.Encoding.PEM + if export_format == 'pkcs1': + # "TraditionalOpenSSL" format is PKCS1 + export_format = cryptography.hazmat.primitives.serialization.PrivateFormat.TraditionalOpenSSL + elif export_format == 'pkcs8': + export_format = cryptography.hazmat.primitives.serialization.PrivateFormat.PKCS8 + elif export_format == 'raw': + export_format = cryptography.hazmat.primitives.serialization.PrivateFormat.Raw + export_encoding = cryptography.hazmat.primitives.serialization.Encoding.Raw + except AttributeError: + self.module.fail_json(msg='Cryptography backend does not support the selected output format "{0}"'.format(self.format)) + # Select key encryption encryption_algorithm = cryptography.hazmat.primitives.serialization.NoEncryption() if self.cipher and self.passphrase: @@ -534,17 +618,48 @@ class PrivateKeyCryptography(PrivateKeyBase): self.module.fail_json(msg='Cryptography backend can only use "auto" for cipher option.') # Serialize key - return self.privatekey.private_bytes( - encoding=cryptography.hazmat.primitives.serialization.Encoding.PEM, - format=export_format, - encryption_algorithm=encryption_algorithm - ) + try: + return self.privatekey.private_bytes( + encoding=export_encoding, + format=export_format, + encryption_algorithm=encryption_algorithm + ) + except ValueError as e: + self.module.fail_json( + msg='Cryptography backend cannot serialize the private key in the required format "{0}"'.format(self.format) + ) + except Exception as dummy: + self.module.fail_json( + msg='Error while serializing the private key in the required format "{0}"'.format(self.format), + exception=traceback.format_exc() + ) def _load_privatekey(self): try: + # Read bytes with open(self.path, 'rb') as f: + data = f.read() + # Interpret bytes depending on format. + format = crypto_utils.identify_private_key_format(data) + if format == 'raw': + if len(data) == 56 and CRYPTOGRAPHY_HAS_X448: + return cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey.from_private_bytes(data) + if len(data) == 57 and CRYPTOGRAPHY_HAS_ED448: + return cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey.from_private_bytes(data) + if len(data) == 32: + if CRYPTOGRAPHY_HAS_X25519 and (self.type == 'X25519' or not CRYPTOGRAPHY_HAS_ED25519): + return cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey.from_private_bytes(data) + if CRYPTOGRAPHY_HAS_ED25519 and (self.type == 'Ed25519' or not CRYPTOGRAPHY_HAS_X25519): + return cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes(data) + if CRYPTOGRAPHY_HAS_X25519 and CRYPTOGRAPHY_HAS_ED25519: + try: + return cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey.from_private_bytes(data) + except Exception: + return cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes(data) + raise PrivateKeyError('Cannot load raw key') + else: return cryptography.hazmat.primitives.serialization.load_pem_private_key( - f.read(), + data, None if self.passphrase is None else to_bytes(self.passphrase), backend=self.cryptography_backend ) @@ -565,12 +680,17 @@ class PrivateKeyCryptography(PrivateKeyBase): def _check_passphrase(self): try: with open(self.path, 'rb') as f: + data = f.read() + format = crypto_utils.identify_private_key_format(data) + if format == 'raw': + # Raw keys cannot be encrypted + return self.passphrase is None + else: return cryptography.hazmat.primitives.serialization.load_pem_private_key( - f.read(), + data, None if self.passphrase is None else to_bytes(self.passphrase), backend=self.cryptography_backend ) - return True except Exception as dummy: return False @@ -598,6 +718,17 @@ class PrivateKeyCryptography(PrivateKeyBase): return False + def _check_format(self): + if self.format == 'auto_ignore': + return True + try: + with open(self.path, 'rb') as f: + content = f.read() + format = crypto_utils.identify_private_key_format(content) + return format == self._get_wanted_format() + except Exception as dummy: + return False + def dump(self): """Serialize the object into a dictionary.""" result = super(PrivateKeyCryptography, self).dump() @@ -627,6 +758,8 @@ def main(): passphrase=dict(type='str', no_log=True), cipher=dict(type='str'), backup=dict(type='bool', default=False), + format=dict(type='str', default='auto_ignore', choices=['pkcs1', 'pkcs8', 'raw', 'auto', 'auto_ignore']), + format_mismatch=dict(type='str', default='regenerate', choices=['regenerate', 'convert']), select_crypto_backend=dict(type='str', choices=['auto', 'pyopenssl', 'cryptography'], default='auto'), ), supports_check_mode=True, diff --git a/test/integration/targets/openssl_privatekey/tasks/impl.yml b/test/integration/targets/openssl_privatekey/tasks/impl.yml index 5de08b79b97..900fec9c0f5 100644 --- a/test/integration/targets/openssl_privatekey/tasks/impl.yml +++ b/test/integration/targets/openssl_privatekey/tasks/impl.yml @@ -277,3 +277,132 @@ stat: path: '{{ output_dir }}/privatekey_mode.pem' register: privatekey_mode_3_stat + +- block: + - name: Generate privatekey_fmt_1 - auto format + openssl_privatekey: + path: '{{ output_dir }}/privatekey_fmt_1.pem' + format: auto + select_crypto_backend: '{{ select_crypto_backend }}' + register: privatekey_fmt_1_step_1 + + - name: Generate privatekey_fmt_1 - auto format (idempotent) + openssl_privatekey: + path: '{{ output_dir }}/privatekey_fmt_1.pem' + format: auto + select_crypto_backend: '{{ select_crypto_backend }}' + register: privatekey_fmt_1_step_2 + + - name: Generate privatekey_fmt_1 - PKCS1 format + openssl_privatekey: + path: '{{ output_dir }}/privatekey_fmt_1.pem' + format: pkcs1 + select_crypto_backend: '{{ select_crypto_backend }}' + register: privatekey_fmt_1_step_3 + + - name: Generate privatekey_fmt_1 - PKCS8 format + openssl_privatekey: + path: '{{ output_dir }}/privatekey_fmt_1.pem' + format: pkcs8 + select_crypto_backend: '{{ select_crypto_backend }}' + register: privatekey_fmt_1_step_4 + + - name: Generate privatekey_fmt_1 - PKCS8 format (idempotent) + openssl_privatekey: + path: '{{ output_dir }}/privatekey_fmt_1.pem' + format: pkcs8 + select_crypto_backend: '{{ select_crypto_backend }}' + register: privatekey_fmt_1_step_5 + + - name: Generate privatekey_fmt_1 - auto format (ignore) + openssl_privatekey: + path: '{{ output_dir }}/privatekey_fmt_1.pem' + format: auto_ignore + select_crypto_backend: '{{ select_crypto_backend }}' + register: privatekey_fmt_1_step_6 + + - name: Generate privatekey_fmt_1 - auto format (no ignore) + openssl_privatekey: + path: '{{ output_dir }}/privatekey_fmt_1.pem' + format: auto + select_crypto_backend: '{{ select_crypto_backend }}' + register: privatekey_fmt_1_step_7 + + - name: Generate privatekey_fmt_1 - raw format (fail) + openssl_privatekey: + path: '{{ output_dir }}/privatekey_fmt_1.pem' + format: raw + select_crypto_backend: '{{ select_crypto_backend }}' + ignore_errors: yes + register: privatekey_fmt_1_step_8 + + - name: Generate privatekey_fmt_1 - PKCS8 format (convert) + openssl_privatekey_info: + path: '{{ output_dir }}/privatekey_fmt_1.pem' + select_crypto_backend: '{{ select_crypto_backend }}' + register: privatekey_fmt_1_step_9_before + + - name: Generate privatekey_fmt_1 - PKCS8 format (convert) + openssl_privatekey: + path: '{{ output_dir }}/privatekey_fmt_1.pem' + format: pkcs8 + format_mismatch: convert + select_crypto_backend: '{{ select_crypto_backend }}' + register: privatekey_fmt_1_step_9 + + - name: Generate privatekey_fmt_1 - PKCS8 format (convert) + openssl_privatekey_info: + path: '{{ output_dir }}/privatekey_fmt_1.pem' + select_crypto_backend: '{{ select_crypto_backend }}' + register: privatekey_fmt_1_step_9_after + + when: 'select_crypto_backend == "cryptography"' + +- block: + - name: Generate privatekey_fmt_2 - PKCS8 format + openssl_privatekey: + path: '{{ output_dir }}/privatekey_fmt_2.pem' + type: X448 + format: pkcs8 + select_crypto_backend: '{{ select_crypto_backend }}' + register: privatekey_fmt_2_step_1 + + - name: Generate privatekey_fmt_2 - PKCS8 format (idempotent) + openssl_privatekey: + path: '{{ output_dir }}/privatekey_fmt_2.pem' + type: X448 + format: pkcs8 + select_crypto_backend: '{{ select_crypto_backend }}' + register: privatekey_fmt_2_step_2 + + - name: Generate privatekey_fmt_2 - raw format + openssl_privatekey: + path: '{{ output_dir }}/privatekey_fmt_2.pem' + type: X448 + format: raw + select_crypto_backend: '{{ select_crypto_backend }}' + register: privatekey_fmt_2_step_3 + + - name: Generate privatekey_fmt_2 - raw format (idempotent) + openssl_privatekey: + path: '{{ output_dir }}/privatekey_fmt_2.pem' + type: X448 + format: raw + select_crypto_backend: '{{ select_crypto_backend }}' + register: privatekey_fmt_2_step_4 + + - name: Generate privatekey_fmt_2 - auto format (ignore) + openssl_privatekey: + path: '{{ output_dir }}/privatekey_fmt_2.pem' + format: auto_ignore + select_crypto_backend: '{{ select_crypto_backend }}' + register: privatekey_fmt_1_step_5 + + - name: Generate privatekey_fmt_2 - auto format (no ignore) + openssl_privatekey: + path: '{{ output_dir }}/privatekey_fmt_2.pem' + format: auto + select_crypto_backend: '{{ select_crypto_backend }}' + register: privatekey_fmt_1_step_6 + + when: 'select_crypto_backend == "cryptography" and cryptography_version.stdout is version("2.6", ">=")' diff --git a/test/integration/targets/openssl_privatekey/tasks/main.yml b/test/integration/targets/openssl_privatekey/tasks/main.yml index a534f2b40c5..2126bddff68 100644 --- a/test/integration/targets/openssl_privatekey/tasks/main.yml +++ b/test/integration/targets/openssl_privatekey/tasks/main.yml @@ -37,6 +37,8 @@ select_crypto_backend: pyopenssl - import_tasks: ../tests/validate.yml + vars: + select_crypto_backend: pyopenssl # FIXME: minimal pyOpenSSL version?! when: pyopenssl_version.stdout is version('0.6', '>=') @@ -58,6 +60,8 @@ select_crypto_backend: cryptography - import_tasks: ../tests/validate.yml + vars: + select_crypto_backend: pyopenssl when: cryptography_version.stdout is version('0.5', '>=') @@ -85,6 +89,11 @@ - DSA register: fingerprint_cryptography + - name: Verify that keys were not regenerated + assert: + that: + - fingerprint_cryptography is not changed + - name: Verify that fingerprints match assert: that: item.0.fingerprint[item.2] == item.1.fingerprint[item.2] diff --git a/test/integration/targets/openssl_privatekey/tests/validate.yml b/test/integration/targets/openssl_privatekey/tests/validate.yml index c7762173282..0c5f29fb129 100644 --- a/test/integration/targets/openssl_privatekey/tests/validate.yml +++ b/test/integration/targets/openssl_privatekey/tests/validate.yml @@ -160,3 +160,29 @@ - privatekey_mode_3 is changed - privatekey_mode_3_stat.stat.mode == '0400' - privatekey_mode_1_stat.stat.mtime != privatekey_mode_3_stat.stat.mtime + +- name: Validate format 1 + assert: + that: + - privatekey_fmt_1_step_1 is changed + - privatekey_fmt_1_step_2 is not changed + - privatekey_fmt_1_step_3 is not changed + - privatekey_fmt_1_step_4 is changed + - privatekey_fmt_1_step_5 is not changed + - privatekey_fmt_1_step_6 is not changed + - privatekey_fmt_1_step_7 is changed + - privatekey_fmt_1_step_8 is failed + - privatekey_fmt_1_step_9 is changed + - privatekey_fmt_1_step_9_before.public_key == privatekey_fmt_1_step_9_after.public_key + when: 'select_crypto_backend == "cryptography"' + +- name: Validate format 2 + assert: + that: + - privatekey_fmt_2_step_1 is changed + - privatekey_fmt_2_step_2 is not changed + - privatekey_fmt_2_step_3 is changed + - privatekey_fmt_2_step_4 is not changed + - privatekey_fmt_2_step_5 is not changed + - privatekey_fmt_2_step_6 is changed + when: 'select_crypto_backend == "cryptography" and cryptography_version.stdout is version("2.6", ">=")'