You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

201 lines
5.6 KiB
Python

from __future__ import annotations
import dataclasses
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
import logging
from typing import Generic, Optional, TypeVar
from ..models import MediaCollection, MediaElement
T = TypeVar("T")
class SuitableLevel(Enum):
NO = (False, False)
FALLBACK = (True, False)
ALWAYS = (True, True)
@property
def can_accept(self) -> bool:
return self.value[0]
@property
def accept_immediately(self) -> bool:
return self.value[1]
@staticmethod
def always_or_no(value: bool) -> SuitableLevel:
return SuitableLevel.ALWAYS if value else SuitableLevel.NO
@staticmethod
def always_or_fallback(value: bool) -> SuitableLevel:
return SuitableLevel.ALWAYS if value else SuitableLevel.FALLBACK
@staticmethod
def fallback_or_no(value: bool) -> SuitableLevel:
return SuitableLevel.FALLBACK if value else SuitableLevel.NO
class ExtractionError(Exception):
pass
@dataclass
class ExtractedDataLight:
object_uri: str
extractor_name: str
object_key: str
def create_media(self) -> MediaElement:
return MediaElement(
uri=self.object_uri,
extractor_name=self.extractor_name,
extractor_key=self.object_key,
)
def create_collection(self) -> MediaCollection:
return MediaCollection(
uri=self.object_uri,
extractor_name=self.extractor_name,
extractor_key=self.object_key,
)
@dataclass
class ExtractedData(ExtractedDataLight, Generic[T]):
data: T = dataclasses.field(default=None, repr=False, compare=False)
@property
def has_data(self) -> bool:
return self.data is not None
def load_media(self) -> Optional[MediaElement]:
return MediaElement.get(
extractor_name=self.extractor_name,
extractor_key=self.object_key,
)
def load_collection(self) -> Optional[MediaCollection]:
return MediaCollection.get(
extractor_name=self.extractor_name,
extractor_key=self.object_key,
)
@dataclass
class AuthorExtractedData(ExtractedDataLight):
author_name: str
@property
def is_valid(self) -> bool:
return len(list(v for _, v in self.__dict__.items() if v is None)) <= 0
E = TypeVar("E", MediaElement, MediaCollection)
class GeneralExtractor(Generic[E, T]):
name: str
def __init__(self, name: str):
self.name = name
# abstract (for media & collection base classes)
@staticmethod
def check_uri(uri: str) -> Optional[E]:
raise NotImplementedError()
def _create_object(self, data: ExtractedData[T]) -> E:
raise NotImplementedError()
def _load_object(self, data: ExtractedData[T]) -> E:
raise NotImplementedError()
# abstract (for specific extractor classes)
def uri_suitable(self, uri: str) -> SuitableLevel:
raise NotImplementedError()
def can_extract_offline(self, uri: str) -> bool:
return False
def _cache_expired(self, object: E) -> bool:
return False
def _extract_offline_only(self, uri: str) -> ExtractedData[T]:
raise NotImplementedError()
def _extract_online(self, uri: str) -> ExtractedData[T]:
raise NotImplementedError()
def _update_object_raw(self, object: E, data: T) -> None:
raise NotImplementedError()
def _update_hook(self, object: E, data: ExtractedData[T]) -> None:
return None
# defined
def _extract_offline(self, uri: str) -> ExtractedData[T]:
return (
self._extract_offline_only(uri)
if self.can_extract_offline(uri)
else self._extract_online(uri)
)
def _extract_required(self, data: ExtractedData[T]) -> ExtractedData[T]:
if data.has_data:
return data
return self._extract_online(data.object_uri)
def _update_object(self, object: E, data: ExtractedData[T]) -> E:
object.uri = data.object_uri
self._update_object_raw(object, data.data)
self._update_hook(object, data)
object.last_updated = datetime.now()
return object
def update_object(self, object: E, check_cache_expired: bool = True) -> E:
if (
object.was_extracted
and check_cache_expired
and not self._cache_expired(object)
):
logging.debug(
f"Skip info for element as already extracted and cache valid: {object.title!r}"
)
return object
data = self._extract_online(object.uri)
logging.debug(f"Updating info for media: {data!r}")
return self._update_object(object, data)
def inject_object(self, data: ExtractedData[T]) -> E:
object = self._load_object(data)
data = self._extract_required(data)
if not object:
logging.debug(f"Store info for object: {data!r}")
object = self._create_object(data)
return self._update_object(object, data)
def store_object(self, data: ExtractedData[T]) -> E:
object = self._load_object(data)
if object:
logging.debug(f"Found object already in database: {data!r}")
return object
data = self._extract_required(data)
logging.debug(f"Store info for object: {data!r}")
object = self._create_object(data)
return self._update_object(object, data)
def extract_and_store(self, uri: str) -> E:
object = self.check_uri(uri)
if object is not None:
return object
return self.store_object(self._extract_offline(uri))