Format code

- server/entertainment_decider/common.py
- server/entertainment_decider/extractors/all/tt_rss.py
- server/entertainment_decider/extractors/all/ytdl.py
- server/entertainment_decider/extractors/collection/__init__.py
- server/entertainment_decider/extractors/collection/tt_rss.py
- server/entertainment_decider/extractors/collection/youtube.py
- server/entertainment_decider/extractors/generic.py
- server/entertainment_decider/extractors/helpers.py
- server/entertainment_decider/extractors/media/__init__.py
- server/entertainment_decider/extractors/media/base.py
- server/entertainment_decider/extractors/media/youtube.py
- server/entertainment_decider/extractors/media/ytdl.py

Applied automatically using black
master
Felix Stupp 2 years ago
parent b7cbcbd540
commit f236db4084
Signed by: zocker
GPG Key ID: 93E1BD26F6B02FB7

@ -2,18 +2,32 @@ import itertools
import subprocess
from typing import Iterable, List, Literal, TypeVar, Union
def call(args, check=True, stdin=None) -> subprocess.CompletedProcess:
    """Run *args* as a subprocess, capturing stdout/stderr as text.

    Returns the CompletedProcess; when *check* is true a non-zero exit
    status raises subprocess.CalledProcessError.
    """
    return subprocess.run(
        args,
        capture_output=True,
        check=check,
        text=True,
        stdin=stdin,
    )
def update_bool_value(
    old_value: bool, new_value: Union[bool, Literal["toggle"]]
) -> bool:
    """Compute the next boolean state.

    *new_value* is either a plain bool (returned unchanged) or the
    literal string ``"toggle"``, which inverts *old_value*.

    Raises:
        TypeError: if *new_value* is neither a bool nor ``"toggle"``.
            (TypeError subclasses Exception, so existing callers
            catching the old bare Exception still work.)
    """
    if new_value == "toggle":
        return not old_value
    # isinstance() instead of the type()-comparison anti-pattern.
    if not isinstance(new_value, bool):
        raise TypeError(
            f'Invalid type of new_value: Expected bool or literal "toggle", got type={type(new_value)!r}, value={new_value!r}'
        )
    return new_value
T = TypeVar("T")
def limit_iter(iter: Iterable[T], limit: int) -> List[T]:
    """Return at most the first *limit* items of *iter* as a list."""
    # NOTE(review): the parameter name shadows the builtin `iter`;
    # renaming would break keyword callers, so it is kept as-is.
    return list(itertools.islice(iter, limit))

@ -28,14 +28,18 @@ TT_RSS_CONNECTION: Connection = None
HeadlineList = List[Headline]
def _build_connection(params: TtRssConnectionParameter) -> Connection:
    """Return the module-wide tt-rss connection, building and logging in lazily."""
    global TT_RSS_CONNECTION
    if TT_RSS_CONNECTION is None:
        # First use: construct the shared connection from the parameters.
        TT_RSS_CONNECTION = Connection(
            proto=params.proto,
            host=params.host,
            endpoint=params.endpoint,
        )
    conn = TT_RSS_CONNECTION
    if not conn.isLoggedIn():
        conn.login(username=params.username, password=params.password)
    return conn
def get_headlines(params: TtRssConnectionParameter, **kwargs) -> HeadlineList:
conn = _build_connection(params)
if "limit" in kwargs:
@ -71,9 +75,11 @@ class TtRssUriKind(Enum):
@dataclass
class TtRssUri:
supported_kinds = '|'.join(re.escape(n.path_name.lower()) for n in TtRssUriKind)
supported_kinds = "|".join(re.escape(n.path_name.lower()) for n in TtRssUriKind)
scheme = "tt-rss"
path_re = re.compile(fr"^/((?P<all>all)|(?P<kind>{supported_kinds})/(?P<id>-?\d+))/?$")
path_re = re.compile(
rf"^/((?P<all>all)|(?P<kind>{supported_kinds})/(?P<id>-?\d+))/?$"
)
kind: TtRssUriKind
id: Optional[str]
@ -95,9 +101,16 @@ class TtRssUri:
if m is None:
raise Exception(f"Could not parse path of tt-rss uri: {parts.path!r}")
return TtRssUri(
kind = TtRssUriKind.ALL if m.group("all") else TtRssUriKind.from_path_name(m.group("kind")),
id = m.group("id"),
options = {single[0]: single[1] for single in (single.split("=") for single in parts.query.split("&"))} if parts.query else {},
kind=TtRssUriKind.ALL
if m.group("all")
else TtRssUriKind.from_path_name(m.group("kind")),
id=m.group("id"),
options={
single[0]: single[1]
for single in (single.split("=") for single in parts.query.split("&"))
}
if parts.query
else {},
)
def request(self, params: TtRssConnectionParameter, **kwargs) -> HeadlineList:

@ -9,7 +9,11 @@ from jsoncache import ApplicationCache
from ...common import call
cache = ApplicationCache(app_name="entertainment-decider-ytdl", create_cache_dir=True, default_max_age=7*86400)
cache = ApplicationCache(
app_name="entertainment-decider-ytdl",
create_cache_dir=True,
default_max_age=7 * 86400,
)
cache.clean_cache()
YTDL_CALL = [
@ -20,6 +24,7 @@ YTDL_CALL = [
class YtdlErrorException(subprocess.CalledProcessError):
pass
def ytdl_call(args: List[str]) -> dict:
proc = call(YTDL_CALL + args, check=False)
if proc.returncode != 0:
@ -31,18 +36,24 @@ def ytdl_call(args: List[str]) -> dict:
)
return json.loads(proc.stdout.strip())
@cache.cache_json()
def get_video_info(uri: str) -> dict:
    """Fetch ytdl metadata for a single video at *uri* (result cached as JSON)."""
    args = [
        "--no-playlist",
        "--dump-json",
        uri,
    ]
    return ytdl_call(args)
@cache.cache_json()
def get_playlist_info(uri: str) -> dict:
    """Fetch ytdl metadata for a whole playlist at *uri* (result cached as JSON)."""
    args = [
        "--yes-playlist",
        "--dump-single-json",
        uri,
    ]
    return ytdl_call(args)

@ -12,23 +12,29 @@ from .youtube import YouTubeCollectionExtractor
tt_rss_params = TtRssConnectionParameter(**app_config["extractors"]["tt_rss"])
COLLECTION_EXTRACTORS: Dict[str, CollectionExtractor] = {
"tt-rss": TtRssCollectionExtractor(params=tt_rss_params, label_filter=-1033),
"tt-rss": TtRssCollectionExtractor(
params=tt_rss_params,
label_filter=-1033,
),
"youtube": YouTubeCollectionExtractor(),
}
def collection_expect_extractor(uri: str) -> CollectionExtractor:
    """Return the registered collection extractor that handles *uri*.

    Failure handling (no suitable extractor) is delegated to
    expect_suitable_extractor.
    """
    candidates = COLLECTION_EXTRACTORS.values()
    return expect_suitable_extractor(extractor_list=candidates, uri=uri)
def collection_update(collection: MediaCollection, check_cache_expired: bool = True):
    """Re-extract *collection* in place via its matching extractor.

    With *check_cache_expired* true, the extractor may skip work when its
    cached data is still considered fresh (see GeneralExtractor.update_object).
    """
    extractor = collection_expect_extractor(collection.uri)
    extractor.update_object(
        object=collection,
        check_cache_expired=check_cache_expired,
    )
def collection_extract_uri(uri: str) -> MediaCollection:
elem: MediaCollection = CollectionExtractor.check_uri(uri)
ex = collection_expect_extractor(uri)

@ -4,7 +4,7 @@ from datetime import datetime, timedelta
import logging
from typing import Optional
from pony import orm # TODO remove
from pony import orm # TODO remove
from ...models import MediaCollection
from ..all.tt_rss import HeadlineList, TtRssConnectionParameter, TtRssUri
@ -18,7 +18,8 @@ class TtRssCollectionExtractor(CollectionExtractor[HeadlineList]):
__label_filter: Optional[int]
__mark_as_read: bool
def __init__(self,
def __init__(
self,
params: TtRssConnectionParameter,
mark_as_read: bool = False,
label_filter: Optional[int] = None,
@ -53,16 +54,20 @@ class TtRssCollectionExtractor(CollectionExtractor[HeadlineList]):
data = rss_uri.request(self.__params, order_by="feed_dates", view_mode="unread")
if self.__label_filter is not None:
data = [
headline for headline in data
if self.__label_filter in (label_marker[0] for label_marker in headline.labels)
headline
for headline in data
if self.__label_filter
in (label_marker[0] for label_marker in headline.labels)
]
if self.__mark_as_read:
parameters = {
"article_ids": ",".join(str(headline.feedId) for headline in data),
"field": "2", # unread
"mode": "0", # false
"field": "2", # unread
"mode": "0", # false
}
raise NotImplementedError("Cannot set articles as read with tinytinypy for now") # TODO
raise NotImplementedError(
"Cannot set articles as read with tinytinypy for now"
) # TODO
return ExtractedData(
extractor_name=self.name,
object_key=uri,
@ -75,10 +80,7 @@ class TtRssCollectionExtractor(CollectionExtractor[HeadlineList]):
object.title = object.uri
logging.debug(f"Got {len(data)} headlines")
for headline in data:
self._add_episode(
collection = object,
uri = headline.url
)
self._add_episode(collection=object, uri=headline.url)
orm.commit()
if object.watch_in_order_auto:
object.watch_in_order = False # no order available
object.watch_in_order = False # no order available

@ -5,7 +5,7 @@ import logging
import re
from typing import Dict
from pony import orm # TODO remove
from pony import orm # TODO remove
import youtubesearchpython
from ...models import MediaCollection
@ -15,7 +15,9 @@ from .base import CollectionExtractor
class YouTubeCollectionExtractor(CollectionExtractor[Dict]):
__uri_regex = re.compile(r"^https?://(www\.)?youtube\.com/(channel/|playlist\?list=)(?P<id>[^/&?]+)")
__uri_regex = re.compile(
r"^https?://(www\.)?youtube\.com/(channel/|playlist\?list=)(?P<id>[^/&?]+)"
)
@classmethod
def __get_id(cls, uri: str) -> str:
@ -69,7 +71,9 @@ class YouTubeCollectionExtractor(CollectionExtractor[Dict]):
playlist = youtubesearchpython.Playlist(playlist_link)
while playlist.hasMoreVideos:
playlist.getNextVideos()
logging.debug(f"Retrieved {len(playlist.videos)} videos from playlist {playlist_link!r}")
logging.debug(
f"Retrieved {len(playlist.videos)} videos from playlist {playlist_link!r}"
)
return ExtractedData(
extractor_name=self.name,
object_key=playlist_id,
@ -83,7 +87,11 @@ class YouTubeCollectionExtractor(CollectionExtractor[Dict]):
def _update_object_raw(self, object: MediaCollection, data: Dict):
info = data["info"]
is_channel = self.__is_channel_id(info["id"])
object.title = f"[channel] [{self.name}] {info['channel']['name']}" if is_channel else f"[playlist] {info['channel']['name']}: {info['title']}"
object.title = (
f"[channel] [{self.name}] {info['channel']['name']}"
if is_channel
else f"[playlist] {info['channel']['name']}: {info['title']}"
)
object.add_single_uri(info["link"])
video_list = data["videos"]
if object.watch_in_order_auto:
@ -97,10 +105,10 @@ class YouTubeCollectionExtractor(CollectionExtractor[Dict]):
f"https://youtu.be/{video['id']}",
]
element = self._add_episode(
collection = object,
uri = video_url,
episode = index + 1,
collection=object,
uri=video_url,
episode=index + 1,
)
if element:
element.add_uris(other_urls)
orm.commit() # so progress is stored
orm.commit() # so progress is stored

@ -52,16 +52,16 @@ class ExtractedDataLight:
def create_media(self) -> MediaElement:
    """Materialise this extraction result as a new MediaElement."""
    kwargs = {
        "uri": self.object_uri,
        "extractor_name": self.extractor_name,
        "extractor_key": self.object_key,
    }
    return MediaElement(**kwargs)
def create_collection(self) -> MediaCollection:
    """Materialise this extraction result as a new MediaCollection."""
    kwargs = {
        "uri": self.object_uri,
        "extractor_name": self.extractor_name,
        "extractor_key": self.object_key,
    }
    return MediaCollection(**kwargs)
@ -74,10 +74,14 @@ class ExtractedData(ExtractedDataLight, Generic[T]):
return self.data is not None
def load_media(self) -> Optional[MediaElement]:
    """Look up the stored MediaElement with this extractor identity (or None)."""
    identity = {
        "extractor_name": self.extractor_name,
        "extractor_key": self.object_key,
    }
    return MediaElement.get(**identity)
def load_collection(self) -> Optional[MediaCollection]:
    """Look up the stored MediaCollection with this extractor identity (or None)."""
    identity = {
        "extractor_name": self.extractor_name,
        "extractor_key": self.object_key,
    }
    return MediaCollection.get(**identity)
@dataclass
@ -91,6 +95,7 @@ class AuthorExtractedData(ExtractedDataLight):
E = TypeVar("E", MediaElement, MediaCollection)
class GeneralExtractor(Generic[E, T]):
name: str
@ -136,7 +141,11 @@ class GeneralExtractor(Generic[E, T]):
# defined
def _extract_offline(self, uri: str) -> ExtractedData[T]:
    """Prefer the offline extraction path for *uri*; fall back to online."""
    if self.can_extract_offline(uri):
        return self._extract_offline_only(uri)
    return self._extract_online(uri)
def _extract_required(self, data: ExtractedData[T]) -> ExtractedData[T]:
if data.has_data:
@ -151,8 +160,14 @@ class GeneralExtractor(Generic[E, T]):
return object
def update_object(self, object: E, check_cache_expired: bool = True) -> E:
if object.was_extracted and check_cache_expired and not self._cache_expired(object.last_updated):
logging.debug(f"Skip info for element as already extracted and cache valid: {object.title!r}")
if (
object.was_extracted
and check_cache_expired
and not self._cache_expired(object.last_updated)
):
logging.debug(
f"Skip info for element as already extracted and cache valid: {object.title!r}"
)
return object
data = self._extract_online(object.uri)
logging.debug(f"Updating info for media: {data!r}")

@ -18,6 +18,7 @@ def search_suitable_extractor(extractor_list: Iterable[T], uri: str) -> Optional
best_bet = extractor
return best_bet
def expect_suitable_extractor(extractor_list: Iterable[T], uri: str) -> T:
extractor = search_suitable_extractor(extractor_list, uri)
if extractor is None:

@ -15,19 +15,22 @@ MEDIA_EXTRACTORS: Dict[str, MediaExtractor] = {
"ytdl": YtdlMediaExtractor(),
}
def media_expect_extractor(uri: str) -> MediaExtractor:
    """Return the registered media extractor that handles *uri*.

    Failure handling (no suitable extractor) is delegated to
    expect_suitable_extractor.
    """
    candidates = MEDIA_EXTRACTORS.values()
    return expect_suitable_extractor(extractor_list=candidates, uri=uri)
def media_update(element: MediaElement, check_cache_expired: bool = True):
    """Re-extract *element* in place via its matching extractor.

    With *check_cache_expired* true, the extractor may skip work when its
    cached data is still considered fresh (see GeneralExtractor.update_object).
    """
    extractor = media_expect_extractor(element.uri)
    extractor.update_object(
        object=element,
        check_cache_expired=check_cache_expired,
    )
def media_extract_uri(uri: str) -> MediaElement:
elem: MediaElement = MediaExtractor.check_uri(uri)
if not elem:

@ -28,8 +28,8 @@ class MediaExtractor(GeneralExtractor[MediaElement, T]):
elem: MediaElement = MediaElement.get(uri=uri)
if elem:
logging.warning(
f"Add missing URI mapping entry for uri {uri!r}, " +
"this should not happen at this point and is considered a bug"
f"Add missing URI mapping entry for uri {uri!r}, "
+ "this should not happen at this point and is considered a bug"
)
elem.add_single_uri(uri)
return elem
@ -41,13 +41,17 @@ class MediaExtractor(GeneralExtractor[MediaElement, T]):
def _load_object(self, data: ExtractedData[T]) -> Optional[MediaElement]:
    """Resolve the already-stored MediaElement for *data*.

    Returns None when no element with this extractor identity exists yet
    (ExtractedData.load_media delegates to MediaElement.get).
    """
    return data.load_media()
def __create_author_collection(
    self, author_data: AuthorExtractedData
) -> MediaCollection:
    """Create a fresh collection for an author; neither auto-updated nor ordered."""
    coll = author_data.create_collection()
    coll.watch_in_order = False
    coll.keep_updated = False
    return coll
def __lookup_author_collection(self, author_data: AuthorExtractedData) -> Optional[MediaCollection]:
def __lookup_author_collection(
self, author_data: AuthorExtractedData
) -> Optional[MediaCollection]:
return CollectionExtractor.check_uri(
uri=author_data.object_uri,
) or MediaCollection.get(
@ -55,12 +59,16 @@ class MediaExtractor(GeneralExtractor[MediaElement, T]):
extractor_key=author_data.object_key,
)
def __get_author_collection(
    self, author_data: AuthorExtractedData
) -> MediaCollection:
    """Fetch or lazily create the author's collection and refresh its title."""
    coll = self.__lookup_author_collection(author_data)
    if coll is None:
        coll = self.__create_author_collection(author_data)
    # Only overwrite titles this code owns: empty or previously auto-generated.
    if not coll.title or coll.title.startswith("[author] "):
        coll.title = (
            f"[author] [{author_data.extractor_name}] {author_data.author_name}"
        )
    return coll
def __add_to_author_collection(self, element: MediaElement, data: Dict):

@ -7,27 +7,37 @@ from typing import List, Optional, TypedDict
from youtubesearchpython import ResultMode, Video
from ...models import MediaElement
from ..generic import AuthorExtractedData, ExtractedData, SuitableLevel
from ...models import (
MediaElement,
)
from ..generic import (
AuthorExtractedData,
ExtractedData,
SuitableLevel,
)
from .base import MediaExtractor
class YoutubeDuration(TypedDict):
    """Duration portion of a youtubesearchpython video info dict."""

    secondsText: str  # total length in seconds as a decimal string (parsed with int())
class YoutubeViewCount(TypedDict):
    """View-count portion of a video info dict.

    NOTE(review): field is not read anywhere in view; presumably a
    human-readable count string — confirm against the library's output.
    """

    text: str  # display string of the view count — TODO confirm format
class YoutubeThumbnailData(TypedDict):
    """One thumbnail entry of a video info dict, as reported by youtubesearchpython."""

    url: str  # thumbnail image URL
    width: int  # assumed pixel width — not read anywhere in view
    height: int  # assumed pixel height — not read anywhere in view
class YoutubeChannelData(TypedDict):
    """Channel portion of a video info dict (consumed by _get_author_data)."""

    name: str  # channel display name, used as author_name
    id: str  # channel id, used to build "author:<id>" object keys
    link: str  # channel URL, used as the author collection's object_uri
class YoutubeVideoData(TypedDict):
id: str
title: str
@ -48,14 +58,17 @@ class YoutubeVideoData(TypedDict):
class YoutubeMediaExtractor(MediaExtractor[YoutubeVideoData]):
__uri_regex = re.compile(r"""^
__uri_regex = re.compile(
r"""^
https?://(
(www\.)?youtube\.com/(
watch\?v=
)|
youtu\.be/
)(?P<id>[^/&?]+)
$""", re.VERBOSE)
$""",
re.VERBOSE,
)
def __init__(self):
    """Register this extractor under the extractor name "ytdl"."""
    super().__init__("ytdl")
@ -65,17 +78,17 @@ class YoutubeMediaExtractor(MediaExtractor[YoutubeVideoData]):
def _get_author_data(self, data: YoutubeVideoData) -> Optional[AuthorExtractedData]:
    """Describe the video's channel as author metadata for collection linking."""
    channel = data["channel"]
    return AuthorExtractedData(
        object_uri=channel["link"],
        extractor_name=self.name,
        object_key=f"author:{channel['id']}",
        author_name=channel["name"],
    )
def _extract_online(self, uri: str) -> ExtractedData[YoutubeVideoData]:
logging.info(f"Request info using youtube_search_python for {uri!r}")
vid_data: YoutubeVideoData = Video.getInfo(
videoLink = uri,
mode = ResultMode.dict,
videoLink=uri,
mode=ResultMode.dict,
)
return ExtractedData[YoutubeVideoData](
object_uri=uri,
@ -86,5 +99,7 @@ class YoutubeMediaExtractor(MediaExtractor[YoutubeVideoData]):
def _update_object_raw(self, object: MediaElement, data: YoutubeVideoData):
    """Apply title, release date and duration from *data* onto *object*."""
    channel_name = data["channel"]["name"]
    object.title = f"{data['title']} - {channel_name}"
    # uploadDate may be absent or falsy; publishDate is the fallback.
    raw_date = data.get("uploadDate") or data["publishDate"]
    object.release_date = datetime.strptime(raw_date, "%Y-%m-%d")
    object.length = int(data["duration"]["secondsText"])

@ -5,14 +5,15 @@ import logging
from typing import Dict, Optional
from ...models import MediaElement
from ...models import (
MediaElement,
)
from ..all.ytdl import get_video_info, YtdlErrorException
from ..generic import AuthorExtractedData, ExtractedData, ExtractionError, SuitableLevel
from .base import MediaExtractor
from .base import MediaExtractor
class YtdlMediaExtractor(MediaExtractor[Dict]):
def __init__(self):
    """Register this extractor under the extractor name "ytdl"."""
    super().__init__("ytdl")
@ -22,12 +23,18 @@ class YtdlMediaExtractor(MediaExtractor[Dict]):
def _get_author_data(self, data: Dict) -> Optional[AuthorExtractedData]:
    """Derive author metadata from a ytdl info dict.

    Channel fields are preferred over uploader fields; key and display
    name stay None when the corresponding source fields are missing.
    """
    extractor_key = data.get("extractor_key") or data["ie_key"]
    author_key = data.get("channel_id") or data.get("uploader_id")
    author_name = (
        data.get("channel") or data.get("uploader") or data.get("uploader_id")
    )
    # Pre-compute the optional fields so the constructor call stays flat.
    object_key = f"author:{extractor_key}:{author_key}" if author_key else None
    display_name = (
        f"[{extractor_key.lower()}] {author_name}" if author_name else None
    )
    return AuthorExtractedData(
        object_uri=data.get("channel_url") or data.get("uploader_url"),
        extractor_name=self.name,
        object_key=object_key,
        author_name=display_name,
    )
def _extract_online(self, uri: str) -> ExtractedData[Dict]:
@ -48,6 +55,10 @@ class YtdlMediaExtractor(MediaExtractor[Dict]):
)
def _update_object_raw(self, object: MediaElement, data: Dict) -> str:
object.title = f"{data['title']} - {data['uploader']}" if "uploader" in data else data["title"]
object.title = (
f"{data['title']} - {data['uploader']}"
if "uploader" in data
else data["title"]
)
object.release_date = datetime.strptime(data["upload_date"], "%Y%m%d")
object.length = int(data["duration"])

Loading…
Cancel
Save