Extracted extractors.all.ytdl module

master
Felix Stupp 3 years ago
parent 9340d4171a
commit e41175f0fa
Signed by: zocker
GPG Key ID: 93E1BD26F6B02FB7

@ -0,0 +1,44 @@
from __future__ import annotations
import json
import subprocess
from typing import List
from jsoncache import ApplicationCache
from ...common import call
cache = ApplicationCache(app_name="entertainment-decider-ytdl", create_cache_dir=True, default_max_age=7*86400)
cache.clean_cache()
YTDL_CALL = [
"yt-dlp",
]
class YtdlErrorException(subprocess.CalledProcessError):
pass
def ytdl_call(args: List[str]) -> dict:
proc = call(YTDL_CALL + args, check=False)
if proc.returncode != 0:
raise YtdlErrorException(
returncode=proc.returncode,
cmd=args,
output=proc.stdout,
stderr=proc.stderr,
)
return json.loads(proc.stdout.strip())
@cache.cache_json()
def get_video_info(uri: str) -> dict:
return ytdl_call([
"--no-playlist",
"--dump-json",
uri,
])
@cache.cache_json()
def get_playlist_info(uri: str) -> dict:
return ytdl_call(uri)

@ -1,54 +1,16 @@
from __future__ import annotations
import json
from datetime import datetime
import logging
import subprocess
from typing import Dict, List, Optional
from typing import Dict, Optional
from jsoncache import ApplicationCache
from ...common import call
from ...models import MediaElement
from ..all.ytdl import get_video_info, YtdlErrorException
from ..generic import AuthorExtractedData, ExtractedData, ExtractionError, SuitableLevel
from .base import MediaExtractor
cache = ApplicationCache(app_name="entertainment-decider-ytdl", create_cache_dir=True, default_max_age=7*86400)
cache.clean_cache()
YTDL_CALL = [
"yt-dlp",
]
class YtdlErrorException(subprocess.CalledProcessError):
pass
def ytdl_call(args: List[str]) -> dict:
proc = call(YTDL_CALL + args, check=False)
if proc.returncode != 0:
raise YtdlErrorException(
returncode=proc.returncode,
cmd=args,
output=proc.stdout,
stderr=proc.stderr,
)
return json.loads(proc.stdout.strip())
@cache.cache_json()
def get_video_info(uri: str) -> dict:
return ytdl_call([
"--no-playlist",
"--dump-json",
uri,
])
@cache.cache_json()
def get_playlist_info(uri: str) -> dict:
return ytdl_call(uri)
class YtdlMediaExtractor(MediaExtractor[Dict]):
def __init__(self):

Loading…
Cancel
Save