You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

197 lines
7.2 KiB
Python

#!/usr/bin/env python3
# Imports
import argparse
from contextlib import contextmanager
import hashlib
import os
from pathlib import Path
import re
import requests
import sys
import tempfile
## Variables
# Mirrors with multiple addresses (like per country)
multi_mapping = [
    r"(ftp[0-9]*\.[a-z]+|deb)\.debian\.org",
    r"[a-z]+\.(archive|releases)\.ubuntu\.com",
]
# Regexes used
# One table row of the server's index page: the first link is captured as
# "new", the second link (whose text starts with "http") as "old"
mapRegex = re.compile(r"^<tr><td><a href=\"(?P<new>[^\"]+)\">[^<>]+</a></td><td><a href=\"(?P<old>.*)\">http[^<>]*</a></td></tr>$")
# Any leading "scheme:" including the slashes that follow it
allProtoRegex = re.compile(r"^[a-z+]+:/*(?!/)")
# Leading http:// or https:// only
protoReplaceRegex = re.compile(r"^https?://")
# Lines starting (after optional whitespace) with "deb " or "deb-src "
lineReplaceRegex = re.compile(r"^\s*deb(-src)? ")
# A whitespace-delimited URL inside a line
lineUrlRegex = re.compile(r"(?<=\s)[a-z+]+:/*(?!/)\S+(?!\S)")
## Code
multi_mapping = [re.compile(x) for x in multi_mapping]
# Current indentation depth of verbose output, see verb()
verboseLevel = 0
# Runtime configuration; main() overwrites these from the command line.
# Defined here as well so that the helper functions do not fail with a
# NameError when the module is imported and used without calling main().
mirror_mode = False
mirror_path = Path('/etc/apt') / 'mirror-lists'
verbose_mode = False
write_mode = False
def verb(txt, add=0):
    """Print *txt* when verbose output is enabled, indented by the current level.

    The indentation is ``verboseLevel * 4 + add`` spaces, so a negative *add*
    shifts the text to the left (a non-positive total gives no indentation).
    Nothing is printed while ``verbose_mode`` is false.
    """
    if not verbose_mode:
        return
    indent = " " * (verboseLevel * 4 + add)
    print(indent + txt)
@contextmanager
def verbLevelContext(mul=1):
    """Raise the verbose indentation level by *mul* for the duration of a ``with`` block.

    The previous level is restored when the block is left, also when the
    block is left through an exception.
    """
    global verboseLevel
    verboseLevel += mul
    try:
        yield None
    finally:
        # Without the finally an exception in the body would leave all later
        # verbose output permanently indented
        verboseLevel -= mul
def splitDomainPath(url):
    """Split *url* at its first slash into a ``(domain, path)`` tuple.

    The path keeps its leading ``/``. If *url* contains no slash the whole
    string is returned as domain together with an empty path.
    """
    domain, slash, remainder = url.partition('/')
    return domain, slash + remainder
def removeProtocol(url):
    """Return *url* without the leading protocol matched by ``allProtoRegex``.

    URLs that do not start with a protocol are returned unchanged.
    """
    proto = allProtoRegex.match(url)
    if proto is None:
        return url
    return url[proto.end():]
def remove_slash_suffix(url):
    """Return *url* without one trailing slash, if it has one.

    Normalising this way lets URLs written without the final slash in a
    sources file match as well.
    """
    if not url.endswith('/'):
        return url
    return url[:-1]
def discoverMap(server, timeout=30):
    """Download the index page of the approx server and extract its repository map.

    Args:
        server: URL of the approx server, including the protocol.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        A dict with one entry per index row matched by ``mapRegex``: the key
        is the row's ``old`` URL, the value is ``server + '/' +`` the row's
        ``new`` link, both without trailing slash.

    Raises:
        requests.RequestException: If the server cannot be reached, does not
            answer in time, or answers with an HTTP error status.
    """
    # Request approx server; without a timeout an unresponsive host would
    # block the script forever
    res = requests.get(server, timeout=timeout)
    # An error page contains no repository rows, fail loudly instead of
    # silently returning an empty map
    res.raise_for_status()
    # Avoid a double slash in the new URLs if the server was given as ".../"
    base = server.rstrip('/')
    # Extract url_map
    url_map = {}
    # splitlines() also copes with CRLF line ends, which would otherwise keep
    # the '$' anchored mapRegex from matching
    for line in res.text.splitlines():
        match = mapRegex.match(line)
        if match:
            old_url = remove_slash_suffix(match.group('old'))
            url_map[old_url] = base + '/' + remove_slash_suffix(match.group('new'))
    return url_map
def url_to_regex(url, multi_mapping=multi_mapping):
    """Build the source of a regular expression that matches *url*.

    For http(s) URLs the result accepts both ``http://`` and ``https://``.
    If the domain matches one of the *multi_mapping* patterns, that pattern
    takes the place of the domain, so every mirror covered by the pattern is
    matched as well. All other parts of the URL are matched literally.

    Args:
        url: The URL to convert.
        multi_mapping: Compiled patterns of domains that stand for a whole
            family of mirrors.

    Returns:
        The regular expression as a string (not compiled).
    """
    # Only http(s) URLs get the protocol/mirror treatment
    if not protoReplaceRegex.match(url):
        return re.escape(url)
    bare_url = removeProtocol(url)
    domain, path = splitDomainPath(bare_url)
    # Check if domain is given in multi_mapping
    for m in multi_mapping:
        domain_match = m.match(domain)
        if domain_match:
            # Put the pattern itself in place of the matched part. It is
            # concatenated rather than passed through m.sub(), which would
            # treat it as a replacement template and reject escapes such as
            # \d. Whatever follows the match stays literal.
            remainder = domain[domain_match.end():]
            url_pattern = m.pattern + re.escape(remainder) + re.escape(path)
            break
    else:
        url_pattern = re.escape(bare_url)
    # Prefix protocol again
    return r"https?://" + url_pattern
def modifyMap(d):
    """Return a copy of *d* whose keys are compiled regular expressions.

    Each key is converted with ``url_to_regex`` and compiled; the values are
    taken over unchanged.
    """
    compiled_map = {}
    for old, new in d.items():
        pattern = re.compile(url_to_regex(old))
        compiled_map[pattern] = new
    return compiled_map
def isDebLine(line):
    """Tell whether *line* is matched by ``lineReplaceRegex`` (a deb/deb-src entry)."""
    return bool(lineReplaceRegex.search(line))
def check_line(line, mapping):
    """Redirect the URL of one sources line to the approx server.

    The first pattern of *mapping* found in *line* decides the replacement.
    In direct mode the matched URL prefix is replaced by the approx URL. In
    mirror mode the whole URL is replaced by a ``mirror+file:`` reference to
    a list file in ``mirror_path`` that names the approx URL (priority 1) and
    the original URL (priority 9); the list file is only created in write
    mode.

    Args:
        line: The sources line without line terminator.
        mapping: Dict of compiled URL pattern -> URL on the approx server.

    Returns:
        The rewritten line, or ``None`` if no pattern matches.

    Raises:
        RuntimeError: In mirror mode, if the matched prefix is not the start
            of the URL found in the line.
    """
    for old_pattern, new_url in mapping.items():
        old_match = old_pattern.search(line)
        if not old_match:
            continue
        verb(f"{line}")
        if mirror_mode:
            old_prefix = old_match.group(0)
            old_url_match = lineUrlRegex.search(line)
            if not old_url_match:
                raise RuntimeError(f"URL '{old_prefix}' not found in line '{line}', report this to the developer")
            old_url = old_url_match.group(0)
            if not old_url.startswith(old_prefix):
                raise RuntimeError(f"URL '{old_prefix}' not matching in line '{line}', report this to the developer")
            old_suffix = old_url[len(old_prefix):]
            mirror_file = mirror_path / (hashlib.sha256(old_url.encode('utf-8')).hexdigest() + ".lst")
            if write_mode:
                # Does not fail if the directory appears between check and creation
                mirror_path.mkdir(parents=True, exist_ok=True)
                if not mirror_file.exists():
                    mirror_file.write_text(f"{new_url}{old_suffix}\tpriority:1\n{old_url}\tpriority:9\n")
            mirror_target = 'mirror+file:' + str(mirror_file.resolve())
            # A function replacement inserts the text literally; a plain
            # string would be parsed as a template (backslashes, \1, ...)
            new_line = lineUrlRegex.sub(lambda _m: mirror_target, line)
            verb(f"-> {new_line} # {new_url}{old_suffix}", add=-3)
        else:
            new_line = old_pattern.sub(lambda _m: new_url, line)
            # The URL suffix is only determined in mirror mode; referencing
            # it here raised UnboundLocalError on every replaced line
            verb(f"-> {new_line}", add=-3)
        return new_line
    verb(f"= {line}", add=-2)
    return None
def _replace_with_backup(target, backup, lines):
    """Write *lines* to *target* and keep its previous content as *backup*."""
    # Created next to the target so that the final rename never has to cross
    # a filesystem boundary (which Path.rename cannot do)
    new_file = tempfile.NamedTemporaryFile(mode="w", delete=False, dir=str(target.parent))
    new_path = Path(new_file.name)
    installed = False
    try:
        with new_file:
            new_file.writelines(lines)
        new_path.chmod(0o644)
        target.replace(backup)
        new_path.rename(target)
        installed = True
    finally:
        # Do not leave the temporary file behind if anything went wrong
        if not installed and new_path.exists():
            new_path.unlink()
def checkFile(source_file, mapping):
    """Check one apt sources file and, in write mode, redirect its entries.

    Every deb/deb-src line is passed to ``check_line``. If write mode is
    active and at least one line was changed, the file is replaced by the
    rewritten version and the previous version is kept as
    ``<source_file>.save``. In all other cases the file is left untouched.

    Args:
        source_file: Path of the sources file as a string.
        mapping: Dict of compiled URL pattern -> URL on the approx server.
    """
    verb(("Run replacements on" if write_mode else "Check") + f" {source_file}:")
    with verbLevelContext():
        origFilePath = Path(source_file)
        changed = False
        new_lines = []
        with origFilePath.open() as f:
            for raw_line in f:
                # Strip the line terminator only; the last line of a file may
                # come without one and must not lose its final character
                line = raw_line.rstrip("\n")
                if isDebLine(line):
                    new_line = check_line(line, mapping)
                    if new_line:
                        changed = True
                        line = new_line
                new_lines.append(line + "\n")
        if write_mode and changed:
            _replace_with_backup(origFilePath, Path(source_file + ".save"), new_lines)
def main(argv):
    """Parse the command line and redirect the apt sources to the approx cache.

    Stores the chosen options in the module-level configuration variables,
    retrieves the repository map from the approx server and processes
    ``sources.list`` as well as every ``*.list`` file below
    ``sources.list.d`` in the configured apt directory.

    Args:
        argv: Command line arguments without the program name.
    """
    global mirror_mode
    global mirror_path
    global verbose_mode
    global write_mode
    # Parse arguments
    parser = argparse.ArgumentParser(description="Redirects apt sources to a given approx cache if cached by approx. Only files ending with .list in sources.list.d will be changed.")
    parser.add_argument('host', help="The URL of the approx cache, uses http:// if protocol is omitted")
    parser.add_argument('-c', '--confirm', action='store_true', help="Does rewrite the source files to redirect to approx, may does require run as root")
    parser.add_argument('-m', '--mirror', action='store_true', help="Uses mirror lists to allow falling back to direct connection, mirror lists will be stored to {path}/mirror-lists")
    parser.add_argument('-p', '--path', default='/etc/apt', type=Path, help="Configuration directory of apt containing sources.list files, defaults to /etc/apt")
    parser.add_argument('-v', '--verbose', action='store_true', help="Displays debug information")
    args = parser.parse_args(argv)
    # Check server host. Only an explicit "scheme://" counts as protocol:
    # allProtoRegex would also accept the "host:" of "host:9999" and the
    # request would then be rejected for its unknown scheme.
    if not re.match(r"[a-z][a-z0-9+.-]*://", args.host, re.IGNORECASE):
        args.host = "http://" + args.host
    # Store configuration in global vars
    mirror_mode = args.mirror
    mirror_path = Path(args.path) / 'mirror-lists'
    verbose_mode = args.verbose
    write_mode = args.confirm
    # Retrieve repository map
    verb(f"Connect to {args.host} to retrieve repository list")
    repo_map = modifyMap(discoverMap(args.host))
    verb("Found following repositories:")
    with verbLevelContext():
        for repo_pattern in repo_map:
            verb(repo_pattern.pattern)
    # Check for sources files
    file_list = [args.path / 'sources.list']
    file_list += [file for file in (args.path / 'sources.list.d').rglob('*.list') if file.is_file()]
    for file in file_list:
        if file.exists():
            checkFile(str(file.resolve()), repo_map)
# Run only when executed as a script; the program name is not passed on
if __name__ == "__main__":
    main(sys.argv[1:])