You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

330 lines
8.9 KiB
Python

#!/usr/bin/env python3
import argparse
import functools
import json
import os
from pathlib import Path
import subprocess
import sys
# constants

# Label prepended to a domain name to build its challenge record name.
ACME_DOMAIN_PREFIX = "_acme-challenge"

# Keyword arguments the subprocess helpers pass to every call they make.
SUBPROCESS_DEFAULT_KWARGS = dict(shell=False)

# argv prefix used by safe_call()/safe_popen(): commands are started via env.
SUBPROCESS_ENV = ["/usr/bin/env"]
# subprocess helpers
def safe_run(args, **kwargs):
    """Run a command with the module-wide subprocess defaults.

    ``check=True`` is applied by default, so a non-zero exit status raises
    ``subprocess.CalledProcessError``.

    Args:
        args: Command and arguments, handed to ``subprocess.run`` unchanged.
        **kwargs: Additional ``subprocess.run`` options. They are merged on
            top of the defaults, so passing ``check`` or ``shell`` explicitly
            overrides the default instead of failing with a ``TypeError``
            for a duplicated keyword argument.

    Returns:
        The ``subprocess.CompletedProcess`` produced by ``subprocess.run``.
    """
    options = {**SUBPROCESS_DEFAULT_KWARGS, "check": True, **kwargs}
    return subprocess.run(args, **options)
def safe_call(args, **kwargs):
    """Run *args* via ``safe_run`` with ``SUBPROCESS_ENV`` put in front.

    *args* must be a list, because it is concatenated to the prefix list.
    Keyword arguments are forwarded to ``safe_run`` untouched, and its return
    value is returned as-is.
    """
    command = SUBPROCESS_ENV + args
    return safe_run(command, **kwargs)
def safe_popen(args, subprocess_input, **kwargs):
    """Run a command, feed it *subprocess_input* on stdin and wait for it.

    Args:
        args: Command and arguments as a list; ``SUBPROCESS_ENV`` is put in
            front of it.
        subprocess_input: Data written to the child's stdin. A ``str`` is
            encoded as UTF-8 unless the caller requested a text-mode pipe
            (``text``, ``universal_newlines``, ``encoding`` or ``errors`` in
            *kwargs*); the pipe is binary by default and would otherwise
            reject ``str`` with a ``TypeError``.
        **kwargs: Additional ``subprocess.Popen`` options.

    Returns:
        A tuple ``(proc, stdout_data, stderr_data)``; the two data items are
        whatever ``Popen.communicate`` returned.

    Raises:
        subprocess.CalledProcessError: The command exited with a non-zero
            status. Captured output, if any, is attached to the exception.
    """
    text_mode = any(
        kwargs.get(option)
        for option in ("text", "universal_newlines", "encoding", "errors")
    )
    if isinstance(subprocess_input, str) and not text_mode:
        subprocess_input = subprocess_input.encode("utf-8")

    proc = subprocess.Popen(
        SUBPROCESS_ENV + args,
        **SUBPROCESS_DEFAULT_KWARGS,
        stdin=subprocess.PIPE,
        **kwargs,
    )
    stdout_data, stderr_data = proc.communicate(input=subprocess_input)
    if proc.returncode != 0:
        raise subprocess.CalledProcessError(
            returncode=proc.returncode,
            cmd=proc.args,
            output=stdout_data,
            stderr=stderr_data,
        )
    return (proc, stdout_data, stderr_data)
# shell replacements
def chmod(mode, *paths):
    """Invoke the external ``chmod`` command with *mode* on all *paths*.

    Every path is converted with ``str()`` before being placed on the
    command line.
    """
    targets = list(map(str, paths))
    safe_call(["chmod", mode, *targets])
def concat(out_path, *in_paths, mode=0o333):
    """Write the contents of *in_paths*, in order, into *out_path*.

    Args:
        out_path: Destination file. It is opened with ``"wb"``, i.e. created
            if missing and truncated otherwise.
        *in_paths: Source files, given as ``pathlib.Path``, ``str`` or any
            other path-like object.
        mode: Permission bits handed to ``os.open`` for the destination file.
    """
    import shutil

    # NOTE(review): the default 0o333 grants write/execute but no read
    # permission, which looks unintended - confirm the desired default.
    def opener(path, flags):
        return os.open(path, flags, mode=mode)

    with open(out_path, mode="wb", opener=opener) as out_file:
        for in_path in in_paths:
            with open(in_path, mode="rb") as in_file:
                # Stream in chunks instead of loading each file completely.
                shutil.copyfileobj(in_file, out_file)
def iterdir_recursive(dir_path, suffix=""):
    """Collect all non-directory entries below *dir_path*, recursively.

    Args:
        dir_path: ``pathlib.Path`` of the directory to walk.
        suffix: Only entries whose name ends with this string are returned;
            the default empty string matches every name.

    Returns:
        A list of ``pathlib.Path`` objects in a reproducible order: the
        entries of each directory are visited sorted, and a subdirectory is
        expanded at the position where it sorts.
    """
    found = []
    # Path.iterdir() yields entries in arbitrary order; sorting keeps the
    # result (and everything derived from it) stable between runs.
    for entry in sorted(dir_path.iterdir()):
        if entry.is_dir():
            found.extend(iterdir_recursive(entry, suffix=suffix))
        elif entry.name.endswith(suffix):
            found.append(entry)
    return found
def nsupdate(key_path, record_name, record_ttl=60, record_type=None, record_data=None, delete_record=False):
    """Send a single ``update add`` / ``update delete`` to the ``nsupdate`` tool.

    Args:
        key_path: Key file passed to ``nsupdate -k``. When ``None`` the ``-k``
            option is left out instead of passing the literal string "None".
        record_name: Name of the record to add or delete.
        record_ttl: TTL placed in the update command.
        record_type: Record type; required for additions, optional for
            deletions.
        record_data: Record data; required for additions, optional for
            deletions.
        delete_record: Issue ``update delete`` instead of ``update add``.

    Raises:
        ValueError: An addition was requested but one of the name, TTL, type
            or data arguments is missing or empty.
        subprocess.CalledProcessError: ``nsupdate`` exited with a non-zero
            status (raised by ``safe_popen``).
    """
    if delete_record:
        record_type = record_type or ""
        record_data = record_data or ""
        # Strip the blanks left behind when type and/or data are omitted.
        action = f"update delete {record_name} {record_ttl} IN {record_type} {record_data}".rstrip()
    else:
        if not (record_name and record_ttl and record_type and record_data):
            raise ValueError("args missing")
        action = f"update add {record_name} {record_ttl} IN {record_type} {record_data}"

    command = ["nsupdate"]
    if key_path is not None:
        command += ["-k", str(key_path)]
    # The stdin pipe is opened in binary mode, so the script is sent as bytes.
    safe_popen(command, f"{action}\nsend\n".encode("utf-8"))
# hooks code
class DomainData:
    """Per-domain view of the directory layout below ``config.domains_directory``.

    All paths are derived from the module-level ``config`` object at the
    moment a property is read, so instances can be created before the
    configuration is available.
    """

    def __init__(self, domain):
        self.__domain = domain

    @property
    def domain(self):
        """The domain name this instance was created with."""
        return self.__domain

    @property
    def domain_dir(self):
        """Directory of this domain: ``config.domains_directory / domain``."""
        return config.domains_directory / self.__domain

    @property
    def domain_data_file(self):
        """Path ``domain_dir / config.domain_data_name``."""
        return self.domain_dir / config.domain_data_name

    def run_hooks(self, hook_name, **kwargs):
        """Run every executable file found below ``domain_dir / hook_name``.

        Does nothing when that directory is missing. ``kwargs`` are accepted
        but not yet forwarded to the hook programs.

        Raises:
            subprocess.CalledProcessError: A hook exited with a non-zero
                status (raised by ``safe_run``).
        """
        hook_dir = self.domain_dir / hook_name
        # is_dir() rather than exists(): a regular file carrying the hook's
        # name cannot be iterated and would raise NotADirectoryError.
        if not hook_dir.is_dir():
            return
        for hook in iterdir_recursive(hook_dir):
            if os.access(hook, os.X_OK):
                safe_run([str(hook)])  # TODO args
def deploy_challenge(domain, token_filename, token_value):
    """Add a TXT record holding *token_value* at the challenge name of *domain*.

    The record name is ``<ACME_DOMAIN_PREFIX>.<domain>.`` and the TTL comes
    from ``config.challenge_record_ttl``. *token_filename* is unused here
    (it only matters for http-01 challenges).
    """
    # NOTE(review): no key file is supplied (key_path=None) - confirm where
    # the nsupdate key is supposed to come from.
    nsupdate(
        key_path=None,
        record_name="{}.{}.".format(ACME_DOMAIN_PREFIX, domain),
        record_ttl=config.challenge_record_ttl,
        record_type="TXT",
        record_data=token_value,
    )
def clean_challenge(domain, token_filename, token_value):
    """Remove the TXT record holding *token_value* from the challenge name.

    Only the record matching this token is deleted. Deleting everything at
    ``<ACME_DOMAIN_PREFIX>.<domain>.`` would also remove tokens of other
    challenges that share the same name (e.g. a wildcard and its base
    domain) and may still be awaiting validation.

    *token_filename* is unused here (it only matters for http-01 challenges).
    """
    # NOTE(review): no key file is supplied (key_path=None) - confirm where
    # the nsupdate key is supposed to come from.
    challenge_name = f"{ACME_DOMAIN_PREFIX}.{domain}."
    nsupdate(
        key_path=None,
        record_name=challenge_name,
        record_type="TXT",
        record_data=token_value,
        delete_record=True,
    )
def sync_cert(domain, key_path, cert_path, fullchain_path, chain_path, request_path):
    """Invoke the external ``sync`` command with the five given file paths.

    *domain* is accepted for the hook signature but not used.
    """
    files = [key_path, cert_path, fullchain_path, chain_path, request_path]
    safe_call(["sync", *files])
def deploy_cert(domain, key_path, cert_path, fullchain_path, chain_path, timestamp: int):
    """Post-process a freshly issued certificate.

    Makes the certificate, full chain and chain files readable and writes a
    combined key + full chain file named ``config.keyfullchain_name`` next to
    the private key, created with ``config.key_file_mode``.

    *domain* and *timestamp* are accepted for the hook signature but not used.
    """
    # make public key files readable for all
    chmod("+r", cert_path, fullchain_path, chain_path)

    # create legacy key+cert+chain file
    # Use .parent: "<key file>/.." traverses through a regular file, which the
    # operating system rejects (ENOTDIR) when the path is opened.
    keyfullchain_path = key_path.parent / config.keyfullchain_name
    concat(keyfullchain_path, key_path, fullchain_path, mode=config.key_file_mode)
def deploy_ocsp(domain, ocsp_path, timestamp):
    """Make the OCSP file at *ocsp_path* readable for everyone.

    *domain* and *timestamp* are accepted for the hook signature but not used.
    """
    chmod("+r", ocsp_path)
def unchanged_cert(domain, key_path, cert_path, fullchain_path, chain_path):
    """Hook stub: currently performs no action and returns ``None``."""
    return None
def invalid_challenge(domain, response):
    """Hook stub: currently performs no action and returns ``None``."""
    return None
def request_failure(status_code, reason, req_type, headers):
    """Hook stub: currently performs no action and returns ``None``."""
    return None
def generate_csr(domain, cert_dir, alt_names):
    """Hook stub: currently performs no action and returns ``None``."""
    return None
def startup_hook():
    """Build ``config.domains_main_file`` from the files below ``config.domains_directory``.

    Every file found recursively in the domains directory is concatenated
    into the main domains file.
    """
    # prepare domains list
    # An explicit mode is passed because concat()'s default (0o333) would
    # create the list without read permission.
    concat(
        config.domains_main_file,
        *iterdir_recursive(config.domains_directory),
        mode=0o644,
    )
def exit_hook(error):
    """Hook stub for program exit; *error* is ``None`` on success.

    Neither the success nor the failure case performs any action yet.
    """
    if error is not None:
        # failure: nothing to do yet
        return None
    # success: nothing to do yet
    return None
# hooks metadata
# Keyword arguments for ``ArgumentParser.add_argument``, keyed by the name of
# the positional hook argument they describe.
arg_infos = {
    "alt_names": {
        "help": "All domain names for the current certificate as specified in domains.txt.",
    },
    "cert_path": {
        "help": "The path of the file containing the signed certificate.",
        "type": Path,
    },
    "chain_path": {
        "help": "The path of the file containing the intermediate certificate(s).",
        "type": Path,
    },
    "cert_dir": {
        "help": "Certificate output directory for this particular certificate.",
        "type": Path,
    },
    "domain": {
        "help": "The domain name (CN or subject alternative name) being validated.",
    },
    "error": {
        "help": "Contains error message if dehydrated exits with error.",
        # optional: the argument may be left out entirely
        "nargs": "?",
    },
    "fullchain_path": {
        "help": "The path of the file containing the signed full certificate chain.",
        "type": Path,
    },
    "headers": {
        "help": "HTTP headers returned by the CA",
    },
    "key_path": {
        "help": "The path of the file containing the private key.",
        "type": Path,
    },
    "ocsp_path": {
        "help": "The path of the ocsp stapling file.",
        "type": Path,
    },
    "req_type": {
        "help": "The kind of request that was made (GET, POST, …).",
    },
    "reason": {
        "help": "The specific reason for the error.",
    },
    "request_path": {
        "help": "The path of the file containing the certificate signing request.",
        "type": Path,
    },
    "response": {
        "help": "The response that the verification server returned.",
    },
    "status_code": {
        "help": "The HTTP status code that originated the error.",
        "type": int,
    },
    "timestamp": {
        "help": "Timestamp when the specified certificate was created.",
        "type": int,  # TODO to date
    },
    "token_filename": {
        "help": "The name of the file containing the token to be served for HTTP validation.",
    },
    "token_value": {
        "help": (
            "The token value that needs to be served for validation. "
            "For DNS validation, this is the content of the _acme-challenge TXT record."
        ),
    },
}
# Maps each hook name accepted on the command line to a pair of
# (function implementing the hook, names of its positional arguments in
# order). The argument names are keys of ``arg_infos``.
hooks = {
    "deploy_challenge": (deploy_challenge, ["domain", "token_filename", "token_value"]),
    "clean_challenge": (clean_challenge, ["domain", "token_filename", "token_value"]),
    "sync_cert": (
        sync_cert,
        ["domain", "key_path", "cert_path", "fullchain_path", "chain_path", "request_path"],
    ),
    "deploy_cert": (
        deploy_cert,
        ["domain", "key_path", "cert_path", "fullchain_path", "chain_path", "timestamp"],
    ),
    "deploy_ocsp": (deploy_ocsp, ["domain", "ocsp_path", "timestamp"]),
    "unchanged_cert": (
        unchanged_cert,
        ["domain", "key_path", "cert_path", "fullchain_path", "chain_path"],
    ),
    "invalid_challenge": (invalid_challenge, ["domain", "response"]),
    "request_failure": (request_failure, ["status_code", "reason", "req_type", "headers"]),
    "generate_csr": (generate_csr, ["domain", "cert_dir", "alt_names"]),
    "startup_hook": (startup_hook, []),
    "exit_hook": (exit_hook, ["error"]),
}
# general
def read_config(config_path):
    """Load the JSON document stored at *config_path*.

    Args:
        config_path: ``pathlib.Path`` of the configuration file.

    Returns:
        The decoded JSON value (whatever ``json.loads`` produces).

    Raises:
        OSError: The file cannot be read.
        json.JSONDecodeError: The file does not contain valid JSON.
    """
    # Decode as UTF-8 explicitly rather than with the locale's encoding, so
    # non-ASCII content is read the same way on every system.
    return json.loads(config_path.read_text(encoding="utf-8"))
def parse_args(argv=None):
    """Parse the command line into a namespace and the hook's keyword arguments.

    One sub-command is registered per entry in ``hooks``; its positional
    arguments are declared from ``arg_infos``.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is the previous behaviour.

    Returns:
        A tuple ``(args, fun_args)``: the full ``argparse.Namespace`` and a
        dict holding only the attributes whose name does not start with an
        underscore, i.e. the arguments declared for the selected hook.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-c", "--config", "--config-file",
        dest="_config",
        type=Path,
        help="Path to config file",
    )
    subparsers = parser.add_subparsers(dest="_hook", required=True)

    for hook_name, (hook_fun, hook_args) in hooks.items():
        # With '+' as the only prefix character, the hook parser treats
        # values that begin with '-' as positional arguments.
        hook_parser = subparsers.add_parser(hook_name, prefix_chars="+")
        hook_parser.set_defaults(_func=hook_fun, _parser=hook_parser)
        for arg_name in hook_args:
            hook_parser.add_argument(arg_name, **arg_infos[arg_name])

    args = parser.parse_args(argv)
    fun_args = {
        key: val
        for key, val in vars(args).items()
        if not key.startswith("_")
    }
    return (args, fun_args)
def external_hook(hook_name, domain=None, **kwargs):
    """Placeholder for running external hooks; currently only prints *domain*.

    Args:
        hook_name: Name of the hook being processed (not used yet).
        domain: Domain the hook was invoked for. Optional, because some hooks
            are called without a domain argument; nothing is printed then.
        **kwargs: Remaining hook arguments (not used yet).
    """
    if domain is None:
        return
    print(domain)
def main(argv):
    """Parse the command line and pass the selected hook to ``external_hook``.

    *argv* is currently not used: ``parse_args()`` is called without
    arguments.
    """
    global config
    parsed, hook_kwargs = parse_args()
    # TODO: still disabled - load the configuration with
    # read_config(parsed._config) and call the selected hook function
    # (parse_args() stores it as `_func`, not `_fun`).
    external_hook(parsed._hook, **hook_kwargs)
if __name__ == "__main__":
    # Script entry point: hand everything after the program name to main().
    cli_arguments = sys.argv[1:]
    main(cli_arguments)