You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ansible/lib/ansible/galaxy/collection/gpg.py

285 lines
7.2 KiB
Python

# -*- coding: utf-8 -*-
# Copyright: (c) 2022, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
"""Signature verification helpers."""
from __future__ import annotations
from ansible.errors import AnsibleError
from ansible.galaxy.user_agent import user_agent
from ansible.module_utils.urls import open_url
import contextlib
import inspect
import os
import subprocess
import sys
import typing as t
from dataclasses import dataclass, fields as dc_fields
from functools import partial
from urllib.error import HTTPError, URLError
if t.TYPE_CHECKING:
from ansible.utils.display import Display
IS_PY310_PLUS = sys.version_info[:2] >= (3, 10)
frozen_dataclass = partial(dataclass, frozen=True, **({'slots': True} if IS_PY310_PLUS else {}))
def get_signature_from_source(source, display=None): # type: (str, t.Optional[Display]) -> str
if display is not None:
display.vvvv(f"Using signature at {source}")
try:
with open_url(
source,
http_agent=user_agent(),
validate_certs=True,
follow_redirects='safe'
) as resp:
signature = resp.read()
except (HTTPError, URLError) as e:
raise AnsibleError(
f"Failed to get signature for collection verification from '{source}': {e}"
) from e
return signature
def run_gpg_verify(
manifest_file, # type: str
signature, # type: str
keyring, # type: str
display, # type: Display
): # type: (...) -> tuple[str, int]
status_fd_read, status_fd_write = os.pipe()
# running the gpg command will create the keyring if it does not exist
remove_keybox = not os.path.exists(keyring)
cmd = [
'gpg',
f'--status-fd={status_fd_write}',
'--verify',
'--batch',
'--no-tty',
'--no-default-keyring',
f'--keyring={keyring}',
'-',
manifest_file,
]
cmd_str = ' '.join(cmd)
display.vvvv(f"Running command '{cmd}'")
try:
p = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
pass_fds=(status_fd_write,),
encoding='utf8',
)
except (FileNotFoundError, subprocess.SubprocessError) as err:
raise AnsibleError(
f"Failed during GnuPG verification with command '{cmd_str}': {err}"
) from err
else:
stdout, stderr = p.communicate(input=signature)
finally:
os.close(status_fd_write)
if remove_keybox:
with contextlib.suppress(OSError):
os.remove(keyring)
with os.fdopen(status_fd_read) as f:
stdout = f.read()
display.vvvv(
f"stdout: \n{stdout}\nstderr: \n{stderr}\n(exit code {p.returncode})"
)
return stdout, p.returncode
def parse_gpg_errors(status_out): # type: (str) -> t.Iterator[GpgBaseError]
for line in status_out.splitlines():
if not line:
continue
try:
_dummy, status, remainder = line.split(maxsplit=2)
except ValueError:
_dummy, status = line.split(maxsplit=1)
remainder = None
try:
cls = GPG_ERROR_MAP[status]
except KeyError:
continue
fields = [status]
if remainder:
fields.extend(
remainder.split(
None,
len(dc_fields(cls)) - 2
)
)
yield cls(*fields)
@frozen_dataclass
class GpgBaseError(Exception):
status: str
@classmethod
def get_gpg_error_description(cls) -> str:
"""Return the current class description."""
return ' '.join(cls.__doc__.split())
def __post_init__(self):
for field_name, field_type in inspect.get_annotations(type(self), eval_str=True).items():
super(GpgBaseError, self).__setattr__(field_name, field_type(getattr(self, field_name)))
@frozen_dataclass
class GpgExpSig(GpgBaseError):
"""The signature with the keyid is good, but the signature is expired."""
keyid: str
username: str
@frozen_dataclass
class GpgExpKeySig(GpgBaseError):
"""The signature with the keyid is good, but the signature was made by an expired key."""
keyid: str
username: str
@frozen_dataclass
class GpgRevKeySig(GpgBaseError):
"""The signature with the keyid is good, but the signature was made by a revoked key."""
keyid: str
username: str
@frozen_dataclass
class GpgBadSig(GpgBaseError):
"""The signature with the keyid has not been verified okay."""
keyid: str
username: str
@frozen_dataclass
class GpgErrSig(GpgBaseError):
""""It was not possible to check the signature. This may be caused by
a missing public key or an unsupported algorithm. A RC of 4
indicates unknown algorithm, a 9 indicates a missing public
key.
"""
keyid: str
pkalgo: int
hashalgo: int
sig_class: str
time: int
rc: int
fpr: str
@frozen_dataclass
class GpgNoPubkey(GpgBaseError):
"""The public key is not available."""
keyid: str
@frozen_dataclass
class GpgMissingPassPhrase(GpgBaseError):
"""No passphrase was supplied."""
@frozen_dataclass
class GpgBadPassphrase(GpgBaseError):
"""The supplied passphrase was wrong or not given."""
keyid: str
@frozen_dataclass
class GpgNoData(GpgBaseError):
"""No data has been found. Codes for WHAT are:
- 1 :: No armored data.
- 2 :: Expected a packet but did not find one.
- 3 :: Invalid packet found, this may indicate a non OpenPGP
message.
- 4 :: Signature expected but not found.
"""
what: str
@frozen_dataclass
class GpgUnexpected(GpgBaseError):
"""No data has been found. Codes for WHAT are:
- 1 :: No armored data.
- 2 :: Expected a packet but did not find one.
- 3 :: Invalid packet found, this may indicate a non OpenPGP
message.
- 4 :: Signature expected but not found.
"""
what: str
@frozen_dataclass
class GpgError(GpgBaseError):
"""This is a generic error status message, it might be followed by error location specific data."""
location: str
code: int
more: str = ""
@frozen_dataclass
class GpgFailure(GpgBaseError):
"""This is the counterpart to SUCCESS and used to indicate a program failure."""
location: str
code: int
@frozen_dataclass
class GpgBadArmor(GpgBaseError):
"""The ASCII armor is corrupted."""
@frozen_dataclass
class GpgKeyExpired(GpgBaseError):
"""The key has expired."""
timestamp: int
@frozen_dataclass
class GpgKeyRevoked(GpgBaseError):
"""The used key has been revoked by its owner."""
@frozen_dataclass
class GpgNoSecKey(GpgBaseError):
"""The secret key is not available."""
keyid: str
GPG_ERROR_MAP = {
'EXPSIG': GpgExpSig,
'EXPKEYSIG': GpgExpKeySig,
'REVKEYSIG': GpgRevKeySig,
'BADSIG': GpgBadSig,
'ERRSIG': GpgErrSig,
'NO_PUBKEY': GpgNoPubkey,
'MISSING_PASSPHRASE': GpgMissingPassPhrase,
'BAD_PASSPHRASE': GpgBadPassphrase,
'NODATA': GpgNoData,
'UNEXPECTED': GpgUnexpected,
'ERROR': GpgError,
'FAILURE': GpgFailure,
'BADARMOR': GpgBadArmor,
'KEYEXPIRED': GpgKeyExpired,
'KEYREVOKED': GpgKeyRevoked,
'NO_SECKEY': GpgNoSecKey,
}