#!/usr/bin/env python3
# Imports
import argparse
from contextlib import contextmanager
import hashlib
import os
from pathlib import Path
import re
import requests
import sys
import tempfile
## Variables
# Host name patterns of mirrors that are reachable under multiple addresses
# (like one per country). They are compiled right away so that they can be
# matched against the domain part of a repository URL.
multi_mapping = [
    re.compile(host_pattern)
    for host_pattern in (
        r"(ftp[0-9]*\.[a-z]+|deb)\.debian\.org",
        r"[a-z]+\.(archive|releases)\.ubuntu\.com",
    )
]

# Regexs used
# One table row of the server's repository listing: the first link is the
# path on the server ("new"), the second link is the upstream URL ("old").
mapRegex = re.compile(
    r"^<tr><td><a href=\"(?P<new>[^\"]+)\">[^<>]+</a></td>"
    r"<td><a href=\"(?P<old>.*)\">http[^<>]*</a></td></tr>$"
)
# Any leading "scheme:" including the slashes that follow it.
allProtoRegex = re.compile(r"^[a-z+]+:/*(?!/)")
# Leading http:// or https:// only.
protoReplaceRegex = re.compile(r"^https?://")
# Start of a "deb" / "deb-src" entry (leading whitespace allowed).
lineReplaceRegex = re.compile(r"^\s*deb(-src)?")
# A whitespace-delimited URL inside a sources line.
lineUrlRegex = re.compile(r"(?<=\s)[a-z+]+:/*(?!/)\S+(?!\S)")

## Code
# Current nesting depth of the verbose output (see verb / verbLevelContext).
verboseLevel = 0
def verb(txt, add=0):
    """Print ``txt`` as indented debug output when verbose mode is on.

    Args:
        txt: The text to print.
        add: Number of columns added to (or, if negative, removed from) the
            indentation, which is otherwise four spaces per ``verboseLevel``.

    Nothing is printed while the global ``verbose_mode`` is false.
    """
    if not verbose_mode:
        return
    indent = " " * (verboseLevel * 4 + add)
    print(indent + txt)
@contextmanager
def verbLevelContext(mul=1):
    """Temporarily increase the indentation level of the verbose output.

    Args:
        mul: Number of levels added to the global ``verboseLevel`` for the
            duration of the ``with`` body.

    The previous level is restored in a ``finally`` clause, so an exception
    raised inside the ``with`` body does not leave later output indented
    too deep.
    """
    global verboseLevel
    verboseLevel += mul
    try:
        yield None
    finally:
        verboseLevel -= mul
def splitDomainPath(url):
    """Split a protocol-less URL at its first slash.

    Returns:
        A ``(domain, path)`` tuple. ``path`` keeps its leading slash and is
        the empty string when ``url`` contains no slash at all.
    """
    domain, slash, rest = url.partition('/')
    if not slash:
        # No slash at all: the whole string is the domain
        return url, ''
    return domain, '/' + rest
def removeProtocol(url):
    """Return ``url`` without the leading protocol matched by ``allProtoRegex``.

    URLs that do not start with a protocol are returned unchanged.
    """
    proto = allProtoRegex.match(url)
    if proto is None:
        return url
    # The pattern is anchored at the start, so cutting off the match is all
    # that is needed
    return url[proto.end():]
def remove_slash_suffix(url):
    """Drop one trailing slash from ``url`` if there is one.

    This way URLs written without the trailing slash in the sources files
    are matched as well.
    """
    if url.endswith('/'):
        return url[:-1]
    return url
def discoverMap(server, timeout=30):
    """Download the repository listing of the approx server and parse it.

    Args:
        server: Base URL of the approx server, including the protocol.
        timeout: Seconds to wait for the server before giving up, so that an
            unreachable cache cannot block the script forever.

    Returns:
        A dict mapping each upstream repository URL (the "old" link of a
        table row matched by ``mapRegex``) to its URL on the approx server.
        Trailing slashes are removed from both sides.

    Raises:
        requests.RequestException: If the request fails, times out or the
            server answers with an HTTP error status.
    """
    # Request approx server
    res = requests.get(server, timeout=timeout)
    # Do not try to parse an error page as a repository listing
    res.raise_for_status()
    # Avoid a double slash if the server URL was given with a trailing slash
    base_url = remove_slash_suffix(server)
    # Extract url_map
    url_map = {}
    # splitlines() also copes with "\r\n" line endings, which would otherwise
    # leave a "\r" in front of the end anchor of mapRegex
    for line in res.text.splitlines():
        match = mapRegex.match(line)
        if match:
            old_url = remove_slash_suffix(match.group('old'))
            url_map[old_url] = base_url + '/' + remove_slash_suffix(match.group('new'))
    return url_map
def url_to_regex(url, multi_mapping=multi_mapping):
    """Build a regular expression source string that matches ``url``.

    Args:
        url: Upstream repository URL.
        multi_mapping: Compiled host name patterns of mirrors that are
            reachable under multiple addresses.

    Returns:
        For http/https URLs a pattern starting with ``https?://`` so that
        both protocols are matched. If the domain starts with a match of one
        of the ``multi_mapping`` patterns, that part is replaced by the
        pattern itself so every address of the mirror matches. Everything
        else, and any URL with another protocol, is escaped literally.
    """
    # Only URLs prefixed by https?:// get the protocol-agnostic treatment
    if not protoReplaceRegex.match(url):
        return re.escape(url)

    domain, path = splitDomainPath(removeProtocol(url))
    for mirror_regex in multi_mapping:
        mirror_match = mirror_regex.match(domain)
        if mirror_match:
            # Concatenate the pattern source directly instead of passing it
            # to re.sub() as replacement: a replacement string is a template
            # in which sequences like "\d" or "\1" are errors or group
            # references. The non-capturing group keeps a top-level "|" of
            # the pattern from swallowing the protocol prefix or the path,
            # and the rest of the domain is escaped like any literal text.
            domain_pattern = (
                "(?:" + mirror_regex.pattern + ")"
                + re.escape(domain[mirror_match.end():])
            )
            break
    else:
        # Domain is not part of a multi address mirror, match it literally
        domain_pattern = re.escape(domain)

    # Prefix protocol again
    return r"https?://" + domain_pattern + re.escape(path)
def modifyMap(d):
    """Turn the keys of a URL map into compiled regular expressions.

    Args:
        d: Dict mapping upstream URLs to their replacement URLs.

    Returns:
        A new dict with the same values, keyed by the compiled result of
        ``url_to_regex`` for each original key.
    """
    compiled_map = {}
    for old, new in d.items():
        compiled_map[re.compile(url_to_regex(old))] = new
    return compiled_map
def isDebLine(line):
    """Tell whether ``line`` is matched by ``lineReplaceRegex``.

    Returns:
        ``True`` if the pattern is found in the line, ``False`` otherwise.
    """
    # A match object is always truthy, None is returned when nothing matched
    return bool(lineReplaceRegex.search(line))
def check_line(line, mapping):
    """Rewrite one sources line if it refers to a mapped repository.

    Args:
        line: A single line of a sources file, without trailing newline.
        mapping: Dict of compiled URL patterns to the replacement URL.

    Returns:
        The rewritten line for the first pattern found in ``line``, or
        ``None`` if no pattern matches.

        In mirror mode the URL of the line is replaced by a
        ``mirror+file:`` reference to a mirror list file inside
        ``mirror_path``; that file is only created in write mode and lists
        the replacement URL with priority 1 and the original URL with
        priority 9. Otherwise the matched part of the line is replaced by
        the replacement URL directly.

    Raises:
        Exception: In mirror mode, if no URL is found in the line or the URL
            found does not start with the text matched by the pattern.
    """
    for old_pattern, new_url in mapping.items():
        old_match = old_pattern.search(line)
        if not old_match:
            continue
        verb(f"{line}")
        if mirror_mode:
            old_prefix = old_match.group(0)
            old_url_match = lineUrlRegex.search(line)
            if not old_url_match:
                raise Exception(f"URL '{old_prefix}' not found in line '{line}', report this to the developer")
            old_url = old_url_match.group(0)
            if not old_url.startswith(old_prefix):
                raise Exception(f"URL '{old_prefix}' not matching in line '{line}', report this to the developer")
            # Part of the URL behind the mapped repository prefix
            old_suffix = old_url[len(old_prefix):]
            # One mirror list per distinct original URL, named by its hash
            mirror_file = mirror_path / (hashlib.sha256(old_url.encode('utf-8')).hexdigest() + ".lst")
            if write_mode:
                # Also creates missing parents and tolerates an existing
                # directory instead of failing
                mirror_path.mkdir(parents=True, exist_ok=True)
                if not mirror_file.exists():
                    mirror_file.write_text(f"{new_url}{old_suffix}\tpriority:1\n{old_url}\tpriority:9\n")
            mirror_uri = 'mirror+file:' + str(mirror_file.resolve())
            # A function as replacement inserts the text verbatim, a string
            # would be interpreted as template with backslash escapes
            new_line = lineUrlRegex.sub(lambda _match: mirror_uri, line)
            verb(f"-> {new_line} # {new_url}{old_suffix}", add=-3)
        else:
            new_line = old_pattern.sub(lambda _match: new_url, line)
            # There is no URL suffix outside of mirror mode; referencing it
            # here used to abort with an UnboundLocalError
            verb(f"-> {new_line}", add=-3)
        return new_line
    verb(f"= {line}", add=-2)
    return None
def checkFile(source_file, mapping):
    """Check one sources file and, in write mode, rewrite it.

    Args:
        source_file: Path of the sources file as string.
        mapping: Dict of compiled URL patterns to the replacement URL, as
            expected by ``check_line``.

    Every line accepted by ``isDebLine`` is passed to ``check_line``. In
    write mode the result is collected in a temporary file; if at least one
    line changed, the original is moved to ``<source_file>.save`` and the
    temporary file takes its place with mode 0o644. Without changes, or if
    an error occurs, the temporary file is removed again.
    """
    verb(("Run replacements on " if write_mode else "Check ") + f"{source_file}:")
    with verbLevelContext():
        origFilePath = Path(source_file)
        backFilePath = Path(source_file + ".save")
        newFile = None
        newFilePath = None
        if write_mode:
            # Create the temporary file next to the original: moving it into
            # place then stays on one filesystem, which a rename from the
            # default temporary directory is not guaranteed to do
            newFile = tempfile.NamedTemporaryFile(
                mode="w",
                dir=str(origFilePath.parent),
                prefix=origFilePath.name + ".",
                suffix=".tmp",
                delete=False,
            )
            newFilePath = Path(newFile.name)
        changed = False
        try:
            with origFilePath.open() as f:
                for line in f:
                    # Remove newline character; a last line without one must
                    # not lose its final character
                    line = line.rstrip("\n")
                    if isDebLine(line):
                        new_line = check_line(line, mapping)
                        if new_line:
                            changed = True
                            line = new_line
                    if newFile is not None:
                        newFile.write(line + "\n")
            if newFile is not None:
                newFile.close()
                if changed:
                    newFilePath.chmod(0o644)
                    origFilePath.replace(backFilePath)
                    try:
                        newFilePath.replace(origFilePath)
                    except OSError:
                        # Do not leave the system without the sources file
                        backFilePath.replace(origFilePath)
                        raise
        finally:
            if newFile is not None:
                # Closing twice is harmless; the temporary file only still
                # exists if it was not moved into place
                newFile.close()
                if newFilePath.exists():
                    newFilePath.unlink()
def main(argv):
    """Command line entry point.

    Parses ``argv``, stores the chosen modes in the module globals, fetches
    the repository map from the approx server and runs ``checkFile`` on
    ``sources.list`` and on every ``*.list`` file found below
    ``sources.list.d`` in the configured apt directory.

    Args:
        argv: Command line arguments without the program name.
    """
    global mirror_mode, mirror_path, verbose_mode, write_mode

    # Parse arguments
    parser = argparse.ArgumentParser(
        description="Redirects apt sources to a given approx cache if cached by approx. Only files ending with .list in sources.list.d will be changed."
    )
    parser.add_argument(
        'host',
        help="The URL of the approx cache, uses http:// if protocol is omitted",
    )
    parser.add_argument(
        '-c', '--confirm', action='store_true',
        help="Does rewrite the source files to redirect to approx, may does require run as root",
    )
    parser.add_argument(
        '-m', '--mirror', action='store_true',
        help="Uses mirror lists to allow falling back to direct connection, mirror lists will be stored to {path}/mirror-lists",
    )
    parser.add_argument(
        '-p', '--path', default='/etc/apt', type=Path,
        help="Configuration directory of apt containing sources.list files, defaults to /etc/apt",
    )
    parser.add_argument(
        '-v', '--verbose', action='store_true',
        help="Displays debug information",
    )
    args = parser.parse_args(argv)

    # Fall back to http:// if the host was given without any protocol
    host = args.host
    if not allProtoRegex.match(host):
        host = "http://" + host

    # Store configuration in global vars
    mirror_mode = args.mirror
    mirror_path = Path(args.path) / 'mirror-lists'
    verbose_mode = args.verbose
    write_mode = args.confirm

    # Retrieve repository map
    verb(f"Connect to {host} to retrieve repository list")
    repo_map = modifyMap(discoverMap(host))
    verb("Found following repositories:")
    with verbLevelContext():
        for repo_regex in repo_map:
            verb(repo_regex.pattern)

    # Collect all candidate files first, then process the ones that exist
    candidates = [args.path / 'sources.list']
    candidates.extend(
        entry
        for entry in (args.path / 'sources.list.d').rglob('*.list')
        if entry.is_file()
    )
    for candidate in candidates:
        if candidate.exists():
            checkFile(str(candidate.resolve()), repo_map)


if __name__ == '__main__':
    main(sys.argv[1:])