Compare commits

...

14 Commits

@ -1,35 +1,58 @@
#!/usr/bin/env python3
import argparse
import os
from pathlib import Path
from string import Template
import subprocess
from typing import Callable, Dict, Optional
from typing import (
Any,
Callable,
Dict,
Optional,
TypeAlias,
)
import urllib.parse as url
def cmd_player_play(video_uri: str, start: Optional[str] = None, speed: Optional[str] = None):
def cmd_player_play(
video_uri: str,
start: Optional[str] = None,
speed: Optional[str] = None,
) -> None:
print(f"Play video {video_uri}")
mpv_cmd = ["/usr/bin/env", "mpv"]
mpvadd_cmd = Path("~/bin/mpvctl").expanduser()
if mpvadd_cmd.exists():
mpv_cmd = [str(mpvadd_cmd)]
subprocess.Popen(
args = [e for e in [
str(Path("~/bin/mpvctl").expanduser()),
"add",
video_uri,
#f"start={start}" if start is not None else None + "," + f"speed={speed}" if speed is not None else None,
] if e is not None],
stdin = subprocess.DEVNULL,
stdout = subprocess.DEVNULL,
stderr = subprocess.DEVNULL,
args=[
e
for e in [
*mpv_cmd,
"add",
video_uri,
# f"start={start}" if start is not None else None + "," + f"speed={speed}" if speed is not None else None,
]
if e is not None
],
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
URI_SCHEME = "entertainment-decider"
URI_COMMANDS = {
CommandDict: TypeAlias = Dict[str, "CommandType"]
CommandType: TypeAlias = CommandDict | Callable[..., None]
URI_COMMANDS: CommandDict = {
"player": {
"play": cmd_player_play
}
"play": cmd_player_play,
},
}
def execute_uri_command(uri: str):
def execute_uri_command(uri: str) -> Any:
parts = url.urlparse(uri, scheme=URI_SCHEME, allow_fragments=False)
if parts.scheme != URI_SCHEME:
if parts.scheme in {"http", "https"}:
@ -37,33 +60,40 @@ def execute_uri_command(uri: str):
raise Exception(f"Cannot parse URI's with scheme {parts.scheme!r}")
path = parts.path.strip("/").split("/")
options = dict(url.parse_qsl(parts.query))
def unknown_cmd():
def unknown_cmd() -> None:
raise Exception(f"Unknown command {parts.path}")
current = URI_COMMANDS
current: Any = URI_COMMANDS
for path_name in path:
if callable(current) or path_name not in current:
unknown_cmd()
return unknown_cmd()
current = current[path_name]
if not callable(current):
unknown_cmd()
return unknown_cmd()
return current(**options)
def misc_generate_desktop():
with Path("./entry.desktop").open("r") as fh:
def misc_generate_desktop() -> None:
template_path = os.getenv("STREAMLINED_DESKTOP_TEMPLATE") or "./entry.desktop"
with Path(template_path).open("r") as fh:
temp = Template(fh.read())
print(temp.substitute(name="Entertainment Decider", exec_path=str(Path(__file__).resolve())))
exec_path = os.getenv("STREAMLINED_EXEC_PATH") or str(Path(__file__).resolve())
print(temp.substitute(name="Entertainment Decider", exec_path=exec_path))
MISC_COMMANDS: Dict[str, Callable] = {
MISC_COMMANDS: Dict[str, Callable[..., None]] = {
"generate-desktop-file": misc_generate_desktop,
}
def execute_misc_cmd(cmd: str):
def execute_misc_cmd(cmd: str) -> None:
if cmd not in MISC_COMMANDS:
raise Exception(f"Unknown misc command {cmd!r}")
return MISC_COMMANDS[cmd]()
def main():
def main() -> None:
parser = argparse.ArgumentParser(prog="entertainment-decider")
subparsers = parser.add_subparsers()
# uri parser
@ -81,5 +111,6 @@ def main():
del d["parser_cmd"]
cmd(**d)
if __name__ == "__main__":
main()

@ -0,0 +1,4 @@
let
nixpkgs = import <nixpkgs> { };
in
nixpkgs.callPackage ./package.nix { }

@ -0,0 +1,27 @@
{
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1751011381,
"narHash": "sha256-krGXKxvkBhnrSC/kGBmg5MyupUUT5R6IBCLEzx9jhMM=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "30e2e2857ba47844aa71991daa6ed1fc678bcbb7",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"nixpkgs": "nixpkgs"
}
}
},
"root": "root",
"version": 7
}

@ -0,0 +1,27 @@
{
description = "Streamlined Desktop Client";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
};
outputs =
{ nixpkgs, ... }:
let
inherit (nixpkgs) lib;
inherit (builtins) attrNames;
inherit (lib.attrsets) genAttrs;
architectures = attrNames nixpkgs.legacyPackages;
forAllSystems = genAttrs architectures;
in
{
packages = forAllSystems (
system:
let
pkgs = import nixpkgs { inherit system; };
in
rec {
default = streamlined-client;
streamlined-client = pkgs.callPackage ./package.nix { };
}
);
};
}

@ -0,0 +1,51 @@
{
lib,
writeText,
python3Packages,
mypy,
...
}:
let
name = "streamlined-client";
version = "2025.06.29";
project_toml = writeText "${name}_pyproject" ''
[build-system]
requires = ["setuptools >= 61.0"]
build-backend = "setuptools.build_meta"
[project]
name = "${name}"
version = "${version}"
requires-python = ">= 3.11"
[project.scripts]
${name} = "streamlined.client.app:main"
'';
in
python3Packages.buildPythonPackage {
inherit name version;
format = "pyproject";
build-system = lib.singleton python3Packages.setuptools;
dependencies = with python3Packages; [
setuptools
];
unpackPhase = ''
cp ${project_toml} ./pyproject.toml
mkdir --parent ./src/streamlined/client
touch ./src/streamlined{,/client}/__init__.py
cp -r ${./app.py} ./src/streamlined/client/app.py
chmod --recursive u=rwX ./src # required so further build steps can create wrapper files
${lib.getExe mypy} --strict ./src
'';
postInstall = ''
mkdir --parent $out/share/applications
STREAMLINED_DESKTOP_TEMPLATE=${./entry.desktop} STREAMLINED_EXEC_PATH=$out/bin/${name} $out/bin/${name} misc generate-desktop-file > $out/share/applications/${name}_uri.desktop
'';
meta = {
description = "Streamlined Desktop Client";
mainProgram = name;
};
}

@ -1,12 +1,17 @@
from __future__ import annotations
from collections.abc import (
Collection,
)
from datetime import datetime
import logging
import re
from typing import Dict, TypeAlias
from typing import (
TypedDict,
)
from pony import orm # TODO remove
import youtubesearchpython
from yt_dlp import YoutubeDL
from ...models import MediaCollection
from ..all.youtube import (
@ -22,10 +27,33 @@ from ..generic import (
from .base import CollectionExtractor
DataType: TypeAlias = Dict
YTDLP_OPTS = {
"extract_flat": "in_playlist",
}
class PlaylistChannel(TypedDict):
name: str
id: str
class PlaylistMetadata(TypedDict):
id: str
title: str
channel: PlaylistChannel
link: str
class PlaylistVideo(TypedDict):
id: str
class YouTubeCollectionExtractor(CollectionExtractor[DataType]):
class PlaylistData(TypedDict):
info: PlaylistMetadata
videos: Collection[PlaylistVideo]
class YouTubeCollectionExtractor(CollectionExtractor[PlaylistData]):
__uri_regex = re.compile(
r"""^
https?://
@ -87,50 +115,39 @@ class YouTubeCollectionExtractor(CollectionExtractor[DataType]):
last_release_date
)
def _extract_offline(self, uri: str) -> ExtractedDataOffline[DataType]:
def _extract_offline(self, uri: str) -> ExtractedDataOffline[PlaylistData]:
playlist_id = self.__convert_if_required(self.__get_id(uri))
return ExtractedDataOffline[DataType](
return ExtractedDataOffline[PlaylistData](
extractor_name=self.name,
object_key=playlist_id,
object_uri=uri,
)
def _extract_online(self, uri: str) -> ExtractedDataOnline[DataType]:
def _extract_online(self, uri: str) -> ExtractedDataOnline[PlaylistData]:
orig_id = self.__get_id(uri)
playlist_id = self.__convert_if_required(orig_id)
playlist_link = f"https://www.youtube.com/playlist?list={playlist_id}"
is_channel = self.__is_channel_id(playlist_id)
logging.info(f"Request Youtube playlist {playlist_link!r}")
playlist = youtubesearchpython.Playlist(playlist_link)
try:
while playlist.hasMoreVideos:
playlist.getNextVideos()
except Exception as e:
# TODO improve check of Exception kind if possible
if is_channel and "invalid status code" in str(e.args[0]).lower():
# Partial Update on channels can be accepted because newest videos are at the top
logging.warning(
f"Failed to retrieve channel completely, proceed with partial update"
)
else:
raise e
with YoutubeDL(YTDLP_OPTS) as ydl:
info = ydl.extract_info(
playlist_link,
download=False,
)
playlist = self.__adapt_ytdlp_format(ydl.sanitize_info(info))
logging.debug(
f"Retrieved {len(playlist.videos)} videos from playlist {playlist_link!r}"
f"Retrieved {len(playlist['videos'])} videos from playlist {playlist_link!r}"
)
return ExtractedDataOnline[DataType](
return ExtractedDataOnline[PlaylistData](
extractor_name=self.name,
object_key=playlist_id,
object_uri=uri,
data={
"info": playlist.info["info"],
"videos": playlist.videos,
},
data=playlist,
)
def _update_object_raw(
self,
object: MediaCollection,
data: DataType,
data: PlaylistData,
) -> ChangedReport:
info = data["info"]
is_channel = self.__is_channel_id(info["id"])
@ -170,3 +187,23 @@ class YouTubeCollectionExtractor(CollectionExtractor[DataType]):
)
)
return ChangedReport.ChangedSome # TODO improve
@staticmethod
def __adapt_ytdlp_format(ytdlp_info) -> PlaylistData:
return {
"info": {
"id": ytdlp_info["id"],
"title": ytdlp_info["title"],
"channel": {
"id": ytdlp_info["channel_id"],
"name": ytdlp_info["channel"],
},
"link": ytdlp_info["webpage_url"],
},
"videos": [
{
"id": elem["id"],
}
for elem in ytdlp_info["entries"]
],
}

@ -5,11 +5,11 @@ import logging
import re
from typing import Optional
from youtubesearchpython import ( # type: ignore
ResultMode,
Video,
)
from yt_dlp import YoutubeDL # type: ignore
from ...extras import (
multi_strptime,
)
from ...models import (
MediaElement,
MediaThumbnail,
@ -31,6 +31,11 @@ from ..generic import (
from .base import MediaExtractor
YTDLP_OPTS = {
"check_formats": False,
}
class YoutubeMediaExtractor(MediaExtractor[YoutubeVideoData]):
__uri_regex = re.compile(
r"""^
@ -78,10 +83,12 @@ class YoutubeMediaExtractor(MediaExtractor[YoutubeVideoData]):
raise Exception(f"URI not suitable: {uri!r}")
id = uri_match.group("id")
try:
vid_data: YoutubeVideoData = Video.getInfo(
videoLink=f"https://www.youtube.com/watch?v={id}",
mode=ResultMode.dict,
)
with YoutubeDL(YTDLP_OPTS) as ydl:
info = ydl.extract_info(
f"https://www.youtube.com/watch?v={id}",
download=False,
)
vid_data = self.__adapt_ytdlp_format(ydl.sanitize_info(info))
except Exception as e:
raise ExtractionError() from e
if vid_data["isLiveNow"]:
@ -106,8 +113,12 @@ class YoutubeMediaExtractor(MediaExtractor[YoutubeVideoData]):
key=lambda thumb: thumbnail_sort_key(thumb["width"], thumb["height"]),
)
object.thumbnail = MediaThumbnail.from_uri(best_thumb["url"])
object.release_date = datetime.strptime(
data.get("uploadDate") or data["publishDate"], "%Y-%m-%d"
object.release_date = multi_strptime(
data.get("uploadDate") or data["publishDate"],
"%Y-%m-%dT%H:%M:%S%:z",
"%Y-%m-%dT%H:%M:%S%z",
"%Y-%m-%d",
"%Y%m%d",
)
object.length = int(data["duration"]["secondsText"])
for tag in get_video_tags(data):
@ -120,3 +131,35 @@ class YoutubeMediaExtractor(MediaExtractor[YoutubeVideoData]):
)
)
return ChangedReport.ChangedSome # TODO improve
@staticmethod
def __adapt_ytdlp_format(ytdlp_info) -> YoutubeVideoData:
return {
"id": ytdlp_info["id"],
"title": ytdlp_info["title"],
# TODO keep as int
"duration": {"secondsText": str(ytdlp_info["duration"])},
"viewCount": {"text": str(ytdlp_info["view_count"])},
"thumbnails": [
{
"url": thumb["url"],
"height": thumb["height"],
"width": thumb["width"],
}
for thumb in ytdlp_info["thumbnails"]
if "width" in thumb and "height" in thumb
],
"description": ytdlp_info["description"],
"channel": {
"name": ytdlp_info["channel"],
"id": ytdlp_info["channel_id"],
"link": ytdlp_info["channel_url"],
},
"allowRatings": False, # faked, unknown, unimportant, TODO remove
"averageRating": ytdlp_info["average_rating"],
"keywords": ytdlp_info["tags"],
"isLiveContent": ytdlp_info["was_live"],
"uploadDate": ytdlp_info["upload_date"],
"isLiveNow": ytdlp_info["is_live"],
"link": ytdlp_info["webpage_url"],
}

@ -1,4 +1,7 @@
from .chain import Chain
from .datetimes import (
multi_strptime,
)
from .errors import gen_api_error
from .strings import remove_common_trails
from .typing import LazyValue
@ -8,5 +11,6 @@ __all__ = [
"Chain",
"LazyValue",
"gen_api_error",
"multi_strptime",
"remove_common_trails",
]

@ -0,0 +1,24 @@
from __future__ import annotations
from datetime import (
datetime,
)
from logging import (
INFO,
Logger,
)
log = Logger(__name__)
def multi_strptime(text: str, *format: str, log_level: int = INFO) -> datetime:
log.debug(f"try multi_strptime on {text!r} with {format!r}")
for fmt in format:
try:
return datetime.strptime(text, fmt)
except:
log.log(
level=log_level,
msg=f"failed multi_strptime on {text!r} with {fmt!r}, continue",
)
raise ValueError(f"failed all multi_strptime on {text!r} with {format!r}")

@ -10,7 +10,6 @@ rss-parser>=1.1
tmdbsimple>=2.9.1
yt-dlp>=2022.6.29
git+https://git.banananet.work/zocker/python-jsoncache#egg=jsoncache
git+https://github.com/Zocker1999NET/youtube-search-python@my-version#egg=youtube-search-python
git+https://github.com/Zocker1999NET/tinytinypy#egg=tinytinypy
# replacement for jinja until https://github.com/pallets/jinja/pull/1712 is merged
git+https://github.com/Zocker1999NET/jinja@trailing-comma-3.1.2#egg=Jinja2

Loading…
Cancel
Save