diff --git a/README.md b/README.md index 72ca425..ad532d3 100644 --- a/README.md +++ b/README.md @@ -21,11 +21,11 @@ if you want to avoid use GitHub. - Verboses changes if requested - Can rewrite sources files if run as `root` - Does backup old sources files for easy restoring +- Supports mirror lists for allowing fallback to direct connection (see [`man apt-transport-mirror`](https://manpages.debian.org/buster/apt/apt-transport-mirror.1.en.html)) ### ToDo - Support https for approx -- Implement '--mirror' mode ## Usage @@ -51,7 +51,12 @@ If you want to approve these changes, run as `root`: ``` Now your system will use the approx cache. -The old entries are commented out. +The old entries can be found in the backup files. + +### Mirror Mode + +To use mirror mode, follow the same approach as above, +but append the flag `-m` or `--mirror-mode` to the arguments. ## Contribute diff --git a/redirect.py b/redirect.py index e509327..5a80e24 100755 --- a/redirect.py +++ b/redirect.py @@ -2,6 +2,8 @@ # Imports import argparse +from contextlib import contextmanager +import hashlib import os from pathlib import Path import re @@ -35,11 +37,12 @@ def verb(txt, add=0): if verbose_mode: print((" " * (verboseLevel * 4 + add)) + txt) -def verbLevel(num): +@contextmanager +def verbLevelContext(mul=1): global verboseLevel - verboseLevel += num - if verboseLevel < 0: - verboseLevel = 0 + verboseLevel += mul + yield None + verboseLevel -= mul def splitDomainPath(url): if "/" in url: @@ -73,12 +76,14 @@ def url_to_regex(url, multi_mapping=multi_mapping): url = removeProtocol(url) domain, path = splitDomainPath(url) # Check if domain is given in multi_mapping + is_multi_mapped = False for m in multi_mapping: if m.match(domain): # If given exchange with multi mapped regex url_pattern = m.sub(m.pattern, domain) + re.escape(path) + is_multi_mapped = True break - else: + if not is_multi_mapped: url_pattern = re.escape(url) # Prefix protocol again return r"https?://" + url_pattern @@ -91,66 +96,83 @@ def modifyMap(d): def isDebLine(line): return lineReplaceRegex.search(line) is not None -def checkFile(file, map): +def checkFile(source_file, mapping): + global mirror_mode + global mirror_path global write_mode - verb(("Run replacements on" if write_mode else "Check") + f" {file}:") - verbLevel(1) - if write_mode: - newFile = tempfile.NamedTemporaryFile(mode="w", delete=False) - changed = False - for line in open(file, "r"): - line = line[:-1] - if isDebLine(line): - for old_url, new_url in map.items(): - if old_url.search(line): - changed = True - new_line = old_url.sub(new_url, line) - verb(f"{line}") - verb(f"-> {new_line}", add=-3) - line = new_line - break - else: - verb(f"= {line}", add=-2) + verb(("Run replacements on" if write_mode else "Check") + f" {source_file}:") + with verbLevelContext(): + origFilePath = Path(source_file) if write_mode: - newFile.write(line + "\n") - if write_mode: - origFilePath = Path(file) - backFilePath = Path(file + ".save") - newFilePath = Path(newFile.name) - newFile.close() - newFilePath.chmod(0o644) - if changed: - origFilePath.replace(backFilePath) - newFilePath.rename(origFilePath) - verbLevel(-1) + newFile = tempfile.NamedTemporaryFile(mode="w", delete=False) + backFilePath = Path(source_file + ".save") + newFilePath = Path(newFile.name) + changed = False + with origFilePath.open() as f: + for line in f: + line = line[:-1] # Remove newline character + if isDebLine(line): + for old_pattern, new_url in mapping.items(): + old_match = old_pattern.search(line) + if old_match: + changed = True + old_url = old_match.group(0) + verb(f"{line}") + if mirror_mode: + mirror_file = mirror_path / (hashlib.sha256(old_pattern.pattern) + ".lst") + new_line = old_pattern.sub('mirror+file:' + str(mirror_file.resolve()), line) + if write_mode: + if not mirror_path.exists(): + mirror_path.mkdir() + if not mirror_file.exists(): + mirror_file.write_text(f"{new_url} priority:1") + mirror_file.write_text(f"{old_url} priority:9") + else: + new_line = old_pattern.sub(new_url, line) + verb(f"-> {new_line} # {new_url}", add=-3) + line = new_line + break + else: + verb(f"= {line}", add=-2) + if write_mode: + newFile.write(line + "\n") + if write_mode and changed: + newFile.close() + newFilePath.chmod(0o644) + if changed: + origFilePath.replace(backFilePath) + newFilePath.rename(origFilePath) + else: + newFile.unlink() def main(argv): global mirror_mode + global mirror_path global verbose_mode global write_mode # Parse arguments parser = argparse.ArgumentParser(description="Redirects apt sources to a given approx cache if cached by approx. Only files ending with .list in sources.list.d will be changed.") parser.add_argument('host', help="The URL of the approx cache, uses http:// if protocol is omitted") - parser.add_argument('-c', '--confirm', action='store_true', dest='confirm', help="Does rewrite the source files to redirect to approx, does require run as root") - parser.add_argument('-m', '--mirror', action='store_true', dest='use_mirror', help="Uses mirror lists to allow falling back to direct connection") - parser.add_argument('-p', '--path', dest='path', default='/etc/apt', type=Path, help="Configuration directory of apt containing sources.list files, defaults to /etc/apt") - parser.add_argument('-v', '--verbose', action='store_true', dest='verbose', help="Displays debug information") + parser.add_argument('-c', '--confirm', action='store_true', help="Does rewrite the source files to redirect to approx, may does require run as root") + parser.add_argument('-m', '--mirror', action='store_true', help="Uses mirror lists to allow falling back to direct connection, mirror lists will be stored to {path}/mirror-lists") + parser.add_argument('-p', '--path', default='/etc/apt', type=Path, help="Configuration directory of apt containing sources.list files, defaults to /etc/apt") + parser.add_argument('-v', '--verbose', action='store_true', help="Displays debug information") args = parser.parse_args(argv) # Check server host if not allProtoRegex.match(args.host): args.host = "http://" + args.host # Store configuration in global vars - mirror_mode = args.use_mirror + mirror_mode = args.mirror + mirror_path = Path(args.path) / 'mirror-lists' verbose_mode = args.verbose write_mode = args.confirm # Retrieve repository map verb(f"Connect to {args.host} to retrieve repository list") repo_map = modifyMap(discoverMap(args.host)) verb("Found following repositories:") - verbLevel(1) - for k, v in repo_map.items(): - verb(k.pattern) - verbLevel(-1) + with verbLevelContext(): + for k, v in repo_map.items(): + verb(k.pattern) # Check for sources files file_list = [args.path / 'sources.list'] + [file for file in (args.path / 'sources.list.d').rglob('*.list') if file.is_file()] for file in file_list: