#!/usr/bin/env python3
from functools import partialmethod, wraps
import gzip
import hashlib
import json
from pathlib import Path
import time
from typing import Optional

# Tries to follow the XDG Base Directory Specification; if that fails due to a
# missing module, falls back to the default cache dir ~/.cache.
try:
    # pip package: pyxdg
    import xdg.BaseDirectory
    CACHE_DIR = Path(xdg.BaseDirectory.xdg_cache_home)
except ModuleNotFoundError:
    try:
        # pip package: xdg
        import xdg
        CACHE_DIR = xdg.xdg_cache_home()
    except ModuleNotFoundError:
        CACHE_DIR = Path.home() / ".cache"

GZIP_MAGIC_NUMBER = b'\x1f\x8b'


def identity(x):
    return x


class ApplicationCache:
    """File-based cache with per-app namespacing, transparent gzip compression,
    time-based expiry, and decorators for caching function results."""

    app_name: str
    compress_data: bool
    compress_threshold: int
    default_max_age: int
    encoding: str

    def __init__(
        self,
        app_name: str,
        compress_data: bool = True,
        compress_threshold: int = 1024,
        create_cache_dir: bool = True,
        default_max_age: int = 3600,
        encoding: str = "utf-8",
    ):
        self.app_name = app_name
        self.compress_data = compress_data
        self.compress_threshold = compress_threshold
        self.default_max_age = default_max_age
        self.encoding = encoding
        if create_cache_dir and not self.cache_dir.exists():
            self.cache_dir.mkdir(parents=True)
        if not self.cache_dir.is_dir():
            raise NotADirectoryError(f'Expected "{self.cache_dir}" to be a directory')

    @property
    def cache_dir(self) -> Path:
        return CACHE_DIR / self.app_name

    @staticmethod
    def get_hash(args: tuple, kwargs: dict, *arbitrary) -> str:
        val = (*arbitrary, args, kwargs)
        m = hashlib.sha3_512()
        # sort_keys makes the hash independent of keyword-argument order
        m.update(json.dumps(val, sort_keys=True).encode("utf-8"))
        return m.hexdigest()

    @classmethod
    def gen_key(cls, cache_id: str, args: tuple, kwargs: dict) -> str:
        return cls.get_hash(args, kwargs, cache_id)

    def clean_cache(self, max_age: Optional[int] = None) -> int:
        """Delete all cache files older than max_age; return how many were removed."""
        max_age = max_age or self.default_max_age
        cleaned_count = 0
        for cache_path in self.cache_dir.iterdir():
            if cache_path.is_file():
                cache_stat = cache_path.stat()
                if cache_stat.st_mtime + max_age <= time.time():
                    cache_path.unlink()
                    cleaned_count += 1
        return cleaned_count

    def compress(self, data: str) -> bytes:
        bin_data = data.encode(self.encoding)
        if self.compress_data and len(bin_data) > self.compress_threshold:
            return gzip.compress(bin_data)
        return bin_data

    def decompress(self, compressed_data: bytes) -> str:
        # Compression is detected via the gzip magic number, so compressed and
        # uncompressed cache files can be read transparently.
        bin_data = compressed_data
        if bin_data[:2] == GZIP_MAGIC_NUMBER:
            bin_data = gzip.decompress(bin_data)
        return bin_data.decode(self.encoding)

    def load(self, key: str, max_age: Optional[int] = None) -> Optional[str]:
        """Return the cached string for key, or None if it is missing or expired."""
        max_age = max_age or self.default_max_age
        cache_path: Path = self.cache_dir / key
        if not cache_path.is_file():
            if cache_path.exists():
                cache_path.unlink()
            return None
        cache_stat = cache_path.stat()
        if cache_stat.st_mtime + max_age <= time.time():
            cache_path.unlink()
            return None
        with cache_path.open("rb") as f:
            return self.decompress(f.read())

    def store(self, key: str, data: str):
        cache_path: Path = self.cache_dir / key
        with cache_path.open("wb") as f:
            f.write(self.compress(data))

    def cache_anything(self, key_prefix: Optional[str] = None, max_age: Optional[int] = None, packer=identity, unpacker=identity):
        def decorator(fun):
            cache_prefix = key_prefix or fun.__name__

            @wraps(fun)
            def decorated(*args, no_cache: bool = False, cache_no_lookup: bool = False, cache_no_store: bool = False, **kwargs):
                cache_no_lookup = no_cache or cache_no_lookup
                cache_no_store = no_cache or cache_no_store
                no_cache_key = cache_no_lookup and cache_no_store
                if not no_cache_key:
                    cache_key = self.gen_key(cache_prefix, args, kwargs)
                if not cache_no_lookup:
                    cached_data = self.load(key=cache_key, max_age=max_age)
                    if cached_data is not None:
                        return unpacker(cached_data)
                data = fun(*args, **kwargs)
                if not cache_no_store and data is not None:
                    self.store(key=cache_key, data=packer(data))
                return data

            return decorated

        return decorator

    def cache_anything_async(self, key_prefix: Optional[str] = None, max_age: Optional[int] = None, packer=identity, unpacker=identity):
        def decorator(fun):
            cache_prefix = key_prefix or fun.__name__

            @wraps(fun)
            async def decorated(*args, no_cache: bool = False, cache_no_lookup: bool = False, cache_no_store: bool = False, **kwargs):
                cache_no_lookup = no_cache or cache_no_lookup
                cache_no_store = no_cache or cache_no_store
                no_cache_key = cache_no_lookup and cache_no_store
                if not no_cache_key:
                    cache_key = self.gen_key(cache_prefix, args, kwargs)
                if not cache_no_lookup:
                    cached_data = self.load(key=cache_key, max_age=max_age)
                    if cached_data is not None:
                        return unpacker(cached_data)
                data = await fun(*args, **kwargs)
                # Mirror the sync variant: never cache a None result.
                if not cache_no_store and data is not None:
                    self.store(key=cache_key, data=packer(data))
                return data

            return decorated

        return decorator

    cache_int = partialmethod(cache_anything, packer=str, unpacker=int)
    cache_json = partialmethod(cache_anything, packer=json.dumps, unpacker=json.loads)
    cache_str = partialmethod(cache_anything, packer=str, unpacker=str)
    cache_int_async = partialmethod(cache_anything_async, packer=str, unpacker=int)
    cache_json_async = partialmethod(cache_anything_async, packer=json.dumps, unpacker=json.loads)
    cache_str_async = partialmethod(cache_anything_async, packer=str, unpacker=str)
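

# ---------------------------------------------------------------------------
# Usage sketch (not part of the module's API): the app name "example-app" and
# the function slow_square are hypothetical, chosen only to illustrate the
# decorators. Results are keyed by function name plus arguments, stored under
# ~/.cache/example-app (or the XDG cache dir), and expire after max_age.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    cache = ApplicationCache("example-app", default_max_age=600)

    @cache.cache_int(max_age=60)
    def slow_square(n: int) -> int:
        time.sleep(1)  # stand-in for an expensive computation
        return n * n

    print(slow_square(12))                 # computed, then stored
    print(slow_square(12))                 # answered from the cache, no sleep
    print(slow_square(12, no_cache=True))  # bypasses both lookup and store
    print(f"removed {cache.clean_cache()} expired cache file(s)")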