mirror of https://github.com/yt-dlp/yt-dlp
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
241 lines
7.1 KiB
Python
241 lines
7.1 KiB
Python
"""No longer used and new code should not use. Exists only for API compat."""
|
|
import platform
|
|
import struct
|
|
import sys
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
import zlib
|
|
|
|
from ._utils import Popen, decode_base_n, preferredencoding
|
|
from .traversal import traverse_obj
|
|
from ..dependencies import certifi, websockets
|
|
from ..networking._helper import make_ssl_context
|
|
from ..networking._urllib import HTTPHandler
|
|
|
|
# isort: split
|
|
from .networking import random_user_agent, std_headers # noqa: F401
|
|
from ..cookies import YoutubeDLCookieJar # noqa: F401
|
|
from ..networking._urllib import PUTRequest # noqa: F401
|
|
from ..networking._urllib import SUPPORTED_ENCODINGS, HEADRequest # noqa: F401
|
|
from ..networking._urllib import ProxyHandler as PerRequestProxyHandler # noqa: F401
|
|
from ..networking._urllib import RedirectHandler as YoutubeDLRedirectHandler # noqa: F401
|
|
from ..networking._urllib import ( # noqa: F401
|
|
make_socks_conn_class,
|
|
update_Request,
|
|
)
|
|
from ..networking.exceptions import HTTPError, network_exceptions # noqa: F401
|
|
|
|
# Legacy boolean flags: each is the truthiness of the corresponding name
# imported from ..dependencies at the top of this module.
has_certifi = bool(certifi)
has_websockets = bool(websockets)
|
|
|
|
|
|
def load_plugins(name, suffix, namespace):
    """Load plugins through ``..plugins.load_plugins`` and merge them into *namespace*.

    The result of ``..plugins.load_plugins(name, suffix)`` is passed to
    ``namespace.update()`` and then returned unchanged.
    """
    # Imported under an alias so the real loader does not shadow this wrapper's name
    from ..plugins import load_plugins as _load_plugins

    loaded = _load_plugins(name, suffix)
    namespace.update(loaded)
    return loaded
|
|
|
|
|
|
def traverse_dict(dictn, keys, casesense=True):
    """Legacy wrapper that forwards to ``traverse_obj``.

    *dictn*, *keys* and *casesense* are passed through as given; the call
    always sets ``is_user_input=True`` and ``traverse_string=True``.
    """
    fixed_options = {
        'is_user_input': True,
        'traverse_string': True,
    }
    return traverse_obj(dictn, keys, casesense=casesense, **fixed_options)
|
|
|
|
|
|
def decode_base(value, digits):
    """Legacy alias for ``decode_base_n``; *digits* is passed as its ``table`` argument."""
    decoded = decode_base_n(value, table=digits)
    return decoded
|
|
|
|
|
|
def platform_name():
    """Return the platform description string reported by ``platform.platform()``."""
    description = platform.platform()
    return description
|
|
|
|
|
|
def get_subprocess_encoding():
    """Return the name of the encoding to use for subprocess calls.

    On ``win32`` with a Windows major version of at least 5 this is the
    value of ``preferredencoding()`` (the locale encoding, see
    http://stackoverflow.com/a/9951851/35070); everywhere else it is
    ``sys.getfilesystemencoding()``. ``'utf-8'`` is returned when the
    selected source yields ``None``.
    """
    use_locale_encoding = sys.platform == 'win32' and sys.getwindowsversion()[0] >= 5
    chosen = preferredencoding() if use_locale_encoding else sys.getfilesystemencoding()
    return 'utf-8' if chosen is None else chosen
|
|
|
|
|
|
# UNUSED
|
|
# Based on png2str() written by @gdkchan and improved by @yokrysty
|
|
# Originally posted at https://github.com/ytdl-org/youtube-dl/issues/9706
|
|
def decode_png(png_data):
    """Decode a PNG byte string into ``(width, height, pixels)``.

    *pixels* is a list of *height* rows, each a flat list of ``width * 3``
    integers (one per reconstructed scanline byte). Scanlines are always
    read as 3 bytes per pixel: the bit-depth, colour-type and interlace
    fields of IHDR are not inspected, so only non-interlaced 8-bit RGB
    images decode meaningfully. Chunk CRCs are skipped, not verified.

    Fewer than four stray bytes after the last chunk are ignored.

    Raises:
        OSError: the PNG signature or the leading IHDR chunk is missing,
            or the file contains no IDAT data.
        zlib.error: the concatenated IDAT payload cannot be decompressed.
    """
    # Reference: https://www.w3.org/TR/PNG/
    if png_data[:8] != b'\x89PNG\x0d\x0a\x1a\x0a' or png_data[12:16] != b'IHDR':
        raise OSError('Not a valid PNG file.')

    int_map = {1: '>B', 2: '>H', 4: '>I'}

    def unpack_integer(raw):
        return struct.unpack(int_map[len(raw)], raw)[0]

    # Walk the chunk list with a moving offset instead of repeatedly
    # re-slicing (and thereby copying) the remainder of the buffer.
    chunks = []
    pos, end = 8, len(png_data)
    while end - pos >= 4:
        length = unpack_integer(png_data[pos:pos + 4])
        data_start = pos + 8  # 4 bytes of length + 4 bytes of chunk type
        chunk_type = png_data[pos + 4:data_start]
        chunks.append((chunk_type, png_data[data_start:data_start + length]))
        pos = data_start + length + 4  # Skip CRC

    # The signature check above guarantees the first chunk is IHDR
    ihdr = chunks[0][1]
    width = unpack_integer(ihdr[:4])
    height = unpack_integer(ihdr[4:8])

    # The compressed stream may be split over several IDAT chunks
    idat = b''.join(data for chunk_type, data in chunks if chunk_type == b'IDAT')
    if not idat:
        raise OSError('Unable to read PNG data.')

    decompressed_data = zlib.decompress(idat)

    stride = width * 3  # bytes per scanline, excluding the filter-type byte
    pixels = []
    previous_row = None

    for y in range(height):
        base_pos = y * (1 + stride)
        filter_type = decompressed_data[base_pos]

        current_row = []
        pixels.append(current_row)

        for x in range(stride):
            color = decompressed_data[1 + base_pos + x]

            # Neighbours are the same channel of the pixel to the left
            # (3 bytes back) and of the pixel directly above; 0 at the edges.
            left = current_row[x - 3] if x > 2 else 0
            up = previous_row[x] if y > 0 else 0

            if filter_type == 1:  # Sub
                color = (color + left) & 0xff
            elif filter_type == 2:  # Up
                color = (color + up) & 0xff
            elif filter_type == 3:  # Average
                color = (color + ((left + up) >> 1)) & 0xff
            elif filter_type == 4:  # Paeth
                upper_left = previous_row[x - 3] if x > 2 and y > 0 else 0

                estimate = left + up - upper_left
                dist_left = abs(estimate - left)
                dist_up = abs(estimate - up)
                dist_upper_left = abs(estimate - upper_left)

                # Ties are resolved in the order left, up, upper-left
                if dist_left <= dist_up and dist_left <= dist_upper_left:
                    predictor = left
                elif dist_up <= dist_upper_left:
                    predictor = up
                else:
                    predictor = upper_left
                color = (color + predictor) & 0xff

            current_row.append(color)

        previous_row = current_row

    return width, height, pixels
|
|
|
|
|
|
def register_socks_protocols():
    """Add the SOCKS schemes to ``urllib.parse.uses_netloc`` if they are missing.

    In Python < 2.6.5, urlsplit() suffers from bug
    https://bugs.python.org/issue7904: URLs with protocols not in
    urlparse.uses_netloc are not handled correctly. Schemes that are
    already listed are left alone, so repeated calls add nothing.
    """
    netloc_schemes = urllib.parse.uses_netloc
    for socks_scheme in ('socks', 'socks4', 'socks4a', 'socks5'):
        if socks_scheme in netloc_schemes:
            continue
        netloc_schemes.append(socks_scheme)
|
|
|
|
|
|
def handle_youtubedl_headers(headers):
    """Apply the internal ``Youtubedl-no-compression`` marker header.

    When the marker key is absent, *headers* is returned as-is (the same
    object). Otherwise a new dict is returned that omits the marker and
    every key equal to ``accept-encoding`` ignoring case; *headers* itself
    is not modified.
    """
    marker = 'Youtubedl-no-compression'
    if marker not in headers:
        return headers

    remaining = {
        name: value
        for name, value in headers.items()
        if name.lower() != 'accept-encoding'
    }
    del remaining[marker]
    return remaining
|
|
|
|
|
|
def request_to_url(req):
    """Return the full URL of a ``urllib.request.Request``; return anything else unchanged."""
    if not isinstance(req, urllib.request.Request):
        return req
    return req.get_full_url()
|
|
|
|
|
|
def sanitized_Request(url, *args, **kwargs):
    """Build a ``urllib.request.Request`` from a cleaned-up *url*.

    *url* is passed through ``sanitize_url``, ``escape_url`` and
    ``extract_basic_auth`` (all from ``..utils``). The last of these yields
    the URL to use plus an auth header value; when that value is not None
    it is stored under ``'Authorization'`` in the request headers. All
    other arguments are forwarded to ``urllib.request.Request``.
    """
    from ..utils import escape_url, extract_basic_auth, sanitize_url

    cleaned_url = escape_url(sanitize_url(url))
    url, auth_header = extract_basic_auth(cleaned_url)
    if auth_header is not None:
        # Request(url, data, headers, ...): a second extra positional
        # argument is the caller's headers mapping; otherwise use (or
        # create) the ``headers`` keyword argument.
        if len(args) >= 2:
            headers = args[1]
        else:
            headers = kwargs.setdefault('headers', {})
        headers['Authorization'] = auth_header
    return urllib.request.Request(url, *args, **kwargs)
|
|
|
|
|
|
class YoutubeDLHandler(HTTPHandler):
    """``HTTPHandler`` subclass that accepts a leading *params* argument.

    *params* is kept on the instance as ``_params``; every remaining
    positional and keyword argument is handed to ``HTTPHandler.__init__``.
    """

    def __init__(self, params, *args, **kwargs):
        # Stored before delegating, matching the original statement order
        self._params = params
        super().__init__(*args, **kwargs)


# The HTTPS handler name refers to the very same class
YoutubeDLHTTPSHandler = YoutubeDLHandler
|
|
|
|
|
|
class YoutubeDLCookieProcessor(urllib.request.HTTPCookieProcessor):
    """``HTTPCookieProcessor`` that handles HTTPS with the same hooks as HTTP.

    Construction and ``http_response`` defer to the base class;
    ``https_request`` / ``https_response`` are bound to the HTTP
    implementations.
    """

    def __init__(self, cookiejar=None):
        super().__init__(cookiejar)

    def http_response(self, request, response):
        return super().http_response(request, response)

    # HTTPS traffic reuses the HTTP request/response processing
    https_request = urllib.request.HTTPCookieProcessor.http_request
    https_response = http_response
|
|
|
|
|
|
def make_HTTPS_handler(params, **kwargs):
    """Create a ``YoutubeDLHTTPSHandler`` with an SSL context derived from *params*.

    The context is built by ``make_ssl_context`` from these *params* keys:
    ``nocheckcertificate`` (negated into ``verify``), ``client_certificate``,
    ``client_certificate_key``, ``client_certificate_password``,
    ``legacyserverconnect`` and ``compat_opts`` (certifi is used unless it
    contains ``'no-certifi'``). Extra keyword arguments go to the handler.
    """
    ssl_context = make_ssl_context(
        verify=not params.get('nocheckcertificate'),
        client_certificate=params.get('client_certificate'),
        client_certificate_key=params.get('client_certificate_key'),
        client_certificate_password=params.get('client_certificate_password'),
        legacy_support=params.get('legacyserverconnect'),
        use_certifi='no-certifi' not in params.get('compat_opts', []),
    )
    return YoutubeDLHTTPSHandler(params, context=ssl_context, **kwargs)
|
|
|
|
|
|
def process_communicate_or_kill(p, *args, **kwargs):
    """Legacy wrapper: call ``Popen.communicate_or_kill`` with *p* as its first argument.

    Remaining positional and keyword arguments are forwarded unchanged and
    the call's result is returned.
    """
    outcome = Popen.communicate_or_kill(p, *args, **kwargs)
    return outcome
|