from __future__ import annotations import dataclasses from dataclasses import dataclass from datetime import datetime import logging from typing import Dict, Generic, Optional, TypeVar from ..models import MediaCollection, MediaElement T = TypeVar("T") class ExtractionError(Exception): pass @dataclass class ExtractedDataLight: object_uri: str extractor_name: str object_key: str def create_media(self) -> MediaElement: return MediaElement( uri = self.object_uri, extractor_name = self.extractor_name, extractor_key = self.object_key, ) def create_collection(self) -> MediaCollection: return MediaCollection( uri = self.object_uri, extractor_name = self.extractor_name, extractor_key = self.object_key ) @dataclass class ExtractedData(ExtractedDataLight, Generic[T]): data: T = dataclasses.field(default=None, repr=False, compare=False) cache: Dict = dataclasses.field(default=None, repr=False, compare=False) @property def has_data(self) -> bool: return self.data is not None def load_media(self) -> MediaElement: return MediaElement.get(extractor_name=self.extractor_name, extractor_key=self.object_key) def load_collection(self) -> MediaCollection: return MediaCollection.get(extractor_name=self.extractor_name, extractor_key=self.object_key) @dataclass class AuthorExtractedData(ExtractedDataLight): author_name: str @property def is_valid(self): return len(list(v for _, v in self.__dict__.items() if v is None)) <= 0 E = TypeVar("E", MediaElement, MediaCollection) class GeneralExtractor(Generic[E, T]): name: str def __init__(self, name: str): self.name = name # abstract (for media & collection base classes) @staticmethod def check_uri(uri: str) -> Optional[E]: raise NotImplementedError() def _create_object(self, data: ExtractedData[T]) -> E: raise NotImplementedError() def _load_object(self, data: ExtractedData[T]) -> E: raise NotImplementedError() # abstract (for specific extractor classes) #def uri_suitable(self, uri: str) -> bool: # raise NotImplementedError() def can_extract_offline(self, uri: str, cache: Dict = None) -> bool: return False def _cache_expired(self, date: datetime) -> bool: return False def _extract_offline_only(self, uri: str, cache: Dict = None) -> ExtractedData[T]: raise NotImplementedError() def _extract_online(self, uri: str, cache: Dict = None) -> ExtractedData[T]: raise NotImplementedError() def _update_object_raw(self, object: E, data: T): raise NotImplementedError() def _update_hook(self, object: E, data: ExtractedData[T]): return None # defined def _extract_offline(self, uri: str, cache: Dict = None) -> ExtractedData[T]: return self._extract_offline_only(uri, cache) if self.can_extract_offline(uri, cache) else self._extract_online(uri, cache) def _extract_required(self, data: ExtractedData[T]) -> ExtractedData[T]: if data.has_data: return data return self._extract_online(data.object_uri, data.cache) def _update_object(self, object: E, data: ExtractedData[T]) -> E: object.extractor_cache = data.cache object.uri = data.object_uri object.add_uris((data.object_uri,)) self._update_object_raw(object, data.data) self._update_hook(object, data) return object def update_object(self, object: E, check_cache_expired: bool = True) -> E: if object.extractor_cache_date and check_cache_expired and not self._cache_expired(object.extractor_cache_date): return object data = self._extract_online(object.uri, object.extractor_cache) logging.debug(f"Updating info for media: {data!r}") return self._update_object(object, data) def store_object(self, data: ExtractedData[T]) -> E: object = self._load_object(data) if object: logging.debug(f"Found object already in database: {data!r}") return object data = self._extract_required(data) logging.debug(f"Store info for object: {data!r}") object = self._create_object(data) return self._update_object(object, data) def extract_and_store(self, uri: str) -> E: object = self.check_uri(uri) if object is not None: return object return self.store_object(self._extract_offline(uri))