You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
173 lines
5.6 KiB
Python
173 lines
5.6 KiB
Python
from __future__ import annotations
|
|
|
|
from datetime import datetime
|
|
import logging
|
|
import re
|
|
from typing import Dict, TypeAlias
|
|
|
|
from pony import orm # TODO remove
|
|
import youtubesearchpython
|
|
|
|
from ...models import MediaCollection
|
|
from ..all.youtube import (
|
|
EXTRACTOR_KEY,
|
|
EXTRACTOR_NAME,
|
|
)
|
|
from ..generic import (
|
|
ChangedReport,
|
|
ExtractedDataOnline,
|
|
ExtractedDataOffline,
|
|
SuitableLevel,
|
|
)
|
|
from .base import CollectionExtractor
|
|
|
|
|
|
# Payload type for this extractor: used as the type parameter of
# CollectionExtractor / ExtractedDataOnline / ExtractedDataOffline below and
# as the type of the `data` argument of `_update_object_raw`.
DataType: TypeAlias = Dict
|
|
|
|
|
|
class YouTubeCollectionExtractor(CollectionExtractor[DataType]):
    """Collection extractor for YouTube channel and playlist URIs.

    Accepted URIs are matched by ``__uri_regex``: ``http(s)://`` followed by
    an optional ``www.`` / ``m.`` prefix, ``youtube.com/`` and either
    ``channel/<id>`` or ``playlist?list=<id>``.

    Ids starting with ``UC`` or ``UU`` are treated as channel ids. ``UC`` ids
    are rewritten to their ``UU`` form before being used as object key and as
    the ``list=`` parameter of the requested playlist URL.
    """

    # Verbose-mode pattern; whitespace inside the literal is ignored by `re`.
    __uri_regex = re.compile(
        r"""^
        https?://
        ((
            www
            |
            m
        )\.)?youtube\.com/(
            channel/
            |
            playlist\?list=
        )
        (?P<id>[^/&?]+)
    """,
        re.VERBOSE,
    )

    @classmethod
    def __get_id(cls, uri: str) -> str:
        """Return the channel/playlist id embedded in ``uri``.

        Raises:
            Exception: if ``uri`` does not match ``__uri_regex``.
        """
        m = cls.__uri_regex.search(uri)
        if not m:
            raise Exception(f"Failed to parse Youtube collection uri {uri!r}")
        return m.group("id")

    @staticmethod
    def __is_channel_id(collection_id: str) -> bool:
        """Return True if ``collection_id`` starts with ``UC`` or ``UU``."""
        return collection_id.startswith(("UC", "UU"))

    @staticmethod
    def __convert_channel_id(channel_id: str) -> str:
        """Return the ``UU``-prefixed form of a channel id.

        ``UU`` ids are returned unchanged; for ``UC`` ids the two-character
        prefix is replaced by ``UU``.
        (Presumably the ``UU`` form is the channel's uploads playlist -
        TODO confirm.)

        Raises:
            Exception: if ``channel_id`` starts with neither prefix.
        """
        if channel_id.startswith("UU"):
            return channel_id
        if channel_id.startswith("UC"):
            return f"UU{channel_id[2:]}"
        raise Exception(f"Got not valid channel id: {channel_id!r}")

    @classmethod
    def __convert_if_required(cls, collection_id: str) -> str:
        """Normalise channel ids to their ``UU`` form; pass other ids through."""
        if cls.__is_channel_id(collection_id):
            return cls.__convert_channel_id(collection_id)
        return collection_id

    def __init__(self) -> None:
        """Register this extractor under the shared YouTube key and name."""
        super().__init__(
            key=EXTRACTOR_KEY,
            long_name=EXTRACTOR_NAME,
            name="youtube",
        )

    def uri_suitable(self, uri: str) -> SuitableLevel:
        """Report whether ``uri`` matches the supported channel/playlist form."""
        return SuitableLevel.always_or_no(self.__uri_regex.match(uri) is not None)

    def can_extract_offline(self, uri: str) -> bool:
        """Always True: `_extract_offline` only needs the URI itself."""
        return True

    def _cache_expired(self, object: MediaCollection) -> bool:
        """Return True if the cached data of ``object`` should be refreshed.

        The time elapsed since ``object.last_updated`` is compared with the
        value `_calculate_wait_hours` returns for the newest release date
        among the collection's linked elements.
        """
        last_release_date = orm.max(
            link.element.release_date for link in object.media_links
        )
        return (datetime.now() - object.last_updated) > self._calculate_wait_hours(
            last_release_date
        )

    def _extract_offline(self, uri: str) -> ExtractedDataOffline[DataType]:
        """Build the offline result (key and URI only) without any request."""
        playlist_id = self.__convert_if_required(self.__get_id(uri))
        return ExtractedDataOffline[DataType](
            extractor_name=self.name,
            object_key=playlist_id,
            object_uri=uri,
        )

    def _extract_online(self, uri: str) -> ExtractedDataOnline[DataType]:
        """Fetch playlist info and the video list for ``uri``.

        All pages of the playlist are requested. If paging fails with an
        "invalid status code" error while a channel is being fetched, the
        videos retrieved so far are used; every other error is re-raised.

        Returns:
            ExtractedDataOnline whose ``data`` dict has the keys ``"info"``
            and ``"videos"``.
        """
        playlist_id = self.__convert_if_required(self.__get_id(uri))
        playlist_link = f"https://www.youtube.com/playlist?list={playlist_id}"
        is_channel = self.__is_channel_id(playlist_id)
        logging.info(f"Request Youtube playlist {playlist_link!r}")
        playlist = youtubesearchpython.Playlist(playlist_link)
        try:
            while playlist.hasMoreVideos:
                playlist.getNextVideos()
        except Exception as e:
            # TODO improve check of Exception kind if possible
            # Guard on e.args: an exception raised without arguments must not
            # trigger an IndexError here that would hide the real failure.
            partial_accepted = (
                is_channel
                and bool(e.args)
                and "invalid status code" in str(e.args[0]).lower()
            )
            if not partial_accepted:
                raise
            # Partial Update on channels can be accepted because newest videos are at the top
            logging.warning(
                "Failed to retrieve channel completely, proceed with partial update"
            )
        logging.debug(
            f"Retrieved {len(playlist.videos)} videos from playlist {playlist_link!r}"
        )
        return ExtractedDataOnline[DataType](
            extractor_name=self.name,
            object_key=playlist_id,
            object_uri=uri,
            data={
                "info": playlist.info["info"],
                "videos": playlist.videos,
            },
        )

    def _update_object_raw(
        self,
        object: MediaCollection,
        data: DataType,
    ) -> ChangedReport:
        """Apply extracted ``data`` to the collection ``object``.

        Sets title, description, URI, watch order, release date and creator,
        and adds one episode per entry of ``data["videos"]``. For channels the
        video list is iterated in reverse, so episode numbers are assigned
        starting from the last entry of the list.

        Returns:
            Always ``ChangedReport.ChangedSome``.
        """
        info = data["info"]
        is_channel = self.__is_channel_id(info["id"])
        object.title = (
            f"[channel] [{self.name}] {info['channel']['name']}"
            if is_channel
            else f"[playlist] {info['channel']['name']}: {info['title']}"
        )
        # NOTE(review): `_extract_online` only stores "info" and "videos" in
        # `data`, so this looks like it always yields None - confirm whether a
        # description is expected from elsewhere.
        object.description = data.get("description")
        object.add_single_uri(info["link"])
        video_list = data["videos"]
        object.set_watch_in_order_auto(not is_channel)
        if is_channel:
            video_list = reversed(video_list)
            object.sorting_method = 1  # TODO sort channels by date
        for index, video in enumerate(video_list):
            video_url = f"https://www.youtube.com/watch?v={video['id']}"
            element = self._add_episode(
                collection=object,
                uri=video_url,
                episode=index + 1,
            )
            if element:
                orm.commit()  # so progress is stored
        object.release_date = (
            object.first_released_episode.element.release_date
            if len(object.media_links) > 0
            else None
        )
        # creator exists in most cases as videos were already processed
        # if not, creator is not that important
        object.creator = (
            object
            if is_channel
            else CollectionExtractor.check_uri(
                f"https://www.youtube.com/channel/{info['channel']['id']}"
            )
        )
        return ChangedReport.ChangedSome  # TODO improve