from __future__ import annotations from datetime import datetime import logging import re from typing import Dict, Optional from ...models import ( MediaElement, MediaThumbnail, thumbnail_sort_key, ) from ..all.ytdl import get_video_info, YtdlErrorException from ..generic import ( AuthorExtractedData, ChangedReport, ExtractedDataOnline, ExtractionError, SuitableLevel, ) from .base import MediaExtractor EXTRACTOR_KEY = ".extractor/.yt-dl" EXTRACTOR_NAME = "YT-DL" class YtdlMediaExtractor(MediaExtractor[Dict]): SUPPORTED_PATTERN = re.compile( r"""^ https?:// """, re.VERBOSE, ) def __init__(self) -> None: super().__init__( key=EXTRACTOR_KEY, long_name=EXTRACTOR_NAME, name="ytdl", ) def uri_suitable(self, uri: str) -> SuitableLevel: return SuitableLevel.fallback_or_no(bool(self.SUPPORTED_PATTERN.search(uri))) def _get_author_data(self, data: Dict) -> Optional[AuthorExtractedData]: video_extractor_key = data.get("extractor_key") or data["ie_key"] author_key = data.get("channel_id") or data.get("uploader_id") author_name = ( data.get("channel") or data.get("uploader") or data.get("uploader_id") ) return AuthorExtractedData( object_uri=data.get("channel_url") or data.get("uploader_url"), extractor_name=self.name, object_key=f"author:{video_extractor_key}:{author_key}" if author_key else None, author_name=f"[{video_extractor_key.lower()}] {author_name}" if author_name else None, ) def _extract_online(self, uri: str) -> ExtractedDataOnline[Dict]: logging.info(f"Request info using youtube-dl for {uri!r}") try: vid_data = get_video_info(uri) except YtdlErrorException as e: raise ExtractionError from e if vid_data.get("is_live", False): raise ExtractionError("Video is live, so pass extraction") ytdl_extractor_key = vid_data.get("extractor_key") or vid_data["ie_key"] ytdl_video_id = vid_data["id"] return ExtractedDataOnline[Dict]( object_uri=uri, extractor_name=self.name, object_key=f"{ytdl_extractor_key}:{ytdl_video_id}", data=vid_data, ) def _update_object_raw(self, object: MediaElement, data: Dict) -> ChangedReport: object.title = ( f"{data['title']} - {data['uploader']}" if "uploader" in data else data["title"] ) object.description = data.get("description") thumb_list = ( [ thumb for thumb in data["thumbnails"] if "width" in thumb and "height" in thumb ] if "thumbnails" in data else None ) if thumb_list: best_thumb = min( thumb_list, key=lambda thumb: thumbnail_sort_key(thumb["width"], thumb["height"]), ) object.thumbnail = MediaThumbnail.from_uri(best_thumb["url"]) elif data.get("thumbnail"): object.thumbnail = MediaThumbnail.from_uri(data["thumbnail"]) object.release_date = datetime.strptime(data["upload_date"], "%Y%m%d") object.length = int(data["duration"]) return ChangedReport.ChangedSome # TODO improve