Refracted code and added mirror feature

master
Felix Stupp 4 years ago
parent b87503eabf
commit 35509b59b1
Signed by: zocker
GPG Key ID: 93E1BD26F6B02FB7

@ -21,11 +21,11 @@ if you want to avoid use GitHub.
- Verboses changes if requested
- Can rewrite sources files if run as `root`
- Does backup old sources files for easy restoring
- Supports mirror lists for allowing fallback to direct connection (see [`man apt-transport-mirror`](https://manpages.debian.org/buster/apt/apt-transport-mirror.1.en.html))
### ToDo
- Support https for approx
- Implement '--mirror' mode
## Usage
@ -51,7 +51,12 @@ If you want to approve these changes, run as `root`:
```
Now your system will use the approx cache.
The old entries are commented out.
The old entries can be found in the backup files.
### Mirror Mode
To use mirror mode, follow the same approach as above,
but append the flag `-m` or `--mirror-mode` to the arguments.
## Contribute

@ -2,6 +2,8 @@
# Imports
import argparse
from contextlib import contextmanager
import hashlib
import os
from pathlib import Path
import re
@ -35,11 +37,12 @@ def verb(txt, add=0):
if verbose_mode:
print((" " * (verboseLevel * 4 + add)) + txt)
def verbLevel(num):
@contextmanager
def verbLevelContext(mul=1):
global verboseLevel
verboseLevel += num
if verboseLevel < 0:
verboseLevel = 0
verboseLevel += mul
yield None
verboseLevel -= mul
def splitDomainPath(url):
if "/" in url:
@ -73,12 +76,14 @@ def url_to_regex(url, multi_mapping=multi_mapping):
url = removeProtocol(url)
domain, path = splitDomainPath(url)
# Check if domain is given in multi_mapping
is_multi_mapped = False
for m in multi_mapping:
if m.match(domain):
# If given exchange with multi mapped regex
url_pattern = m.sub(m.pattern, domain) + re.escape(path)
is_multi_mapped = True
break
else:
if not is_multi_mapped:
url_pattern = re.escape(url)
# Prefix protocol again
return r"https?://" + url_pattern
@ -91,66 +96,83 @@ def modifyMap(d):
def isDebLine(line):
return lineReplaceRegex.search(line) is not None
def checkFile(file, map):
def checkFile(source_file, mapping):
global mirror_mode
global mirror_path
global write_mode
verb(("Run replacements on" if write_mode else "Check") + f" {file}:")
verbLevel(1)
if write_mode:
newFile = tempfile.NamedTemporaryFile(mode="w", delete=False)
changed = False
for line in open(file, "r"):
line = line[:-1]
if isDebLine(line):
for old_url, new_url in map.items():
if old_url.search(line):
changed = True
new_line = old_url.sub(new_url, line)
verb(f"{line}")
verb(f"-> {new_line}", add=-3)
line = new_line
break
else:
verb(f"= {line}", add=-2)
verb(("Run replacements on" if write_mode else "Check") + f" {source_file}:")
with verbLevelContext():
origFilePath = Path(source_file)
if write_mode:
newFile.write(line + "\n")
if write_mode:
origFilePath = Path(file)
backFilePath = Path(file + ".save")
newFilePath = Path(newFile.name)
newFile.close()
newFilePath.chmod(0o644)
if changed:
origFilePath.replace(backFilePath)
newFilePath.rename(origFilePath)
verbLevel(-1)
newFile = tempfile.NamedTemporaryFile(mode="w", delete=False)
backFilePath = Path(source_file + ".save")
newFilePath = Path(newFile.name)
changed = False
with origFilePath.open() as f:
for line in f:
line = line[:-1] # Remove newline character
if isDebLine(line):
for old_pattern, new_url in mapping.items():
old_match = old_pattern.search(line)
if old_match:
changed = True
old_url = old_match.group(0)
verb(f"{line}")
if mirror_mode:
mirror_file = mirror_path / (hashlib.sha256(old_pattern.pattern) + ".lst")
new_line = old_pattern.sub('mirror+file:' + str(mirror_file.resolve()), line)
if write_mode:
if not mirror_path.exists():
mirror_path.mkdir()
if not mirror_file.exists():
mirror_file.write_text(f"{new_url} priority:1")
mirror_file.write_text(f"{old_url} priority:9")
else:
new_line = old_pattern.sub(new_url, line)
verb(f"-> {new_line} # {new_url}", add=-3)
line = new_line
break
else:
verb(f"= {line}", add=-2)
if write_mode:
newFile.write(line + "\n")
if write_mode and changed:
newFile.close()
newFilePath.chmod(0o644)
if changed:
origFilePath.replace(backFilePath)
newFilePath.rename(origFilePath)
else:
newFile.unlink()
def main(argv):
global mirror_mode
global mirror_path
global verbose_mode
global write_mode
# Parse arguments
parser = argparse.ArgumentParser(description="Redirects apt sources to a given approx cache if cached by approx. Only files ending with .list in sources.list.d will be changed.")
parser.add_argument('host', help="The URL of the approx cache, uses http:// if protocol is omitted")
parser.add_argument('-c', '--confirm', action='store_true', dest='confirm', help="Does rewrite the source files to redirect to approx, does require run as root")
parser.add_argument('-m', '--mirror', action='store_true', dest='use_mirror', help="Uses mirror lists to allow falling back to direct connection")
parser.add_argument('-p', '--path', dest='path', default='/etc/apt', type=Path, help="Configuration directory of apt containing sources.list files, defaults to /etc/apt")
parser.add_argument('-v', '--verbose', action='store_true', dest='verbose', help="Displays debug information")
parser.add_argument('-c', '--confirm', action='store_true', help="Does rewrite the source files to redirect to approx, may does require run as root")
parser.add_argument('-m', '--mirror', action='store_true', help="Uses mirror lists to allow falling back to direct connection, mirror lists will be stored to {path}/mirror-lists")
parser.add_argument('-p', '--path', default='/etc/apt', type=Path, help="Configuration directory of apt containing sources.list files, defaults to /etc/apt")
parser.add_argument('-v', '--verbose', action='store_true', help="Displays debug information")
args = parser.parse_args(argv)
# Check server host
if not allProtoRegex.match(args.host):
args.host = "http://" + args.host
# Store configuration in global vars
mirror_mode = args.use_mirror
mirror_mode = args.mirror
mirror_path = Path(args.path) / 'mirror-lists'
verbose_mode = args.verbose
write_mode = args.confirm
# Retrieve repository map
verb(f"Connect to {args.host} to retrieve repository list")
repo_map = modifyMap(discoverMap(args.host))
verb("Found following repositories:")
verbLevel(1)
for k, v in repo_map.items():
verb(k.pattern)
verbLevel(-1)
with verbLevelContext():
for k, v in repo_map.items():
verb(k.pattern)
# Check for sources files
file_list = [args.path / 'sources.list'] + [file for file in (args.path / 'sources.list.d').rglob('*.list') if file.is_file()]
for file in file_list:

Loading…
Cancel
Save