You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

155 lines
4.7 KiB
Python

from __future__ import annotations
from datetime import datetime, timedelta
import logging
import math
from typing import (
Any,
Callable,
Mapping,
Optional,
Set,
TypeVar,
)
from pony import orm
from ...models import (
CollectionUriMapping,
MediaCollection,
MediaCollectionLink,
MediaElement,
)
from ..generic import (
ExtractedDataOnline,
ExtractedDataOffline,
ExtractionError,
GeneralExtractor,
)
T = TypeVar("T")
class CollectionExtractor(GeneralExtractor[MediaCollection, T]):
@staticmethod
def check_uri(uri: str) -> Optional[MediaCollection]:
mapping: CollectionUriMapping = CollectionUriMapping.get(uri=uri)
if mapping:
return mapping.element
return None
@staticmethod
def _calculate_wait_hours(
last_release_date: datetime,
growth_rate: float = 3.25508, # estimated for approriate cache timeout times (every 12 hours for 10 days old playlist)
) -> timedelta:
days_since = max((datetime.now() - last_release_date) // timedelta(days=1), 1)
wait_units = math.log(days_since, growth_rate)
wait_hours = (wait_units + 1) * 4
return timedelta(hours=wait_hours)
def __configure_collection(self, collection: MediaCollection) -> None:
collection.keep_updated = True
def _create_object(self, data: ExtractedDataOffline[T]) -> MediaCollection:
collection = data.create_collection()
self.__configure_collection(collection)
return collection
def _load_object(self, data: ExtractedDataOffline[T]) -> Optional[MediaCollection]:
collection = data.load_collection()
if collection is not None:
self.__configure_collection(collection)
return collection
def _add_episode(
self,
collection: MediaCollection,
uri: str,
season: int = 0,
episode: int = 0,
) -> Optional[MediaElement]:
# to avoid circular dependency
# sadly do not know where
from ..media import media_extract_uri
try:
element = media_extract_uri(uri)
except ExtractionError:
logging.warning(f"Failed while extracting media {uri!r}", exc_info=True)
return None
link = collection.add_episode(
media=element,
season=season,
episode=episode,
)
if link is not None:
logging.debug(
f"Add to collection {collection.title!r} media {uri!r} (Season {season}, Episode {episode})"
)
return element
def _inject_episode(
self,
collection: MediaCollection,
data: ExtractedDataOnline[Any],
season: int = 0,
episode: int = 0,
) -> Optional[MediaElement]:
from ..media import media_expect_extractor
extractor = media_expect_extractor(data.object_uri)
if data.extractor_name != extractor.name:
raise Exception(
f"Expected extractor {data.extractor_name!r} for uri {data.object_uri!r}, instead got {extractor.name!r}"
)
try:
element = extractor.inject_object(data)
except ExtractionError:
logging.warning(
f"Failed while extracting media {data.object_uri!r} while injecting from {collection.primary_uri!r}",
exc_info=True,
)
return None
link = collection.add_episode(
media=element,
season=season,
episode=episode,
)
if link:
logging.debug(
f"Add to collection {collection.title!r} media {data.object_uri!r} (Season {season}, Episode {episode})"
)
return element
def _remove_older_episodes(
self,
collection: MediaCollection,
current_set: Set[MediaElement],
) -> None:
all_set = {link.element for link in collection.media_links}
missing_set = all_set - current_set
for elem in missing_set:
if not elem.skip_over:
elem.delete()
def _sort_episodes(self, coll: MediaCollection) -> None:
sorting_methods: Mapping[int, Callable[[MediaCollectionLink], Any]] = {
1: lambda l: l.element.release_date,
}
method = sorting_methods.get(coll.sorting_method)
if method is None:
return
logging.debug(f"Sort collection by type {coll.sorting_method}")
for index, link in enumerate(
orm.select(l for l in coll.media_links).order_by(method)
):
link.season = 0
link.episode = index + 1
def _update_hook(
self, object: MediaCollection, data: ExtractedDataOnline[T]
) -> None:
self._sort_episodes(object)