You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

109 lines
4.1 KiB
Python

from __future__ import annotations
from datetime import datetime, timedelta
import logging
import re
from typing import Dict
from pony import orm # TODO remove
import youtubesearchpython
from ...models import MediaCollection
from ..generic import ExtractedData, ExtractionError, SuitableLevel
from ..media import media_extract_uri
from .base import CollectionExtractor
class YouTubeCollectionExtractor(CollectionExtractor[Dict]):
    """Collection extractor for YouTube channel and playlist URIs.

    Channel ids (prefix ``UC``) are rewritten to ids with the prefix ``UU``
    and then requested like any other playlist.
    NOTE(review): the ``UU`` id is presumably the channel's "uploads"
    playlist -- confirm against YouTube's behaviour.
    """

    # Matches http(s)://[www.]youtube.com/channel/<id> and
    # http(s)://[www.]youtube.com/playlist?list=<id>; <id> ends at the next
    # "/", "&" or "?".
    __uri_regex = re.compile(r"^https?://(www\.)?youtube\.com/(channel/|playlist\?list=)(?P<id>[^/&?]+)")

    @classmethod
    def __get_id(cls, uri: str) -> str:
        """Return the channel/playlist id embedded in ``uri``.

        Raises:
            Exception: if ``uri`` does not match the supported URI pattern.
        """
        m = cls.__uri_regex.search(uri)
        if not m:
            raise Exception(f"Failed to parse Youtube collection uri {uri!r}")
        return m.group("id")

    @staticmethod
    def __is_channel_id(collection_id: str) -> bool:
        """Return True if ``collection_id`` starts with ``UC`` or ``UU``."""
        return collection_id.startswith(("UC", "UU"))

    @staticmethod
    def __convert_channel_id(channel_id: str) -> str:
        """Return ``channel_id`` with a leading ``UC`` replaced by ``UU``.

        Ids which already start with ``UU`` are returned unchanged.

        Raises:
            Exception: if ``channel_id`` starts with neither ``UC`` nor ``UU``.
        """
        if channel_id.startswith("UU"):
            return channel_id
        if channel_id.startswith("UC"):
            return f"UU{channel_id[2:]}"
        raise Exception(f"Got not valid channel id: {channel_id!r}")

    @classmethod
    def __convert_if_required(cls, collection_id: str) -> str:
        """Convert channel ids to their ``UU`` form, pass other ids through."""
        if cls.__is_channel_id(collection_id):
            return cls.__convert_channel_id(collection_id)
        return collection_id

    def __init__(self):
        """Register this extractor under the name ``"youtube"``."""
        super().__init__("youtube")

    def uri_suitable(self, uri: str) -> SuitableLevel:
        """Return ``ALWAYS`` for supported YouTube collection URIs, else ``NO``."""
        return SuitableLevel.ALWAYS if self.__uri_regex.match(uri) else SuitableLevel.NO

    def can_extract_offline(self, uri: str) -> bool:
        """Return True: the object key can be derived from the URI alone."""
        return True

    def _cache_expired(self, date: datetime) -> bool:
        """Return True if ``date`` lies more than 4 hours in the past.

        The original comparison was inverted (``<``), which reported data
        younger than 4 hours as expired and older data as still valid.

        NOTE(review): compares against naive ``datetime.now()``, so ``date``
        is assumed to be a naive local timestamp -- confirm against caller.
        """
        return (datetime.now() - date) > timedelta(hours=4)

    def _extract_offline(self, uri: str) -> ExtractedData[Dict]:
        """Build extraction data from the URI only, without any request.

        Raises:
            Exception: if ``uri`` cannot be parsed.
        """
        playlist_id = self.__convert_if_required(self.__get_id(uri))
        return ExtractedData(
            extractor_name=self.name,
            object_key=playlist_id,
            object_uri=uri,
        )

    def _extract_online(self, uri: str) -> ExtractedData[Dict]:
        """Request the playlist behind ``uri`` and return its info and videos.

        The returned data dict has the keys ``"info"`` (taken from
        ``playlist.info["info"]``) and ``"videos"`` (``playlist.videos`` after
        all pages were loaded).

        Raises:
            Exception: if ``uri`` cannot be parsed.
        """
        playlist_id = self.__convert_if_required(self.__get_id(uri))
        playlist_link = f"https://www.youtube.com/playlist?list={playlist_id}"
        logging.info(f"Request Youtube playlist {playlist_link!r}")
        playlist = youtubesearchpython.Playlist(playlist_link)
        # Keep requesting further pages until the whole playlist is loaded.
        while playlist.hasMoreVideos:
            playlist.getNextVideos()
        logging.debug(f"Retrieved {len(playlist.videos)} videos from playlist {playlist_link!r}")
        return ExtractedData(
            extractor_name=self.name,
            object_key=playlist_id,
            object_uri=uri,
            data={
                "info": playlist.info["info"],
                "videos": playlist.videos,
            },
        )

    def _update_object_raw(self, object: MediaCollection, data: Dict):
        """Update ``object`` (title, URIs, episodes) from extracted ``data``.

        ``data`` is expected in the shape produced by ``_extract_online``:
        ``data["info"]`` provides ``id``, ``title``, ``link`` and
        ``channel.name``; every entry of ``data["videos"]`` provides ``id``.

        Videos whose extraction raises ``ExtractionError`` are logged and
        skipped; they still consume their episode number.
        """
        info = data["info"]
        is_channel = self.__is_channel_id(info["id"])
        object.title = f"[channel] [{self.name}] {info['channel']['name']}" if is_channel else f"[playlist] {info['channel']['name']}: {info['title']}"
        object.add_uris((info["link"],))
        video_list = data["videos"]
        if object.watch_in_order_auto:
            # Automatic mode: playlists are watched in order, channels are not.
            object.watch_in_order = not is_channel
        # Take the length before reversing: reversed() returns an iterator
        # which does not support len().
        len_video_list = len(video_list)
        if is_channel:
            # NOTE(review): presumably channel listings are newest-first, so
            # reversing makes the oldest video episode 1 -- confirm.
            video_list = reversed(video_list)
        for index, video in enumerate(video_list):
            video_url = f"https://www.youtube.com/watch?v={video['id']}"
            other_urls = [
                f"https://youtube.com/watch?v={video['id']}",
                f"https://youtu.be/{video['id']}",
            ]
            logging.debug(f"[youtube] Add to collection {object.title!r} video {video_url!r} ({index+1} of {len_video_list})")
            try:
                element = media_extract_uri(video_url)
                element.add_uris(other_urls)
                object.add_episode(element, episode=index+1)
                orm.commit()  # so progress is stored
            except ExtractionError:
                logging.warning(f"Failed while extracting media {video_url!r}", exc_info=True)