#!/usr/bin/env python3
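"""File-based application cache.

Stores cache entries under the user's XDG cache directory, optionally
gzip-compressing large values, and expires them by file modification time.
Includes decorators for transparently caching sync or async function results.
"""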
from functools import partialmethod, wraps
import gzip
import hashlib
import json
from pathlib import Path
import time
from typing import Optional

# Try to follow the XDG Base Directory Specification; if the xdg module is
# missing, fall back to the default cache directory ~/.cache.
try:
    import xdg
    CACHE_DIR = xdg.xdg_cache_home()
except ModuleNotFoundError:
    CACHE_DIR = Path.home() / ".cache"
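
# First two bytes of every gzip stream, used to detect compressed entries.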
GZIP_MAGIC_NUMBER = b'\x1f\x8b'
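# Default packer/unpacker: pass values through unchanged.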
IDENTITY = lambda x: x


class ApplicationCache:
    app_name: str
    compress_data: bool
    compress_threshold: int
    default_max_age: int
    encoding: str

    def __init__(
        self,
        app_name: str,
        compress_data: bool = True,
        compress_threshold: int = 1024,
        create_cache_dir: bool = True,
        default_max_age: int = 3600,
        encoding: str = "utf-8",
    ):
        self.app_name = app_name
        self.compress_data = compress_data
        self.compress_threshold = compress_threshold
        self.default_max_age = default_max_age
        self.encoding = encoding
        if create_cache_dir and not self.cache_dir.exists():
            self.cache_dir.mkdir(parents=True)
        if not self.cache_dir.is_dir():
            raise NotADirectoryError(f'Expected "{self.cache_dir}" to be a directory')

    @property
    def cache_dir(self) -> Path:
        return CACHE_DIR / self.app_name

    @staticmethod
    def get_hash(args: tuple, kwargs: dict, *arbitrary) -> str:
        val = (*arbitrary, args, kwargs)
        m = hashlib.sha3_512()
        # sort_keys makes the key independent of keyword-argument order.
        m.update(json.dumps(val, sort_keys=True).encode("utf-8"))
        return m.hexdigest()

    @classmethod
    def gen_key(cls, cache_id: str, args: tuple, kwargs: dict) -> str:
        return cls.get_hash(args, kwargs, cache_id)

    def clean_cache(self, max_age: Optional[int] = None) -> int:
        # Remove every cache entry older than max_age seconds; returns the count.
        max_age = self.default_max_age if max_age is None else max_age
        cleaned_count = 0
        for cache_path in self.cache_dir.iterdir():
            if cache_path.is_file():
                cache_stat = cache_path.stat()
                if cache_stat.st_mtime + max_age <= time.time():
                    cache_path.unlink()
                    cleaned_count += 1
        return cleaned_count

    def compress(self, data: str) -> bytes:
        bin_data = data.encode(self.encoding)
        if self.compress_data and len(bin_data) > self.compress_threshold:
            return gzip.compress(bin_data)
        return bin_data

    def decompress(self, compressed_data: bytes) -> str:
        bin_data = compressed_data
        if bin_data[:2] == GZIP_MAGIC_NUMBER:
            bin_data = gzip.decompress(bin_data)
        return bin_data.decode(self.encoding)

    def load(self, key: str, max_age: Optional[int] = None) -> Optional[str]:
        max_age = self.default_max_age if max_age is None else max_age
        cache_path: Path = self.cache_dir / key
        if not cache_path.is_file():
            # A non-file entry under the cache key is unexpected; discard it.
            if cache_path.exists():
                cache_path.unlink()
            return None
        cache_stat = cache_path.stat()
        if cache_stat.st_mtime + max_age <= time.time():
            cache_path.unlink()
            return None
        with cache_path.open("rb") as f:
            return self.decompress(f.read())

    def store(self, key: str, data: str):
        cache_path: Path = self.cache_dir / key
        with cache_path.open("wb") as f:
            f.write(self.compress(data))
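
    # The decorators below accept three extra keyword arguments on every
    # wrapped call: no_cache bypasses the cache entirely, cache_no_lookup
    # skips reading (but still stores the fresh result), and cache_no_store
    # reads and computes normally but never writes.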
    def cache_anything(self, key_prefix: Optional[str] = None, max_age: Optional[int] = None, packer=IDENTITY, unpacker=IDENTITY):
        def decorator(fun):
            cache_prefix = key_prefix or fun.__name__

            @wraps(fun)
            def decorated(*args, no_cache: bool = False, cache_no_lookup: bool = False, cache_no_store: bool = False, **kwargs):
                cache_no_lookup = no_cache or cache_no_lookup
                cache_no_store = no_cache or cache_no_store
                no_cache_key = cache_no_lookup and cache_no_store
                if not no_cache_key:
                    cache_key = self.gen_key(cache_prefix, args, kwargs)
                if not cache_no_lookup:
                    cached_data = self.load(key=cache_key, max_age=max_age)
                    if cached_data is not None:
                        return unpacker(cached_data)
                data = fun(*args, **kwargs)
                if not cache_no_store and data is not None:
                    self.store(key=cache_key, data=packer(data))
                return data

            return decorated

        return decorator
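
    # Async variant: identical caching logic, but awaits the wrapped coroutine.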
    def cache_anything_async(self, key_prefix: Optional[str] = None, max_age: Optional[int] = None, packer=IDENTITY, unpacker=IDENTITY):
        def decorator(fun):
            cache_prefix = key_prefix or fun.__name__

            @wraps(fun)
            async def decorated(*args, no_cache: bool = False, cache_no_lookup: bool = False, cache_no_store: bool = False, **kwargs):
                cache_no_lookup = no_cache or cache_no_lookup
                cache_no_store = no_cache or cache_no_store
                no_cache_key = cache_no_lookup and cache_no_store
                if not no_cache_key:
                    cache_key = self.gen_key(cache_prefix, args, kwargs)
                if not cache_no_lookup:
                    cached_data = self.load(key=cache_key, max_age=max_age)
                    if cached_data is not None:
                        return unpacker(cached_data)
                data = await fun(*args, **kwargs)
                if not cache_no_store and data is not None:
                    self.store(key=cache_key, data=packer(data))
                return data

            return decorated

        return decorator

    # Convenience wrappers that pick the right packer/unpacker for a type.
    cache_int = partialmethod(cache_anything, packer=str, unpacker=int)
    cache_json = partialmethod(cache_anything, packer=json.dumps, unpacker=json.loads)
    cache_str = partialmethod(cache_anything, packer=str, unpacker=str)
    cache_int_async = partialmethod(cache_anything_async, packer=str, unpacker=int)
    cache_json_async = partialmethod(cache_anything_async, packer=json.dumps, unpacker=json.loads)
    cache_str_async = partialmethod(cache_anything_async, packer=str, unpacker=str)
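

# Minimal usage sketch; "demo-app" and fetch_number are placeholder names
# chosen for illustration, not part of the cache API.
if __name__ == "__main__":
    cache = ApplicationCache("demo-app", default_max_age=60)

    @cache.cache_int(max_age=30)
    def fetch_number(n: int) -> int:
        # Stand-in for an expensive computation or network call.
        return n * n

    print(fetch_number(7))                 # computed, then written to ~/.cache/demo-app
    print(fetch_number(7))                 # served from the cache file
    print(fetch_number(7, no_cache=True))  # recomputed; cache neither read nor written
    print(cache.clean_cache(), "expired entries removed")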