[hls,aes] Fallback to native implementation for AES-CBC

and detect `Cryptodome` in addition to `Crypto`

Closes #935
Related: #938
pull/1484/head
pukkandan 3 years ago
parent 7303f84abe
commit edf65256aa
No known key found for this signature in database
GPG Key ID: 0F00D95A001F4698

@ -2,8 +2,8 @@ import unittest
from datetime import datetime, timezone
from yt_dlp import cookies
from yt_dlp.compat import compat_pycrypto_AES
from yt_dlp.cookies import (
CRYPTO_AVAILABLE,
LinuxChromeCookieDecryptor,
MacChromeCookieDecryptor,
WindowsChromeCookieDecryptor,
@ -53,7 +53,7 @@ class TestCookies(unittest.TestCase):
decryptor = LinuxChromeCookieDecryptor('Chrome', YDLLogger())
self.assertEqual(decryptor.decrypt(encrypted_value), value)
@unittest.skipIf(not CRYPTO_AVAILABLE, 'cryptography library not available')
@unittest.skipIf(not compat_pycrypto_AES, 'cryptography library not available')
def test_chrome_cookie_decryptor_windows_v10(self):
with MonkeyPatch(cookies, {
'_get_windows_v10_key': lambda *args, **kwargs: b'Y\xef\xad\xad\xeerp\xf0Y\xe6\x9b\x12\xc2<z\x16]\n\xbb\xb8\xcb\xd7\x9bA\xc3\x14e\x99{\xd6\xf4&'

@ -35,6 +35,7 @@ from .compat import (
compat_kwargs,
compat_numeric_types,
compat_os_name,
compat_pycrypto_AES,
compat_shlex_quote,
compat_str,
compat_tokenize_tokenize,
@ -3295,13 +3296,12 @@ class YoutubeDL(object):
) or 'none'
self._write_string('[debug] exe versions: %s\n' % exe_str)
from .downloader.fragment import can_decrypt_frag
from .downloader.websocket import has_websockets
from .postprocessor.embedthumbnail import has_mutagen
from .cookies import SQLITE_AVAILABLE, KEYRING_AVAILABLE
lib_str = ', '.join(sorted(filter(None, (
can_decrypt_frag and 'pycryptodome',
compat_pycrypto_AES and compat_pycrypto_AES.__name__.split('.')[0],
has_websockets and 'websockets',
has_mutagen and 'mutagen',
SQLITE_AVAILABLE and 'sqlite',

@ -2,9 +2,21 @@ from __future__ import unicode_literals
from math import ceil
from .compat import compat_b64decode
from .compat import compat_b64decode, compat_pycrypto_AES
from .utils import bytes_to_intlist, intlist_to_bytes
if compat_pycrypto_AES:
def aes_cbc_decrypt_bytes(data, key, iv):
""" Decrypt bytes with AES-CBC using pycryptodome """
return compat_pycrypto_AES.new(key, compat_pycrypto_AES.MODE_CBC, iv).decrypt(data)
else:
def aes_cbc_decrypt_bytes(data, key, iv):
""" Decrypt bytes with AES-CBC using native implementation since pycryptodome is unavailable """
return intlist_to_bytes(aes_cbc_decrypt(*map(bytes_to_intlist, (data, key, iv))))
BLOCK_SIZE_BYTES = 16

@ -148,6 +148,15 @@ else:
compat_expanduser = os.path.expanduser
try:
from Cryptodome.Cipher import AES as compat_pycrypto_AES
except ImportError:
try:
from Crypto.Cipher import AES as compat_pycrypto_AES
except ImportError:
compat_pycrypto_AES = None
# Deprecated
compat_basestring = str
@ -241,6 +250,7 @@ __all__ = [
'compat_os_name',
'compat_parse_qs',
'compat_print',
'compat_pycrypto_AES',
'compat_realpath',
'compat_setenv',
'compat_shlex_quote',

@ -13,6 +13,7 @@ from yt_dlp.aes import aes_cbc_decrypt
from yt_dlp.compat import (
compat_b64decode,
compat_cookiejar_Cookie,
compat_pycrypto_AES
)
from yt_dlp.utils import (
bug_reports_message,
@ -32,12 +33,6 @@ except ImportError:
SQLITE_AVAILABLE = False
try:
from Crypto.Cipher import AES
CRYPTO_AVAILABLE = True
except ImportError:
CRYPTO_AVAILABLE = False
try:
import keyring
KEYRING_AVAILABLE = True
@ -400,7 +395,7 @@ class WindowsChromeCookieDecryptor(ChromeCookieDecryptor):
if self._v10_key is None:
self._logger.warning('cannot decrypt v10 cookies: no key found', only_once=True)
return None
elif not CRYPTO_AVAILABLE:
elif not compat_pycrypto_AES:
self._logger.warning('cannot decrypt cookie as the `pycryptodome` module is not installed. '
'Please install by running `python3 -m pip install pycryptodome`',
only_once=True)
@ -660,7 +655,7 @@ def _decrypt_aes_cbc(ciphertext, key, logger, initialization_vector=b' ' * 16):
def _decrypt_aes_gcm(ciphertext, key, nonce, authentication_tag, logger):
cipher = AES.new(key, AES.MODE_GCM, nonce)
cipher = compat_pycrypto_AES.new(key, compat_pycrypto_AES.MODE_GCM, nonce)
try:
plaintext = cipher.decrypt_and_verify(ciphertext, authentication_tag)
except ValueError:

@ -6,13 +6,8 @@ import subprocess
import sys
import time
try:
from Crypto.Cipher import AES
can_decrypt_frag = True
except ImportError:
can_decrypt_frag = False
from .common import FileDownloader
from ..aes import aes_cbc_decrypt_bytes
from ..compat import (
compat_setenv,
compat_str,
@ -164,8 +159,7 @@ class ExternalFD(FileDownloader):
decrypt_info['KEY'] = decrypt_info.get('KEY') or self.ydl.urlopen(
self._prepare_url(info_dict, info_dict.get('_decryption_key_url') or decrypt_info['URI'])).read()
encrypted_data = src.read()
decrypted_data = AES.new(
decrypt_info['KEY'], AES.MODE_CBC, iv).decrypt(encrypted_data)
decrypted_data = aes_cbc_decrypt_bytes(encrypted_data, decrypt_info['KEY'], iv)
dest.write(decrypted_data)
else:
fragment_data = src.read()

@ -4,12 +4,6 @@ import os
import time
import json
try:
from Crypto.Cipher import AES
can_decrypt_frag = True
except ImportError:
can_decrypt_frag = False
try:
import concurrent.futures
can_threaded_download = True
@ -18,6 +12,7 @@ except ImportError:
from .common import FileDownloader
from .http import HttpFD
from ..aes import aes_cbc_decrypt_bytes
from ..compat import (
compat_urllib_error,
compat_struct_pack,
@ -386,7 +381,7 @@ class FragmentFD(FileDownloader):
# not what it decrypts to.
if self.params.get('test', False):
return frag_content
return AES.new(decrypt_info['KEY'], AES.MODE_CBC, iv).decrypt(frag_content)
return aes_cbc_decrypt_bytes(frag_content, decrypt_info['KEY'], iv)
def append_fragment(frag_content, frag_index, ctx):
if not frag_content:

@ -5,7 +5,7 @@ import io
import binascii
from ..downloader import get_suitable_downloader
from .fragment import FragmentFD, can_decrypt_frag
from .fragment import FragmentFD
from .external import FFmpegFD
from ..compat import (
@ -29,7 +29,7 @@ class HlsFD(FragmentFD):
FD_NAME = 'hlsnative'
@staticmethod
def can_download(manifest, info_dict, allow_unplayable_formats=False, with_crypto=can_decrypt_frag):
def can_download(manifest, info_dict, allow_unplayable_formats=False):
UNSUPPORTED_FEATURES = [
# r'#EXT-X-BYTERANGE', # playlists composed of byte ranges of media files [2]
@ -57,7 +57,6 @@ class HlsFD(FragmentFD):
def check_results():
yield not info_dict.get('is_live')
is_aes128_enc = '#EXT-X-KEY:METHOD=AES-128' in manifest
yield with_crypto or not is_aes128_enc
yield not (is_aes128_enc and r'#EXT-X-BYTERANGE' in manifest)
for feature in UNSUPPORTED_FEATURES:
yield not re.search(feature, manifest)
@ -75,8 +74,6 @@ class HlsFD(FragmentFD):
if info_dict.get('extra_param_to_segment_url') or info_dict.get('_decryption_key_url'):
self.report_error('pycryptodome not found. Please install')
return False
if self.can_download(s, info_dict, with_crypto=True):
self.report_warning('pycryptodome is needed to download this file natively')
fd = FFmpegFD(self.ydl, self.params)
self.report_warning(
'%s detected unsupported features; extraction will be delegated to %s' % (self.FD_NAME, fd.get_basename()))

@ -3,7 +3,6 @@ from __future__ import unicode_literals
import json
import re
import sys
from .common import InfoExtractor
from ..utils import (
@ -94,20 +93,21 @@ class IviIE(InfoExtractor):
]
})
bundled = hasattr(sys, 'frozen')
for site in (353, 183):
content_data = (data % site).encode()
if site == 353:
if bundled:
continue
try:
from Cryptodome.Cipher import Blowfish
from Cryptodome.Hash import CMAC
pycryptodomex_found = True
pycryptodome_found = True
except ImportError:
pycryptodomex_found = False
continue
try:
from Crypto.Cipher import Blowfish
from Crypto.Hash import CMAC
pycryptodome_found = True
except ImportError:
pycryptodome_found = False
continue
timestamp = (self._download_json(
self._LIGHT_URL, video_id,
@ -140,14 +140,8 @@ class IviIE(InfoExtractor):
extractor_msg = 'Video %s does not exist'
elif site == 353:
continue
elif bundled:
raise ExtractorError(
'This feature does not work from bundled exe. Run yt-dlp from sources.',
expected=True)
elif not pycryptodomex_found:
raise ExtractorError(
'pycryptodomex not found. Please install',
expected=True)
elif not pycryptodome_found:
raise ExtractorError('pycryptodome not found. Please install', expected=True)
elif message:
extractor_msg += ': ' + message
raise ExtractorError(extractor_msg % video_id, expected=True)

Loading…
Cancel
Save