"""No longer used and new code should not use. Exists only for API compat.""" import platform import struct import sys import urllib.error import urllib.parse import urllib.request import zlib from ._utils import Popen, decode_base_n, preferredencoding from .networking import escape_rfc3986 # noqa: F401 from .networking import normalize_url as escape_url # noqa: F401 from .traversal import traverse_obj from ..dependencies import certifi, websockets from ..networking._helper import make_ssl_context from ..networking._urllib import HTTPHandler # isort: split from .networking import random_user_agent, std_headers # noqa: F401 from ..cookies import YoutubeDLCookieJar # noqa: F401 from ..networking._urllib import PUTRequest # noqa: F401 from ..networking._urllib import SUPPORTED_ENCODINGS, HEADRequest # noqa: F401 from ..networking._urllib import ProxyHandler as PerRequestProxyHandler # noqa: F401 from ..networking._urllib import RedirectHandler as YoutubeDLRedirectHandler # noqa: F401 from ..networking._urllib import ( # noqa: F401 make_socks_conn_class, update_Request, ) from ..networking.exceptions import HTTPError, network_exceptions # noqa: F401 has_certifi = bool(certifi) has_websockets = bool(websockets) def load_plugins(name, suffix, namespace): from ..plugins import load_plugins ret = load_plugins(name, suffix) namespace.update(ret) return ret def traverse_dict(dictn, keys, casesense=True): return traverse_obj(dictn, keys, casesense=casesense, is_user_input=True, traverse_string=True) def decode_base(value, digits): return decode_base_n(value, table=digits) def platform_name(): """ Returns the platform name as a str """ return platform.platform() def get_subprocess_encoding(): if sys.platform == 'win32' and sys.getwindowsversion()[0] >= 5: # For subprocess calls, encode with locale encoding # Refer to http://stackoverflow.com/a/9951851/35070 encoding = preferredencoding() else: encoding = sys.getfilesystemencoding() if encoding is None: encoding = 'utf-8' return encoding # UNUSED # Based on png2str() written by @gdkchan and improved by @yokrysty # Originally posted at https://github.com/ytdl-org/youtube-dl/issues/9706 def decode_png(png_data): # Reference: https://www.w3.org/TR/PNG/ header = png_data[8:] if png_data[:8] != b'\x89PNG\x0d\x0a\x1a\x0a' or header[4:8] != b'IHDR': raise OSError('Not a valid PNG file.') int_map = {1: '>B', 2: '>H', 4: '>I'} unpack_integer = lambda x: struct.unpack(int_map[len(x)], x)[0] chunks = [] while header: length = unpack_integer(header[:4]) header = header[4:] chunk_type = header[:4] header = header[4:] chunk_data = header[:length] header = header[length:] header = header[4:] # Skip CRC chunks.append({ 'type': chunk_type, 'length': length, 'data': chunk_data }) ihdr = chunks[0]['data'] width = unpack_integer(ihdr[:4]) height = unpack_integer(ihdr[4:8]) idat = b'' for chunk in chunks: if chunk['type'] == b'IDAT': idat += chunk['data'] if not idat: raise OSError('Unable to read PNG data.') decompressed_data = bytearray(zlib.decompress(idat)) stride = width * 3 pixels = [] def _get_pixel(idx): x = idx % stride y = idx // stride return pixels[y][x] for y in range(height): basePos = y * (1 + stride) filter_type = decompressed_data[basePos] current_row = [] pixels.append(current_row) for x in range(stride): color = decompressed_data[1 + basePos + x] basex = y * stride + x left = 0 up = 0 if x > 2: left = _get_pixel(basex - 3) if y > 0: up = _get_pixel(basex - stride) if filter_type == 1: # Sub color = (color + left) & 0xff elif filter_type == 2: # Up color = (color + up) & 0xff elif filter_type == 3: # Average color = (color + ((left + up) >> 1)) & 0xff elif filter_type == 4: # Paeth a = left b = up c = 0 if x > 2 and y > 0: c = _get_pixel(basex - stride - 3) p = a + b - c pa = abs(p - a) pb = abs(p - b) pc = abs(p - c) if pa <= pb and pa <= pc: color = (color + a) & 0xff elif pb <= pc: color = (color + b) & 0xff else: color = (color + c) & 0xff current_row.append(color) return width, height, pixels def register_socks_protocols(): # "Register" SOCKS protocols # In Python < 2.6.5, urlsplit() suffers from bug https://bugs.python.org/issue7904 # URLs with protocols not in urlparse.uses_netloc are not handled correctly for scheme in ('socks', 'socks4', 'socks4a', 'socks5'): if scheme not in urllib.parse.uses_netloc: urllib.parse.uses_netloc.append(scheme) def handle_youtubedl_headers(headers): filtered_headers = headers if 'Youtubedl-no-compression' in filtered_headers: filtered_headers = {k: v for k, v in filtered_headers.items() if k.lower() != 'accept-encoding'} del filtered_headers['Youtubedl-no-compression'] return filtered_headers def request_to_url(req): if isinstance(req, urllib.request.Request): return req.get_full_url() else: return req def sanitized_Request(url, *args, **kwargs): from ..utils import extract_basic_auth, sanitize_url url, auth_header = extract_basic_auth(escape_url(sanitize_url(url))) if auth_header is not None: headers = args[1] if len(args) >= 2 else kwargs.setdefault('headers', {}) headers['Authorization'] = auth_header return urllib.request.Request(url, *args, **kwargs) class YoutubeDLHandler(HTTPHandler): def __init__(self, params, *args, **kwargs): self._params = params super().__init__(*args, **kwargs) YoutubeDLHTTPSHandler = YoutubeDLHandler class YoutubeDLCookieProcessor(urllib.request.HTTPCookieProcessor): def __init__(self, cookiejar=None): urllib.request.HTTPCookieProcessor.__init__(self, cookiejar) def http_response(self, request, response): return urllib.request.HTTPCookieProcessor.http_response(self, request, response) https_request = urllib.request.HTTPCookieProcessor.http_request https_response = http_response def make_HTTPS_handler(params, **kwargs): return YoutubeDLHTTPSHandler(params, context=make_ssl_context( verify=not params.get('nocheckcertificate'), client_certificate=params.get('client_certificate'), client_certificate_key=params.get('client_certificate_key'), client_certificate_password=params.get('client_certificate_password'), legacy_support=params.get('legacyserverconnect'), use_certifi='no-certifi' not in params.get('compat_opts', []), ), **kwargs) def process_communicate_or_kill(p, *args, **kwargs): return Popen.communicate_or_kill(p, *args, **kwargs)