From 0d75fa72bd66f73b71b3d6133ddededb18de9129 Mon Sep 17 00:00:00 2001 From: Felix Stupp Date: Sun, 6 Nov 2022 09:41:59 +0100 Subject: [PATCH] Add TMDB extraction support --- .../extractors/all/tmdb.py | 320 ++++++++++++++++++ .../extractors/collection/__init__.py | 3 + .../extractors/collection/tmdb.py | 144 ++++++++ .../extractors/media/__init__.py | 2 + .../extractors/media/tmdb.py | 88 +++++ 5 files changed, 557 insertions(+) create mode 100644 server/entertainment_decider/extractors/all/tmdb.py create mode 100644 server/entertainment_decider/extractors/collection/tmdb.py create mode 100644 server/entertainment_decider/extractors/media/tmdb.py diff --git a/server/entertainment_decider/extractors/all/tmdb.py b/server/entertainment_decider/extractors/all/tmdb.py new file mode 100644 index 0000000..3a1b038 --- /dev/null +++ b/server/entertainment_decider/extractors/all/tmdb.py @@ -0,0 +1,320 @@ +from __future__ import annotations + +from datetime import datetime +from functools import cached_property +import itertools +import math +from typing import Literal, Optional, Sequence, TypedDict + +import tmdbsimple as tmdb + + +tmdb.API_KEY = "f090bb54758cabf231fb605d3e3e0468" + + +TMDB_DATE_FORMAT = "%Y-%m-%d" +TMDB_REGEX_URI = r""" + ( + https?://(www\.)?themoviedb\.org + | + tmdb:// + ) +""" + + +class TmdbGenreDict(TypedDict): + id: int + name: str + + +class TmdbImageDict(TypedDict): + aspect_ratio: float + file_path: str + height: int + vote_average: float + vote_count: int + width: int + + +class TmdbKeywordDict(TypedDict): + id: int + name: str + + +class TmdbExternalIdsDict(TypedDict): + facebook_id: Optional[str] + imdb_id: Optional[str] + instagram_id: Optional[str] + twitter_id: Optional[str] + + +class TmdbKeywordsDict(TypedDict): + keywords: Sequence[TmdbKeywordDict] + + +class TmdbMovieImagesDict(TypedDict): + backdrops: Sequence[TmdbImageDict] + posters: Sequence[TmdbImageDict] + + +class TmdbMovieEntryInfoDict(TypedDict): + adult: bool + backdrop_path: Optional[str] + genre_ids: Sequence[int] + id: int + original_language: str + original_title: str + overview: str + release_date: str + poster_path: str + popularity: int + title: str + video: bool + vote_average: float + vote_count: int + + +class TmdbCollectionInfoDict(TypedDict): + id: int + name: str + overview: str + poster_path: Optional[str] + backdrop_path: Optional[str] + parts: Sequence[TmdbMovieEntryInfoDict] + + +class TmdbMovieInfoDict(TypedDict): + external_ids: TmdbExternalIdsDict + genres: Sequence[TmdbGenreDict] + keywords: TmdbKeywordsDict + id: int + images: TmdbMovieImagesDict + overview: Optional[str] + release_date: Optional[str] + runtime: Optional[int] + status: Literal[ + "Rumored", "Planned", "In Production", "Post Production", "Released", "Canceled" + ] + title: str + + +class TmdbKeywordInfoDict(TypedDict): + id: int + name: str + movies: TmdbKeywordMoviesDict + + +class TmdbKeywordMoviesDict(TypedDict): + page: int + results: Sequence[TmdbMovieEntryInfoDict] + total_pages: int + total_results: int + + +def select_best_image(images: Sequence[TmdbImageDict]) -> Optional[TmdbImageDict]: + if len(images) <= 0: + return None + return max( + images, key=lambda i: i["vote_average"] * (math.log10(i["vote_count"] + 1)) + ) + + +class TmdbMovieEntryData: + def __init__(self, part_dict: TmdbMovieEntryInfoDict): + self._info = part_dict + + @cached_property + def release_date(self) -> Optional[datetime]: + date_str = self._info.get("release_date") + if not date_str: + return None + return datetime.strptime(date_str, TMDB_DATE_FORMAT) + + @property + def release_date_req(self) -> datetime: + date = self.release_date + if date is None: + raise Exception(f"Expected release_date to be set") + return date + + @cached_property + def was_released(self) -> bool: + return self.release_date is not None and self.release_date <= datetime.now() + + @cached_property + def tmdb_custom_uri(self) -> str: + return f"tmdb:///movie/{self.tmdb_id}" + + @cached_property + def tmdb_id(self) -> int: + return self._info["id"] + + +class TmdbCollectionData: + @classmethod + def from_id(cls, collection_id: int) -> TmdbCollectionData: + return cls(tmdb.Collections(collection_id)) + + def __init__(self, collection_obj: tmdb.Collections): + self.obj = collection_obj + + @cached_property + def _info(self) -> TmdbCollectionInfoDict: + return self.obj.info() # type: ignore + + @cached_property + def description(self) -> Optional[str]: + return self._info.get("overview") + + @cached_property + def parts(self) -> Sequence[TmdbMovieEntryData]: + return [TmdbMovieEntryData(part) for part in self._info["parts"]] + + @cached_property + def release_date(self) -> Optional[datetime]: + return min( + (part.release_date for part in self.parts if part.release_date is not None) + ) + + @cached_property + def title(self) -> str: + return self._info["name"] + + @cached_property + def tmdb_custom_uri(self) -> str: + return f"tmdb:///collection/{self.tmdb_id}" + + @cached_property + def tmdb_id(self) -> int: + return self._info["id"] + + +class TmdbKeywordData: + @classmethod + def from_id(cls, keyword_id: int) -> TmdbKeywordData: + return cls(tmdb.Keywords(keyword_id)) + + def __init__(self, keyword_obj: tmdb.Keywords): + self.obj = keyword_obj + + @cached_property + def _info(self) -> TmdbKeywordInfoDict: + return self.obj.info(append_to_response="movies") # type: ignore + + @cached_property + def description(self) -> Optional[str]: + return None + + @cached_property + def parts(self) -> Sequence[TmdbMovieEntryData]: + movies_first = self._info["movies"] + follow_pages: Sequence[TmdbKeywordMoviesDict] = [ + self.obj.movies(page=i) + for i in range(2, movies_first["total_pages"] + 1, 1) + ] + all_results = itertools.chain( + movies_first["results"], + (part for page in follow_pages for part in page["results"]), + ) + return [TmdbMovieEntryData(part) for part in all_results] + + @cached_property + def release_date(self) -> Optional[datetime]: + return min( + (part.release_date for part in self.parts if part.release_date is not None) + ) + + @cached_property + def title(self) -> str: + return self._info["name"] + + @cached_property + def tmdb_custom_uri(self) -> str: + return f"tmdb:///keyword/{self.tmdb_id}" + + @cached_property + def tmdb_id(self) -> int: + return self._info["id"] + + +class TmdbMovieData: + @classmethod + def from_id(cls, movie_id: int) -> TmdbMovieData: + return cls(tmdb.Movies(movie_id)) + + def __init__(self, movie_obj: tmdb.Movies): + self.obj = movie_obj + + @cached_property + def _info(self) -> TmdbMovieInfoDict: + return self.obj.info(append_to_response="external_ids,images,keywords") # type: ignore + + @cached_property + def description(self) -> Optional[str]: + return self._info.get("overview") + + @cached_property + def genres(self) -> Sequence[str]: + return [genre["name"] for genre in self._info["genres"]] + + @cached_property + def imdb_id(self) -> Optional[str]: + return self._info["external_ids"].get("imdb_id") + + @cached_property + def imdb_custom_uri(self) -> Optional[str]: + if self.imdb_id is None: + return None + return f"imdb:///{self.imdb_id}" + + @cached_property + def length(self) -> Optional[int]: + "Length of the movie in seconds" + runtime_min = self._info.get("runtime") + if runtime_min is None: + return None + return runtime_min * 60 + + @cached_property + def release_date(self) -> Optional[datetime]: + date_str = self._info.get("release_date") + if not date_str: + return None + return datetime.strptime(date_str, TMDB_DATE_FORMAT) + + @cached_property + def was_released(self) -> bool: + return ( + self.release_date is not None + and self.release_date <= datetime.now() + and self.length is not None + and self.length > 0 + ) + + @cached_property + def thumbnail_uri(self) -> Optional[str]: + all_images = self._info["images"] + image = select_best_image(all_images["backdrops"]) or select_best_image( + all_images["posters"] + ) + return ( + f"https://image.tmdb.org/t/p/original{image['file_path']}" + if image + else None + ) + + @cached_property + def title(self) -> str: + return self._info["title"] + + @cached_property + def tmdb_custom_uri(self) -> str: + return f"tmdb:///movie/{self.tmdb_id}" + + @cached_property + def tmdb_short_uri(self) -> str: + return f"https://www.themoviedb.org/movie/{self.tmdb_id}" + + @cached_property + def tmdb_id(self) -> int: + return self._info["id"] diff --git a/server/entertainment_decider/extractors/collection/__init__.py b/server/entertainment_decider/extractors/collection/__init__.py index b83adf1..dc806fe 100644 --- a/server/entertainment_decider/extractors/collection/__init__.py +++ b/server/entertainment_decider/extractors/collection/__init__.py @@ -8,6 +8,7 @@ from ...models import MediaCollection from ..helpers import expect_suitable_extractor from .base import CollectionExtractor from .tt_rss import TtRssCollectionExtractor, TtRssConnectionParameter +from .tmdb import TmdbCollectionExtractor, TmdbKeywordExtractor from .tvmaze import TvmazeCollectionExtractor from .youtube import YouTubeCollectionExtractor @@ -19,6 +20,8 @@ COLLECTION_EXTRACTORS: Dict[str, CollectionExtractor] = { label_filter=-1033, mark_as_read=True, ), + "tmdb": TmdbCollectionExtractor(), + "tmdb-keyword": TmdbKeywordExtractor(), "tvmaze": TvmazeCollectionExtractor(), "youtube": YouTubeCollectionExtractor(), } diff --git a/server/entertainment_decider/extractors/collection/tmdb.py b/server/entertainment_decider/extractors/collection/tmdb.py new file mode 100644 index 0000000..418fa81 --- /dev/null +++ b/server/entertainment_decider/extractors/collection/tmdb.py @@ -0,0 +1,144 @@ +from __future__ import annotations + +from datetime import datetime +import re +from typing import ClassVar, Iterable, List, Optional, TypeVar + +from pony import orm + +from ...models import MediaCollection +from ..all.tmdb import ( + TmdbCollectionData, + TMDB_REGEX_URI, + TmdbKeywordData, + TmdbMovieEntryData, +) +from ..generic import ExtractedData, ExtractedDataLight, SuitableLevel +from .base import CollectionExtractor + + +T = TypeVar("T") + + +class TmdbBaseExtractor(CollectionExtractor[T]): + + TMDB_CLASS: ClassVar[str] + + SUPPORTED_PATTERN = re.compile( + rf"""^ + {TMDB_REGEX_URI} + /(?P[a-z]+)/ + (?P\d+) + (?!\d) + (-[^/]+)? + (/movie)? + /? + $""", + re.VERBOSE, + ) + + @classmethod + def _get_id(cls, uri: str) -> Optional[int]: + m = cls.SUPPORTED_PATTERN.search(uri) + return int(m.group("id")) if m and m.group("class") == cls.TMDB_CLASS else None + + def __init__(self) -> None: + super().__init__("tmdb") + + def uri_suitable(self, uri: str) -> SuitableLevel: + id = self._get_id(uri) + return SuitableLevel.always_or_no(bool(id)) + + def can_extract_offline(self, uri: str) -> bool: + return True + + def _cache_expired(self, object: MediaCollection) -> bool: + last_release_date = orm.max(l.element.release_date for l in object.media_links) + return (datetime.now() - object.last_updated) > ( + self._calculate_wait_hours(last_release_date) * 7 * 24 + ) + + def _extract_offline(self, uri: str) -> ExtractedDataLight: + id = self._get_id(uri) + return ExtractedDataLight( + extractor_name=self.name, + object_key=f"{self.TMDB_CLASS}:{id}", + object_uri=uri, + ) + + +class TmdbCollectionExtractor(TmdbBaseExtractor[TmdbCollectionData]): + + TMDB_CLASS = "collection" + + def _extract_online(self, uri: str) -> ExtractedData[TmdbCollectionData]: + id = self._get_id(uri) + data = TmdbCollectionData.from_id(id) + return ExtractedData( + extractor_name=self.name, + object_key=f"{self.TMDB_CLASS}:{id}", + object_uri=uri, + data=data, + ) + + def _update_object_raw( + self, + object: MediaCollection, + data: TmdbCollectionData, + ) -> None: + # extract data + object.title = f"[tmdb] [{self.TMDB_CLASS}] {data.title}" + object.description = data.description or "" + object.release_date = data.release_date + object.set_watch_in_order_auto(True) + object.add_uris((data.tmdb_custom_uri,)) + parts = sorted( + (part for part in data.parts if part.was_released), + key=lambda p: p.release_date, + ) + for index, movie in enumerate(parts): + element = self._add_episode( + collection=object, + uri=movie.tmdb_custom_uri, + episode=index + 1, + ) + if element: + orm.commit() + + +class TmdbKeywordExtractor(TmdbBaseExtractor[TmdbKeywordData]): + + TMDB_CLASS = "keyword" + + def _extract_online(self, uri: str) -> ExtractedData[TmdbKeywordData]: + id = self._get_id(uri) + data = TmdbKeywordData.from_id(id) + return ExtractedData( + extractor_name=self.name, + object_key=f"{self.TMDB_CLASS}:{id}", + object_uri=uri, + data=data, + ) + + def _update_object_raw( + self, + object: MediaCollection, + data: TmdbKeywordData, + ) -> None: + # extract data + object.title = f"[tmdb] [{self.TMDB_CLASS}] {data.title}" + object.release_date = data.release_date + object.set_watch_in_order_auto(True) + object.add_uris((data.tmdb_custom_uri,)) + parts = sorted( + (part for part in data.parts if part.was_released), + key=lambda p: p.release_date, + ) + for index, movie in enumerate(parts): + element = self._add_episode( + collection=object, + uri=movie.tmdb_custom_uri, + episode=index + 1, + ) + if element: + orm.commit() diff --git a/server/entertainment_decider/extractors/media/__init__.py b/server/entertainment_decider/extractors/media/__init__.py index 83e45df..ca53cec 100644 --- a/server/entertainment_decider/extractors/media/__init__.py +++ b/server/entertainment_decider/extractors/media/__init__.py @@ -6,12 +6,14 @@ from typing import Dict, Tuple from ...models import MediaElement from ..helpers import expect_suitable_extractor from .base import MediaExtractor +from .tmdb import TmdbMovieMediaExtractor from .tvmaze import TvmazeMediaExtractor from .youtube import YoutubeMediaExtractor from .ytdl import YtdlMediaExtractor MEDIA_EXTRACTORS: Dict[str, MediaExtractor] = { + "tmdb": TmdbMovieMediaExtractor(), "tvmaze": TvmazeMediaExtractor(), "youtube": YoutubeMediaExtractor(), "ytdl": YtdlMediaExtractor(), diff --git a/server/entertainment_decider/extractors/media/tmdb.py b/server/entertainment_decider/extractors/media/tmdb.py new file mode 100644 index 0000000..fbb711e --- /dev/null +++ b/server/entertainment_decider/extractors/media/tmdb.py @@ -0,0 +1,88 @@ +from __future__ import annotations + +import itertools +import re +from typing import List, Optional + +from pony import orm + +from ...models import MediaElement, MediaThumbnail, Query, Tag +from ..all.tmdb import TmdbMovieData, TMDB_REGEX_URI +from ..generic import ExtractedData, ExtractedDataLight, ExtractionError, SuitableLevel +from .base import MediaExtractor + + +class TmdbMovieMediaExtractor(MediaExtractor[TmdbMovieData]): + + SUPPORTED_PATTERN = re.compile( + rf"""^ + {TMDB_REGEX_URI} + /movie/ + (?P\d+) + (?!\d) + (-[^/]+)? + /? + $""", + re.VERBOSE, + ) + + @classmethod + def __get_movie_id(cls, uri: str) -> Optional[int]: + m = cls.SUPPORTED_PATTERN.search(uri) + return int(m.group("movie_id")) if m else None + + def __init__(self) -> None: + super().__init__("tmdb") + + def uri_suitable(self, uri: str) -> SuitableLevel: + movie_id = self.__get_movie_id(uri) + return SuitableLevel.always_or_no(bool(movie_id)) + + def can_extract_offline(self, uri: str) -> bool: + return True + + def _extract_offline(self, uri: str) -> ExtractedDataLight: + movie_id = self.__get_movie_id(uri) + return ExtractedDataLight( + extractor_name=self.name, + object_key=str(movie_id), + object_uri=uri, + ) + + def _extract_online(self, uri: str) -> ExtractedData[TmdbMovieData]: + movie_id = self.__get_movie_id(uri) + data = TmdbMovieData.from_id(movie_id) + return ExtractedData( + extractor_name=self.name, + object_key=f"movie:{movie_id}", + object_uri=uri, + data=data, + ) + + def _update_object_raw(self, object: MediaElement, data: TmdbMovieData) -> None: + # sanity check + if not data.was_released: + raise ExtractionError( + f"Could not extract {object.uri!r} because of missing data probably due to not being released yet" + ) + # extract data + object.title = data.title + object.description = data.description or "" + object.thumbnail = ( + MediaThumbnail.from_uri(data.thumbnail_uri) if data.thumbnail_uri else None + ) + object.release_date = data.release_date + object.length = data.length + object.add_uris( + ( + data.imdb_custom_uri, + data.tmdb_custom_uri, + data.tmdb_short_uri, + ) + ) + for genre in itertools.chain(["Movie"], data.genres): + tag_list: List[Tag] = list( + orm.select(tag for tag in Tag if tag.title == genre) + ) + if len(tag_list) == 1: + object.tag_list.add(tag_list[0])