import base64 import calendar import copy import datetime import enum import hashlib import itertools import json import math import os.path import random import re import sys import threading import time import traceback import urllib.error import urllib.parse from .common import InfoExtractor, SearchInfoExtractor from .openload import PhantomJSwrapper from ..compat import functools from ..jsinterp import JSInterpreter from ..utils import ( NO_DEFAULT, ExtractorError, UserNotLive, bug_reports_message, classproperty, clean_html, datetime_from_str, dict_get, float_or_none, format_field, get_first, int_or_none, is_html, join_nonempty, js_to_json, mimetype2ext, network_exceptions, orderedSet, parse_codecs, parse_count, parse_duration, parse_iso8601, parse_qs, qualities, remove_start, smuggle_url, str_or_none, str_to_int, strftime_or_none, traverse_obj, try_get, unescapeHTML, unified_strdate, unified_timestamp, unsmuggle_url, update_url_query, url_or_none, urljoin, variadic, ) # any clients starting with _ cannot be explicitly requested by the user INNERTUBE_CLIENTS = { 'web': { 'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8', 'INNERTUBE_CONTEXT': { 'client': { 'clientName': 'WEB', 'clientVersion': '2.20220801.00.00', } }, 'INNERTUBE_CONTEXT_CLIENT_NAME': 1 }, 'web_embedded': { 'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8', 'INNERTUBE_CONTEXT': { 'client': { 'clientName': 'WEB_EMBEDDED_PLAYER', 'clientVersion': '1.20220731.00.00', }, }, 'INNERTUBE_CONTEXT_CLIENT_NAME': 56 }, 'web_music': { 'INNERTUBE_API_KEY': 'AIzaSyC9XL3ZjWddXya6X74dJoCTL-WEYFDNX30', 'INNERTUBE_HOST': 'music.youtube.com', 'INNERTUBE_CONTEXT': { 'client': { 'clientName': 'WEB_REMIX', 'clientVersion': '1.20220727.01.00', } }, 'INNERTUBE_CONTEXT_CLIENT_NAME': 67, }, 'web_creator': { 'INNERTUBE_API_KEY': 'AIzaSyBUPetSUmoZL-OhlxA7wSac5XinrygCqMo', 'INNERTUBE_CONTEXT': { 'client': { 'clientName': 'WEB_CREATOR', 'clientVersion': '1.20220726.00.00', } }, 'INNERTUBE_CONTEXT_CLIENT_NAME': 62, }, 'android': { 'INNERTUBE_API_KEY': 'AIzaSyA8eiZmM1FaDVjRy-df2KTyQ_vz_yYM39w', 'INNERTUBE_CONTEXT': { 'client': { 'clientName': 'ANDROID', 'clientVersion': '17.31.35', 'androidSdkVersion': 30, 'userAgent': 'com.google.android.youtube/17.31.35 (Linux; U; Android 11) gzip' } }, 'INNERTUBE_CONTEXT_CLIENT_NAME': 3, 'REQUIRE_JS_PLAYER': False }, 'android_embedded': { 'INNERTUBE_API_KEY': 'AIzaSyCjc_pVEDi4qsv5MtC2dMXzpIaDoRFLsxw', 'INNERTUBE_CONTEXT': { 'client': { 'clientName': 'ANDROID_EMBEDDED_PLAYER', 'clientVersion': '17.31.35', 'androidSdkVersion': 30, 'userAgent': 'com.google.android.youtube/17.31.35 (Linux; U; Android 11) gzip' }, }, 'INNERTUBE_CONTEXT_CLIENT_NAME': 55, 'REQUIRE_JS_PLAYER': False }, 'android_music': { 'INNERTUBE_API_KEY': 'AIzaSyAOghZGza2MQSZkY_zfZ370N-PUdXEo8AI', 'INNERTUBE_CONTEXT': { 'client': { 'clientName': 'ANDROID_MUSIC', 'clientVersion': '5.16.51', 'androidSdkVersion': 30, 'userAgent': 'com.google.android.apps.youtube.music/5.16.51 (Linux; U; Android 11) gzip' } }, 'INNERTUBE_CONTEXT_CLIENT_NAME': 21, 'REQUIRE_JS_PLAYER': False }, 'android_creator': { 'INNERTUBE_API_KEY': 'AIzaSyD_qjV8zaaUMehtLkrKFgVeSX_Iqbtyws8', 'INNERTUBE_CONTEXT': { 'client': { 'clientName': 'ANDROID_CREATOR', 'clientVersion': '22.30.100', 'androidSdkVersion': 30, 'userAgent': 'com.google.android.apps.youtube.creator/22.30.100 (Linux; U; Android 11) gzip' }, }, 'INNERTUBE_CONTEXT_CLIENT_NAME': 14, 'REQUIRE_JS_PLAYER': False }, # iOS clients have HLS live streams. Setting device model to get 60fps formats. # See: https://github.com/TeamNewPipe/NewPipeExtractor/issues/680#issuecomment-1002724558 'ios': { 'INNERTUBE_API_KEY': 'AIzaSyB-63vPrdThhKuerbB2N_l7Kwwcxj6yUAc', 'INNERTUBE_CONTEXT': { 'client': { 'clientName': 'IOS', 'clientVersion': '17.33.2', 'deviceModel': 'iPhone14,3', 'userAgent': 'com.google.ios.youtube/17.33.2 (iPhone14,3; U; CPU iOS 15_6 like Mac OS X)' } }, 'INNERTUBE_CONTEXT_CLIENT_NAME': 5, 'REQUIRE_JS_PLAYER': False }, 'ios_embedded': { 'INNERTUBE_CONTEXT': { 'client': { 'clientName': 'IOS_MESSAGES_EXTENSION', 'clientVersion': '17.33.2', 'deviceModel': 'iPhone14,3', 'userAgent': 'com.google.ios.youtube/17.33.2 (iPhone14,3; U; CPU iOS 15_6 like Mac OS X)' }, }, 'INNERTUBE_CONTEXT_CLIENT_NAME': 66, 'REQUIRE_JS_PLAYER': False }, 'ios_music': { 'INNERTUBE_API_KEY': 'AIzaSyBAETezhkwP0ZWA02RsqT1zu78Fpt0bC_s', 'INNERTUBE_CONTEXT': { 'client': { 'clientName': 'IOS_MUSIC', 'clientVersion': '5.21', 'deviceModel': 'iPhone14,3', 'userAgent': 'com.google.ios.youtubemusic/5.21 (iPhone14,3; U; CPU iOS 15_6 like Mac OS X)' }, }, 'INNERTUBE_CONTEXT_CLIENT_NAME': 26, 'REQUIRE_JS_PLAYER': False }, 'ios_creator': { 'INNERTUBE_CONTEXT': { 'client': { 'clientName': 'IOS_CREATOR', 'clientVersion': '22.33.101', 'deviceModel': 'iPhone14,3', 'userAgent': 'com.google.ios.ytcreator/22.33.101 (iPhone14,3; U; CPU iOS 15_6 like Mac OS X)' }, }, 'INNERTUBE_CONTEXT_CLIENT_NAME': 15, 'REQUIRE_JS_PLAYER': False }, # mweb has 'ultralow' formats # See: https://github.com/yt-dlp/yt-dlp/pull/557 'mweb': { 'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8', 'INNERTUBE_CONTEXT': { 'client': { 'clientName': 'MWEB', 'clientVersion': '2.20220801.00.00', } }, 'INNERTUBE_CONTEXT_CLIENT_NAME': 2 }, # This client can access age restricted videos (unless the uploader has disabled the 'allow embedding' option) # See: https://github.com/zerodytrash/YouTube-Internal-Clients 'tv_embedded': { 'INNERTUBE_API_KEY': 'AIzaSyAO_FJ2SlqU8Q4STEHLGCilw_Y9_11qcW8', 'INNERTUBE_CONTEXT': { 'client': { 'clientName': 'TVHTML5_SIMPLY_EMBEDDED_PLAYER', 'clientVersion': '2.0', }, }, 'INNERTUBE_CONTEXT_CLIENT_NAME': 85 }, } def _split_innertube_client(client_name): variant, *base = client_name.rsplit('.', 1) if base: return variant, base[0], variant base, *variant = client_name.split('_', 1) return client_name, base, variant[0] if variant else None def build_innertube_clients(): THIRD_PARTY = { 'embedUrl': 'https://www.youtube.com/', # Can be any valid URL } BASE_CLIENTS = ('android', 'web', 'tv', 'ios', 'mweb') priority = qualities(BASE_CLIENTS[::-1]) for client, ytcfg in tuple(INNERTUBE_CLIENTS.items()): ytcfg.setdefault('INNERTUBE_API_KEY', 'AIzaSyDCU8hByM-4DrUqRUYnGn-3llEO78bcxq8') ytcfg.setdefault('INNERTUBE_HOST', 'www.youtube.com') ytcfg.setdefault('REQUIRE_JS_PLAYER', True) ytcfg['INNERTUBE_CONTEXT']['client'].setdefault('hl', 'en') _, base_client, variant = _split_innertube_client(client) ytcfg['priority'] = 10 * priority(base_client) if not variant: INNERTUBE_CLIENTS[f'{client}_embedscreen'] = embedscreen = copy.deepcopy(ytcfg) embedscreen['INNERTUBE_CONTEXT']['client']['clientScreen'] = 'EMBED' embedscreen['INNERTUBE_CONTEXT']['thirdParty'] = THIRD_PARTY embedscreen['priority'] -= 3 elif variant == 'embedded': ytcfg['INNERTUBE_CONTEXT']['thirdParty'] = THIRD_PARTY ytcfg['priority'] -= 2 else: ytcfg['priority'] -= 3 build_innertube_clients() class BadgeType(enum.Enum): AVAILABILITY_UNLISTED = enum.auto() AVAILABILITY_PRIVATE = enum.auto() AVAILABILITY_PUBLIC = enum.auto() AVAILABILITY_PREMIUM = enum.auto() AVAILABILITY_SUBSCRIPTION = enum.auto() LIVE_NOW = enum.auto() class YoutubeBaseInfoExtractor(InfoExtractor): """Provide base functions for Youtube extractors""" _RESERVED_NAMES = ( r'channel|c|user|playlist|watch|w|v|embed|e|watch_popup|clip|' r'shorts|movies|results|search|shared|hashtag|trending|explore|feed|feeds|' r'browse|oembed|get_video_info|iframe_api|s/player|source|' r'storefront|oops|index|account|t/terms|about|upload|signin|logout') _PLAYLIST_ID_RE = r'(?:(?:PL|LL|EC|UU|FL|RD|UL|TL|PU|OLAK5uy_)[0-9A-Za-z-_]{10,}|RDMM|WL|LL|LM)' # _NETRC_MACHINE = 'youtube' # If True it will raise an error if no login info is provided _LOGIN_REQUIRED = False _INVIDIOUS_SITES = ( # invidious-redirect websites r'(?:www\.)?redirect\.invidious\.io', r'(?:(?:www|dev)\.)?invidio\.us', # Invidious instances taken from https://github.com/iv-org/documentation/blob/master/docs/instances.md r'(?:www\.)?invidious\.pussthecat\.org', r'(?:www\.)?invidious\.zee\.li', r'(?:www\.)?invidious\.ethibox\.fr', r'(?:www\.)?invidious\.3o7z6yfxhbw7n3za4rss6l434kmv55cgw2vuziwuigpwegswvwzqipyd\.onion', r'(?:www\.)?osbivz6guyeahrwp2lnwyjk2xos342h4ocsxyqrlaopqjuhwn2djiiyd\.onion', r'(?:www\.)?u2cvlit75owumwpy4dj2hsmvkq7nvrclkpht7xgyye2pyoxhpmclkrad\.onion', # youtube-dl invidious instances list r'(?:(?:www|no)\.)?invidiou\.sh', r'(?:(?:www|fi)\.)?invidious\.snopyta\.org', r'(?:www\.)?invidious\.kabi\.tk', r'(?:www\.)?invidious\.mastodon\.host', r'(?:www\.)?invidious\.zapashcanon\.fr', r'(?:www\.)?(?:invidious(?:-us)?|piped)\.kavin\.rocks', r'(?:www\.)?invidious\.tinfoil-hat\.net', r'(?:www\.)?invidious\.himiko\.cloud', r'(?:www\.)?invidious\.reallyancient\.tech', r'(?:www\.)?invidious\.tube', r'(?:www\.)?invidiou\.site', r'(?:www\.)?invidious\.site', r'(?:www\.)?invidious\.xyz', r'(?:www\.)?invidious\.nixnet\.xyz', r'(?:www\.)?invidious\.048596\.xyz', r'(?:www\.)?invidious\.drycat\.fr', r'(?:www\.)?inv\.skyn3t\.in', r'(?:www\.)?tube\.poal\.co', r'(?:www\.)?tube\.connect\.cafe', r'(?:www\.)?vid\.wxzm\.sx', r'(?:www\.)?vid\.mint\.lgbt', r'(?:www\.)?vid\.puffyan\.us', r'(?:www\.)?yewtu\.be', r'(?:www\.)?yt\.elukerio\.org', r'(?:www\.)?yt\.lelux\.fi', r'(?:www\.)?invidious\.ggc-project\.de', r'(?:www\.)?yt\.maisputain\.ovh', r'(?:www\.)?ytprivate\.com', r'(?:www\.)?invidious\.13ad\.de', r'(?:www\.)?invidious\.toot\.koeln', r'(?:www\.)?invidious\.fdn\.fr', r'(?:www\.)?watch\.nettohikari\.com', r'(?:www\.)?invidious\.namazso\.eu', r'(?:www\.)?invidious\.silkky\.cloud', r'(?:www\.)?invidious\.exonip\.de', r'(?:www\.)?invidious\.riverside\.rocks', r'(?:www\.)?invidious\.blamefran\.net', r'(?:www\.)?invidious\.moomoo\.de', r'(?:www\.)?ytb\.trom\.tf', r'(?:www\.)?yt\.cyberhost\.uk', r'(?:www\.)?kgg2m7yk5aybusll\.onion', r'(?:www\.)?qklhadlycap4cnod\.onion', r'(?:www\.)?axqzx4s6s54s32yentfqojs3x5i7faxza6xo3ehd4bzzsg2ii4fv2iid\.onion', r'(?:www\.)?c7hqkpkpemu6e7emz5b4vyz7idjgdvgaaa3dyimmeojqbgpea3xqjoid\.onion', r'(?:www\.)?fz253lmuao3strwbfbmx46yu7acac2jz27iwtorgmbqlkurlclmancad\.onion', r'(?:www\.)?invidious\.l4qlywnpwqsluw65ts7md3khrivpirse744un3x7mlskqauz5pyuzgqd\.onion', r'(?:www\.)?owxfohz4kjyv25fvlqilyxast7inivgiktls3th44jhk3ej3i7ya\.b32\.i2p', r'(?:www\.)?4l2dgddgsrkf2ous66i6seeyi6etzfgrue332grh2n7madpwopotugyd\.onion', r'(?:www\.)?w6ijuptxiku4xpnnaetxvnkc5vqcdu7mgns2u77qefoixi63vbvnpnqd\.onion', r'(?:www\.)?kbjggqkzv65ivcqj6bumvp337z6264huv5kpkwuv6gu5yjiskvan7fad\.onion', r'(?:www\.)?grwp24hodrefzvjjuccrkw3mjq4tzhaaq32amf33dzpmuxe7ilepcmad\.onion', r'(?:www\.)?hpniueoejy4opn7bc4ftgazyqjoeqwlvh2uiku2xqku6zpoa4bf5ruid\.onion', # piped instances from https://github.com/TeamPiped/Piped/wiki/Instances r'(?:www\.)?piped\.kavin\.rocks', r'(?:www\.)?piped\.silkky\.cloud', r'(?:www\.)?piped\.tokhmi\.xyz', r'(?:www\.)?piped\.moomoo\.me', r'(?:www\.)?il\.ax', r'(?:www\.)?piped\.syncpundit\.com', r'(?:www\.)?piped\.mha\.fi', r'(?:www\.)?piped\.mint\.lgbt', r'(?:www\.)?piped\.privacy\.com\.de', ) # extracted from account/account_menu ep # XXX: These are the supported YouTube UI and API languages, # which is slightly different from languages supported for translation in YouTube studio _SUPPORTED_LANG_CODES = [ 'af', 'az', 'id', 'ms', 'bs', 'ca', 'cs', 'da', 'de', 'et', 'en-IN', 'en-GB', 'en', 'es', 'es-419', 'es-US', 'eu', 'fil', 'fr', 'fr-CA', 'gl', 'hr', 'zu', 'is', 'it', 'sw', 'lv', 'lt', 'hu', 'nl', 'no', 'uz', 'pl', 'pt-PT', 'pt', 'ro', 'sq', 'sk', 'sl', 'sr-Latn', 'fi', 'sv', 'vi', 'tr', 'be', 'bg', 'ky', 'kk', 'mk', 'mn', 'ru', 'sr', 'uk', 'el', 'hy', 'iw', 'ur', 'ar', 'fa', 'ne', 'mr', 'hi', 'as', 'bn', 'pa', 'gu', 'or', 'ta', 'te', 'kn', 'ml', 'si', 'th', 'lo', 'my', 'ka', 'am', 'km', 'zh-CN', 'zh-TW', 'zh-HK', 'ja', 'ko' ] @functools.cached_property def _preferred_lang(self): """ Returns a language code supported by YouTube for the user preferred language. Returns None if no preferred language set. """ preferred_lang = self._configuration_arg('lang', ie_key='Youtube', casesense=True, default=[''])[0] if not preferred_lang: return if preferred_lang not in self._SUPPORTED_LANG_CODES: raise ExtractorError( f'Unsupported language code: {preferred_lang}. Supported language codes (case-sensitive): {join_nonempty(*self._SUPPORTED_LANG_CODES, delim=", ")}.', expected=True) elif preferred_lang != 'en': self.report_warning( f'Preferring "{preferred_lang}" translated fields. Note that some metadata extraction may fail or be incorrect.') return preferred_lang def _initialize_consent(self): cookies = self._get_cookies('https://www.youtube.com/') if cookies.get('__Secure-3PSID'): return consent_id = None consent = cookies.get('CONSENT') if consent: if 'YES' in consent.value: return consent_id = self._search_regex( r'PENDING\+(\d+)', consent.value, 'consent', default=None) if not consent_id: consent_id = random.randint(100, 999) self._set_cookie('.youtube.com', 'CONSENT', 'YES+cb.20210328-17-p0.en+FX+%s' % consent_id) def _initialize_pref(self): cookies = self._get_cookies('https://www.youtube.com/') pref_cookie = cookies.get('PREF') pref = {} if pref_cookie: try: pref = dict(urllib.parse.parse_qsl(pref_cookie.value)) except ValueError: self.report_warning('Failed to parse user PREF cookie' + bug_reports_message()) pref.update({'hl': self._preferred_lang or 'en', 'tz': 'UTC'}) self._set_cookie('.youtube.com', name='PREF', value=urllib.parse.urlencode(pref)) def _real_initialize(self): self._initialize_pref() self._initialize_consent() self._check_login_required() def _check_login_required(self): if self._LOGIN_REQUIRED and not self._cookies_passed: self.raise_login_required('Login details are needed to download this content', method='cookies') _YT_INITIAL_DATA_RE = r'(?:window\s*\[\s*["\']ytInitialData["\']\s*\]|ytInitialData)\s*=' _YT_INITIAL_PLAYER_RESPONSE_RE = r'ytInitialPlayerResponse\s*=' def _get_default_ytcfg(self, client='web'): return copy.deepcopy(INNERTUBE_CLIENTS[client]) def _get_innertube_host(self, client='web'): return INNERTUBE_CLIENTS[client]['INNERTUBE_HOST'] def _ytcfg_get_safe(self, ytcfg, getter, expected_type=None, default_client='web'): # try_get but with fallback to default ytcfg client values when present _func = lambda y: try_get(y, getter, expected_type) return _func(ytcfg) or _func(self._get_default_ytcfg(default_client)) def _extract_client_name(self, ytcfg, default_client='web'): return self._ytcfg_get_safe( ytcfg, (lambda x: x['INNERTUBE_CLIENT_NAME'], lambda x: x['INNERTUBE_CONTEXT']['client']['clientName']), str, default_client) def _extract_client_version(self, ytcfg, default_client='web'): return self._ytcfg_get_safe( ytcfg, (lambda x: x['INNERTUBE_CLIENT_VERSION'], lambda x: x['INNERTUBE_CONTEXT']['client']['clientVersion']), str, default_client) def _select_api_hostname(self, req_api_hostname, default_client=None): return (self._configuration_arg('innertube_host', [''], ie_key=YoutubeIE.ie_key())[0] or req_api_hostname or self._get_innertube_host(default_client or 'web')) def _extract_api_key(self, ytcfg=None, default_client='web'): return self._ytcfg_get_safe(ytcfg, lambda x: x['INNERTUBE_API_KEY'], str, default_client) def _extract_context(self, ytcfg=None, default_client='web'): context = get_first( (ytcfg, self._get_default_ytcfg(default_client)), 'INNERTUBE_CONTEXT', expected_type=dict) # Enforce language and tz for extraction client_context = traverse_obj(context, 'client', expected_type=dict, default={}) client_context.update({'hl': self._preferred_lang or 'en', 'timeZone': 'UTC', 'utcOffsetMinutes': 0}) return context _SAPISID = None def _generate_sapisidhash_header(self, origin='https://www.youtube.com'): time_now = round(time.time()) if self._SAPISID is None: yt_cookies = self._get_cookies('https://www.youtube.com') # Sometimes SAPISID cookie isn't present but __Secure-3PAPISID is. # See: https://github.com/yt-dlp/yt-dlp/issues/393 sapisid_cookie = dict_get( yt_cookies, ('__Secure-3PAPISID', 'SAPISID')) if sapisid_cookie and sapisid_cookie.value: self._SAPISID = sapisid_cookie.value self.write_debug('Extracted SAPISID cookie') # SAPISID cookie is required if not already present if not yt_cookies.get('SAPISID'): self.write_debug('Copying __Secure-3PAPISID cookie to SAPISID cookie') self._set_cookie( '.youtube.com', 'SAPISID', self._SAPISID, secure=True, expire_time=time_now + 3600) else: self._SAPISID = False if not self._SAPISID: return None # SAPISIDHASH algorithm from https://stackoverflow.com/a/32065323 sapisidhash = hashlib.sha1( f'{time_now} {self._SAPISID} {origin}'.encode()).hexdigest() return f'SAPISIDHASH {time_now}_{sapisidhash}' def _call_api(self, ep, query, video_id, fatal=True, headers=None, note='Downloading API JSON', errnote='Unable to download API page', context=None, api_key=None, api_hostname=None, default_client='web'): data = {'context': context} if context else {'context': self._extract_context(default_client=default_client)} data.update(query) real_headers = self.generate_api_headers(default_client=default_client) real_headers.update({'content-type': 'application/json'}) if headers: real_headers.update(headers) api_key = (self._configuration_arg('innertube_key', [''], ie_key=YoutubeIE.ie_key(), casesense=True)[0] or api_key or self._extract_api_key(default_client=default_client)) return self._download_json( f'https://{self._select_api_hostname(api_hostname, default_client)}/youtubei/v1/{ep}', video_id=video_id, fatal=fatal, note=note, errnote=errnote, data=json.dumps(data).encode('utf8'), headers=real_headers, query={'key': api_key, 'prettyPrint': 'false'}) def extract_yt_initial_data(self, item_id, webpage, fatal=True): return self._search_json(self._YT_INITIAL_DATA_RE, webpage, 'yt initial data', item_id, fatal=fatal) @staticmethod def _extract_session_index(*data): """ Index of current account in account list. See: https://github.com/yt-dlp/yt-dlp/pull/519 """ for ytcfg in data: session_index = int_or_none(try_get(ytcfg, lambda x: x['SESSION_INDEX'])) if session_index is not None: return session_index # Deprecated? def _extract_identity_token(self, ytcfg=None, webpage=None): if ytcfg: token = try_get(ytcfg, lambda x: x['ID_TOKEN'], str) if token: return token if webpage: return self._search_regex( r'\bID_TOKEN["\']\s*:\s*["\'](.+?)["\']', webpage, 'identity token', default=None, fatal=False) @staticmethod def _extract_account_syncid(*args): """ Extract syncId required to download private playlists of secondary channels @params response and/or ytcfg """ for data in args: # ytcfg includes channel_syncid if on secondary channel delegated_sid = try_get(data, lambda x: x['DELEGATED_SESSION_ID'], str) if delegated_sid: return delegated_sid sync_ids = (try_get( data, (lambda x: x['responseContext']['mainAppWebResponseContext']['datasyncId'], lambda x: x['DATASYNC_ID']), str) or '').split('||') if len(sync_ids) >= 2 and sync_ids[1]: # datasyncid is of the form "channel_syncid||user_syncid" for secondary channel # and just "user_syncid||" for primary channel. We only want the channel_syncid return sync_ids[0] @staticmethod def _extract_visitor_data(*args): """ Extracts visitorData from an API response or ytcfg Appears to be used to track session state """ return get_first( args, [('VISITOR_DATA', ('INNERTUBE_CONTEXT', 'client', 'visitorData'), ('responseContext', 'visitorData'))], expected_type=str) @functools.cached_property def is_authenticated(self): return bool(self._generate_sapisidhash_header()) def extract_ytcfg(self, video_id, webpage): if not webpage: return {} return self._parse_json( self._search_regex( r'ytcfg\.set\s*\(\s*({.+?})\s*\)\s*;', webpage, 'ytcfg', default='{}'), video_id, fatal=False) or {} def generate_api_headers( self, *, ytcfg=None, account_syncid=None, session_index=None, visitor_data=None, identity_token=None, api_hostname=None, default_client='web'): origin = 'https://' + (self._select_api_hostname(api_hostname, default_client)) headers = { 'X-YouTube-Client-Name': str( self._ytcfg_get_safe(ytcfg, lambda x: x['INNERTUBE_CONTEXT_CLIENT_NAME'], default_client=default_client)), 'X-YouTube-Client-Version': self._extract_client_version(ytcfg, default_client), 'Origin': origin, 'X-Youtube-Identity-Token': identity_token or self._extract_identity_token(ytcfg), 'X-Goog-PageId': account_syncid or self._extract_account_syncid(ytcfg), 'X-Goog-Visitor-Id': visitor_data or self._extract_visitor_data(ytcfg), 'User-Agent': self._ytcfg_get_safe(ytcfg, lambda x: x['INNERTUBE_CONTEXT']['client']['userAgent'], default_client=default_client) } if session_index is None: session_index = self._extract_session_index(ytcfg) if account_syncid or session_index is not None: headers['X-Goog-AuthUser'] = session_index if session_index is not None else 0 auth = self._generate_sapisidhash_header(origin) if auth is not None: headers['Authorization'] = auth headers['X-Origin'] = origin return {h: v for h, v in headers.items() if v is not None} def _download_ytcfg(self, client, video_id): url = { 'web': 'https://www.youtube.com', 'web_music': 'https://music.youtube.com', 'web_embedded': f'https://www.youtube.com/embed/{video_id}?html5=1' }.get(client) if not url: return {} webpage = self._download_webpage( url, video_id, fatal=False, note=f'Downloading {client.replace("_", " ").strip()} client config') return self.extract_ytcfg(video_id, webpage) or {} @staticmethod def _build_api_continuation_query(continuation, ctp=None): query = { 'continuation': continuation } # TODO: Inconsistency with clickTrackingParams. # Currently we have a fixed ctp contained within context (from ytcfg) # and a ctp in root query for continuation. if ctp: query['clickTracking'] = {'clickTrackingParams': ctp} return query @classmethod def _extract_next_continuation_data(cls, renderer): next_continuation = try_get( renderer, (lambda x: x['continuations'][0]['nextContinuationData'], lambda x: x['continuation']['reloadContinuationData']), dict) if not next_continuation: return continuation = next_continuation.get('continuation') if not continuation: return ctp = next_continuation.get('clickTrackingParams') return cls._build_api_continuation_query(continuation, ctp) @classmethod def _extract_continuation_ep_data(cls, continuation_ep: dict): if isinstance(continuation_ep, dict): continuation = try_get( continuation_ep, lambda x: x['continuationCommand']['token'], str) if not continuation: return ctp = continuation_ep.get('clickTrackingParams') return cls._build_api_continuation_query(continuation, ctp) @classmethod def _extract_continuation(cls, renderer): next_continuation = cls._extract_next_continuation_data(renderer) if next_continuation: return next_continuation contents = [] for key in ('contents', 'items', 'rows'): contents.extend(try_get(renderer, lambda x: x[key], list) or []) for content in contents: if not isinstance(content, dict): continue continuation_ep = try_get( content, (lambda x: x['continuationItemRenderer']['continuationEndpoint'], lambda x: x['continuationItemRenderer']['button']['buttonRenderer']['command']), dict) continuation = cls._extract_continuation_ep_data(continuation_ep) if continuation: return continuation @classmethod def _extract_alerts(cls, data): for alert_dict in try_get(data, lambda x: x['alerts'], list) or []: if not isinstance(alert_dict, dict): continue for alert in alert_dict.values(): alert_type = alert.get('type') if not alert_type: continue message = cls._get_text(alert, 'text') if message: yield alert_type, message def _report_alerts(self, alerts, expected=True, fatal=True, only_once=False): errors = [] warnings = [] for alert_type, alert_message in alerts: if alert_type.lower() == 'error' and fatal: errors.append([alert_type, alert_message]) else: warnings.append([alert_type, alert_message]) for alert_type, alert_message in (warnings + errors[:-1]): self.report_warning(f'YouTube said: {alert_type} - {alert_message}', only_once=only_once) if errors: raise ExtractorError('YouTube said: %s' % errors[-1][1], expected=expected) def _extract_and_report_alerts(self, data, *args, **kwargs): return self._report_alerts(self._extract_alerts(data), *args, **kwargs) def _extract_badges(self, renderer: dict): privacy_icon_map = { 'PRIVACY_UNLISTED': BadgeType.AVAILABILITY_UNLISTED, 'PRIVACY_PRIVATE': BadgeType.AVAILABILITY_PRIVATE, 'PRIVACY_PUBLIC': BadgeType.AVAILABILITY_PUBLIC } badge_style_map = { 'BADGE_STYLE_TYPE_MEMBERS_ONLY': BadgeType.AVAILABILITY_SUBSCRIPTION, 'BADGE_STYLE_TYPE_PREMIUM': BadgeType.AVAILABILITY_PREMIUM, 'BADGE_STYLE_TYPE_LIVE_NOW': BadgeType.LIVE_NOW } label_map = { 'unlisted': BadgeType.AVAILABILITY_UNLISTED, 'private': BadgeType.AVAILABILITY_PRIVATE, 'members only': BadgeType.AVAILABILITY_SUBSCRIPTION, 'live': BadgeType.LIVE_NOW, 'premium': BadgeType.AVAILABILITY_PREMIUM } badges = [] for badge in traverse_obj(renderer, ('badges', ..., 'metadataBadgeRenderer'), default=[]): badge_type = ( privacy_icon_map.get(traverse_obj(badge, ('icon', 'iconType'), expected_type=str)) or badge_style_map.get(traverse_obj(badge, 'style')) ) if badge_type: badges.append({'type': badge_type}) continue # fallback, won't work in some languages label = traverse_obj(badge, 'label', expected_type=str, default='') for match, label_badge_type in label_map.items(): if match in label.lower(): badges.append({'type': badge_type}) continue return badges @staticmethod def _has_badge(badges, badge_type): return bool(traverse_obj(badges, lambda _, v: v['type'] == badge_type)) @staticmethod def _get_text(data, *path_list, max_runs=None): for path in path_list or [None]: if path is None: obj = [data] else: obj = traverse_obj(data, path, default=[]) if not any(key is ... or isinstance(key, (list, tuple)) for key in variadic(path)): obj = [obj] for item in obj: text = try_get(item, lambda x: x['simpleText'], str) if text: return text runs = try_get(item, lambda x: x['runs'], list) or [] if not runs and isinstance(item, list): runs = item runs = runs[:min(len(runs), max_runs or len(runs))] text = ''.join(traverse_obj(runs, (..., 'text'), expected_type=str, default=[])) if text: return text def _get_count(self, data, *path_list): count_text = self._get_text(data, *path_list) or '' count = parse_count(count_text) if count is None: count = str_to_int( self._search_regex(r'^([\d,]+)', re.sub(r'\s', '', count_text), 'count', default=None)) return count @staticmethod def _extract_thumbnails(data, *path_list): """ Extract thumbnails from thumbnails dict @param path_list: path list to level that contains 'thumbnails' key """ thumbnails = [] for path in path_list or [()]: for thumbnail in traverse_obj(data, (*variadic(path), 'thumbnails', ...), default=[]): thumbnail_url = url_or_none(thumbnail.get('url')) if not thumbnail_url: continue # Sometimes youtube gives a wrong thumbnail URL. See: # https://github.com/yt-dlp/yt-dlp/issues/233 # https://github.com/ytdl-org/youtube-dl/issues/28023 if 'maxresdefault' in thumbnail_url: thumbnail_url = thumbnail_url.split('?')[0] thumbnails.append({ 'url': thumbnail_url, 'height': int_or_none(thumbnail.get('height')), 'width': int_or_none(thumbnail.get('width')), }) return thumbnails @staticmethod def extract_relative_time(relative_time_text): """ Extracts a relative time from string and converts to dt object e.g. 'streamed 6 days ago', '5 seconds ago (edited)', 'updated today' """ mobj = re.search(r'(?Ptoday|yesterday|now)|(?P