diff --git a/yt_dlp/extractor/cda.py b/yt_dlp/extractor/cda.py index 6d01c60d5..2a12b054b 100644 --- a/yt_dlp/extractor/cda.py +++ b/yt_dlp/extractor/cda.py @@ -1,4 +1,8 @@ +import base64 import codecs +import datetime +import hashlib +import hmac import json import re @@ -12,6 +16,8 @@ from ..utils import ( multipart_encode, parse_duration, random_birthday, + traverse_obj, + try_call, try_get, urljoin, ) @@ -19,7 +25,18 @@ from ..utils import ( class CDAIE(InfoExtractor): _VALID_URL = r'https?://(?:(?:www\.)?cda\.pl/video|ebd\.cda\.pl/[0-9]+x[0-9]+)/(?P[0-9a-z]+)' + _NETRC_MACHINE = 'cdapl' + _BASE_URL = 'http://www.cda.pl/' + _BASE_API_URL = 'https://api.cda.pl' + _API_HEADERS = { + 'Accept': 'application/vnd.cda.public+json', + 'User-Agent': 'pl.cda 1.0 (version 1.2.88 build 15306; Android 9; Xiaomi Redmi 3S)', + } + # hardcoded in the app + _LOGIN_REQUEST_AUTH = 'Basic YzU3YzBlZDUtYTIzOC00MWQwLWI2NjQtNmZmMWMxY2Y2YzVlOklBTm95QlhRRVR6U09MV1hnV3MwMW0xT2VyNWJNZzV4clRNTXhpNGZJUGVGZ0lWUlo5UGVYTDhtUGZaR1U1U3Q' + _BEARER_CACHE = 'cda-bearer' + _TESTS = [{ 'url': 'http://www.cda.pl/video/5749950c', 'md5': '6f844bf51b15f31fae165365707ae970', @@ -83,8 +100,73 @@ class CDAIE(InfoExtractor): 'Content-Type': content_type, }, **kwargs) + def _perform_login(self, username, password): + cached_bearer = self.cache.load(self._BEARER_CACHE, username) or {} + if cached_bearer.get('valid_until', 0) > datetime.datetime.now().timestamp() + 5: + self._API_HEADERS['Authorization'] = f'Bearer {cached_bearer["token"]}' + return + + password_hash = base64.urlsafe_b64encode(hmac.new( + b's01m1Oer5IANoyBXQETzSOLWXgWs01m1Oer5bMg5xrTMMxRZ9Pi4fIPeFgIVRZ9PeXL8mPfXQETZGUAN5StRZ9P', + ''.join(f'{bytes((bt & 255, )).hex():0>2}' + for bt in hashlib.md5(password.encode()).digest()).encode(), + hashlib.sha256).digest()).decode().replace('=', '') + + token_res = self._download_json( + f'{self._BASE_API_URL}/oauth/token', None, 'Logging in', data=b'', + headers={**self._API_HEADERS, 'Authorization': self._LOGIN_REQUEST_AUTH}, + query={ + 'grant_type': 'password', + 'login': username, + 'password': password_hash, + }) + self.cache.store(self._BEARER_CACHE, username, { + 'token': token_res['access_token'], + 'valid_until': token_res['expires_in'] + datetime.datetime.now().timestamp(), + }) + self._API_HEADERS['Authorization'] = f'Bearer {token_res["access_token"]}' + def _real_extract(self, url): video_id = self._match_id(url) + + if 'Authorization' in self._API_HEADERS: + return self._api_extract(video_id) + else: + return self._web_extract(video_id, url) + + def _api_extract(self, video_id): + meta = self._download_json( + f'{self._BASE_API_URL}/video/{video_id}', video_id, headers=self._API_HEADERS)['video'] + + if meta.get('premium') and not meta.get('premium_free'): + self.report_drm(video_id) + + uploader = traverse_obj(meta, 'author', 'login') + + formats = [{ + 'url': quality['file'], + 'format': quality.get('title'), + 'resolution': quality.get('name'), + 'height': try_call(lambda: int(quality['name'][:-1])), + 'filesize': quality.get('length'), + } for quality in meta['qualities'] if quality.get('file')] + + self._sort_formats(formats) + + return { + 'id': video_id, + 'title': meta.get('title'), + 'description': meta.get('description'), + 'uploader': None if uploader == 'anonim' else uploader, + 'average_rating': float_or_none(meta.get('rating')), + 'thumbnail': meta.get('thumb'), + 'formats': formats, + 'duration': meta.get('duration'), + 'age_limit': 18 if meta.get('for_adults') else 0, + 'view_count': meta.get('views'), + } + + def _web_extract(self, video_id, url): self._set_cookie('cda.pl', 'cda.player', 'html5') webpage = self._download_webpage( self._BASE_URL + '/video/' + video_id, video_id)