Compare commits

...

14 Commits

@ -1,35 +1,58 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import argparse import argparse
import os
from pathlib import Path from pathlib import Path
from string import Template from string import Template
import subprocess import subprocess
from typing import Callable, Dict, Optional from typing import (
Any,
Callable,
Dict,
Optional,
TypeAlias,
)
import urllib.parse as url import urllib.parse as url
def cmd_player_play(
    video_uri: str,
    start: Optional[str] = None,
    speed: Optional[str] = None,
) -> None:
    """Launch detached playback of *video_uri* via the local player.

    *start* and *speed* are accepted but currently unused (passing player
    options is not implemented yet).
    """
    print(f"Play video {video_uri}")
    # Prefer the user's mpvctl wrapper when installed; otherwise fall back
    # to plain mpv from PATH.
    # NOTE(review): "add" is an mpvctl subcommand — plain mpv would treat it
    # as a file to play; confirm the fallback invocation is intended.
    mpvctl = Path("~/bin/mpvctl").expanduser()
    launcher = [str(mpvctl)] if mpvctl.exists() else ["/usr/bin/env", "mpv"]
    argv = [arg for arg in (*launcher, "add", video_uri) if arg is not None]
    # Fully detach: no stdin, discard all player output.
    subprocess.Popen(
        args=argv,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
URI_SCHEME = "entertainment-decider" URI_SCHEME = "entertainment-decider"
URI_COMMANDS = { CommandDict: TypeAlias = Dict[str, "CommandType"]
CommandType: TypeAlias = CommandDict | Callable[..., None]
URI_COMMANDS: CommandDict = {
"player": { "player": {
"play": cmd_player_play "play": cmd_player_play,
} },
} }
def execute_uri_command(uri: str):
def execute_uri_command(uri: str) -> Any:
parts = url.urlparse(uri, scheme=URI_SCHEME, allow_fragments=False) parts = url.urlparse(uri, scheme=URI_SCHEME, allow_fragments=False)
if parts.scheme != URI_SCHEME: if parts.scheme != URI_SCHEME:
if parts.scheme in {"http", "https"}: if parts.scheme in {"http", "https"}:
@ -37,33 +60,40 @@ def execute_uri_command(uri: str):
raise Exception(f"Cannot parse URI's with scheme {parts.scheme!r}") raise Exception(f"Cannot parse URI's with scheme {parts.scheme!r}")
path = parts.path.strip("/").split("/") path = parts.path.strip("/").split("/")
options = dict(url.parse_qsl(parts.query)) options = dict(url.parse_qsl(parts.query))
def unknown_cmd():
def unknown_cmd() -> None:
raise Exception(f"Unknown command {parts.path}") raise Exception(f"Unknown command {parts.path}")
current = URI_COMMANDS
current: Any = URI_COMMANDS
for path_name in path: for path_name in path:
if callable(current) or path_name not in current: if callable(current) or path_name not in current:
unknown_cmd() return unknown_cmd()
current = current[path_name] current = current[path_name]
if not callable(current): if not callable(current):
unknown_cmd() return unknown_cmd()
return current(**options) return current(**options)
def misc_generate_desktop() -> None:
    """Render the .desktop entry template to stdout.

    The template path and the Exec= path default to files next to this
    script, but both can be overridden via the environment variables
    STREAMLINED_DESKTOP_TEMPLATE and STREAMLINED_EXEC_PATH (used by the
    Nix package to bake in store paths at build time).
    """
    template_path = os.getenv("STREAMLINED_DESKTOP_TEMPLATE") or "./entry.desktop"
    # read_text() closes the file immediately instead of keeping the handle
    # open across the substitution.
    temp = Template(Path(template_path).read_text())
    exec_path = os.getenv("STREAMLINED_EXEC_PATH") or str(Path(__file__).resolve())
    print(temp.substitute(name="Entertainment Decider", exec_path=exec_path))
MISC_COMMANDS: Dict[str, Callable] = {
MISC_COMMANDS: Dict[str, Callable[..., None]] = {
"generate-desktop-file": misc_generate_desktop, "generate-desktop-file": misc_generate_desktop,
} }
def execute_misc_cmd(cmd: str) -> None:
    """Look up *cmd* in MISC_COMMANDS and execute it.

    Raises:
        Exception: when *cmd* names no known misc command.
    """
    handler = MISC_COMMANDS.get(cmd)
    if handler is None:
        raise Exception(f"Unknown misc command {cmd!r}")
    return handler()
def main():
def main() -> None:
parser = argparse.ArgumentParser(prog="entertainment-decider") parser = argparse.ArgumentParser(prog="entertainment-decider")
subparsers = parser.add_subparsers() subparsers = parser.add_subparsers()
# uri parser # uri parser
@ -81,5 +111,6 @@ def main():
del d["parser_cmd"] del d["parser_cmd"]
cmd(**d) cmd(**d)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

@ -0,0 +1,4 @@
# Traditional (non-flake) entry point so plain `nix-build` works.
let
  # No pinning here: use whatever <nixpkgs> the caller's NIX_PATH provides.
  nixpkgs = import <nixpkgs> { };
in
# Delegate to the real package expression; callPackage injects its inputs.
nixpkgs.callPackage ./package.nix { }

@ -0,0 +1,27 @@
{
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1751011381,
"narHash": "sha256-krGXKxvkBhnrSC/kGBmg5MyupUUT5R6IBCLEzx9jhMM=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "30e2e2857ba47844aa71991daa6ed1fc678bcbb7",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"nixpkgs": "nixpkgs"
}
}
},
"root": "root",
"version": 7
}

@ -0,0 +1,27 @@
{
  description = "Streamlined Desktop Client";
  inputs = {
    nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
  };
  outputs =
    { nixpkgs, ... }:
    let
      inherit (nixpkgs) lib;
      inherit (builtins) attrNames;
      inherit (lib.attrsets) genAttrs;
      # Expose the package for every system nixpkgs itself supports.
      architectures = attrNames nixpkgs.legacyPackages;
      forAllSystems = genAttrs architectures;
    in
    {
      packages = forAllSystems (
        system:
        let
          pkgs = import nixpkgs { inherit system; };
        in
        rec {
          # `nix build` / `nix run` without an attribute pick the client.
          default = streamlined-client;
          streamlined-client = pkgs.callPackage ./package.nix { };
        }
      );
    };
}

@ -0,0 +1,51 @@
# Packages the single-file client (app.py) as a real Python application,
# generating pyproject.toml on the fly and installing a .desktop entry.
{
  lib,
  writeText,
  python3Packages,
  mypy,
  ...
}:
let
  name = "streamlined-client";
  version = "2025.06.29";
  # pyproject.toml is generated here so the repository only needs to carry
  # app.py and the desktop template.
  project_toml = writeText "${name}_pyproject" ''
    [build-system]
    requires = ["setuptools >= 61.0"]
    build-backend = "setuptools.build_meta"
    [project]
    name = "${name}"
    version = "${version}"
    requires-python = ">= 3.11"
    [project.scripts]
    ${name} = "streamlined.client.app:main"
  '';
in
python3Packages.buildPythonPackage {
  inherit name version;
  format = "pyproject";
  build-system = lib.singleton python3Packages.setuptools;
  dependencies = with python3Packages; [
    setuptools
  ];
  # There is no conventional source tree: synthesize the package layout
  # (pyproject + src/ package dirs) during unpack, then type-check it.
  unpackPhase = ''
    cp ${project_toml} ./pyproject.toml
    mkdir --parent ./src/streamlined/client
    touch ./src/streamlined{,/client}/__init__.py
    cp -r ${./app.py} ./src/streamlined/client/app.py
    chmod --recursive u=rwX ./src # required so further build steps can create wrapper files
    ${lib.getExe mypy} --strict ./src
  '';
  # Generate the URI-handler .desktop file using the installed entry point;
  # the env vars bake the store paths into the rendered file.
  postInstall = ''
    mkdir --parent $out/share/applications
    STREAMLINED_DESKTOP_TEMPLATE=${./entry.desktop} STREAMLINED_EXEC_PATH=$out/bin/${name} $out/bin/${name} misc generate-desktop-file > $out/share/applications/${name}_uri.desktop
  '';
  meta = {
    description = "Streamlined Desktop Client";
    mainProgram = name;
  };
}

@ -1,12 +1,17 @@
from __future__ import annotations from __future__ import annotations
from collections.abc import (
Collection,
)
from datetime import datetime from datetime import datetime
import logging import logging
import re import re
from typing import Dict, TypeAlias from typing import (
TypedDict,
)
from pony import orm # TODO remove from pony import orm # TODO remove
import youtubesearchpython from yt_dlp import YoutubeDL
from ...models import MediaCollection from ...models import MediaCollection
from ..all.youtube import ( from ..all.youtube import (
@ -22,10 +27,33 @@ from ..generic import (
from .base import CollectionExtractor from .base import CollectionExtractor
DataType: TypeAlias = Dict YTDLP_OPTS = {
"extract_flat": "in_playlist",
}
class PlaylistChannel(TypedDict):
name: str
id: str
class PlaylistMetadata(TypedDict):
id: str
title: str
channel: PlaylistChannel
link: str
class PlaylistVideo(TypedDict):
id: str
class YouTubeCollectionExtractor(CollectionExtractor[DataType]): class PlaylistData(TypedDict):
info: PlaylistMetadata
videos: Collection[PlaylistVideo]
class YouTubeCollectionExtractor(CollectionExtractor[PlaylistData]):
__uri_regex = re.compile( __uri_regex = re.compile(
r"""^ r"""^
https?:// https?://
@ -87,50 +115,39 @@ class YouTubeCollectionExtractor(CollectionExtractor[DataType]):
last_release_date last_release_date
) )
def _extract_offline(self, uri: str) -> ExtractedDataOffline[PlaylistData]:
    """Build offline extraction metadata for *uri* without any network I/O."""
    object_key = self.__convert_if_required(self.__get_id(uri))
    return ExtractedDataOffline[PlaylistData](
        extractor_name=self.name,
        object_key=object_key,
        object_uri=uri,
    )
def _extract_online(self, uri: str) -> ExtractedDataOnline[DataType]: def _extract_online(self, uri: str) -> ExtractedDataOnline[PlaylistData]:
orig_id = self.__get_id(uri) orig_id = self.__get_id(uri)
playlist_id = self.__convert_if_required(orig_id) playlist_id = self.__convert_if_required(orig_id)
playlist_link = f"https://www.youtube.com/playlist?list={playlist_id}" playlist_link = f"https://www.youtube.com/playlist?list={playlist_id}"
is_channel = self.__is_channel_id(playlist_id)
logging.info(f"Request Youtube playlist {playlist_link!r}") logging.info(f"Request Youtube playlist {playlist_link!r}")
playlist = youtubesearchpython.Playlist(playlist_link) with YoutubeDL(YTDLP_OPTS) as ydl:
try: info = ydl.extract_info(
while playlist.hasMoreVideos: playlist_link,
playlist.getNextVideos() download=False,
except Exception as e: )
# TODO improve check of Exception kind if possible playlist = self.__adapt_ytdlp_format(ydl.sanitize_info(info))
if is_channel and "invalid status code" in str(e.args[0]).lower():
# Partial Update on channels can be accepted because newest videos are at the top
logging.warning(
f"Failed to retrieve channel completely, proceed with partial update"
)
else:
raise e
logging.debug( logging.debug(
f"Retrieved {len(playlist.videos)} videos from playlist {playlist_link!r}" f"Retrieved {len(playlist['videos'])} videos from playlist {playlist_link!r}"
) )
return ExtractedDataOnline[DataType]( return ExtractedDataOnline[PlaylistData](
extractor_name=self.name, extractor_name=self.name,
object_key=playlist_id, object_key=playlist_id,
object_uri=uri, object_uri=uri,
data={ data=playlist,
"info": playlist.info["info"],
"videos": playlist.videos,
},
) )
def _update_object_raw( def _update_object_raw(
self, self,
object: MediaCollection, object: MediaCollection,
data: DataType, data: PlaylistData,
) -> ChangedReport: ) -> ChangedReport:
info = data["info"] info = data["info"]
is_channel = self.__is_channel_id(info["id"]) is_channel = self.__is_channel_id(info["id"])
@ -170,3 +187,23 @@ class YouTubeCollectionExtractor(CollectionExtractor[DataType]):
) )
) )
return ChangedReport.ChangedSome # TODO improve return ChangedReport.ChangedSome # TODO improve
@staticmethod
def __adapt_ytdlp_format(ytdlp_info) -> PlaylistData:
    """Map a yt-dlp flat-playlist info dict onto the PlaylistData shape.

    NOTE(review): assumes 'id', 'title', 'channel', 'channel_id',
    'webpage_url' and 'entries' are always present in the
    sanitize_info() output — confirm against yt-dlp for channel URIs.
    """
    metadata: PlaylistMetadata = {
        "id": ytdlp_info["id"],
        "title": ytdlp_info["title"],
        "channel": {
            "id": ytdlp_info["channel_id"],
            "name": ytdlp_info["channel"],
        },
        "link": ytdlp_info["webpage_url"],
    }
    # Only the video ids are kept; full entries are extracted elsewhere.
    videos = [{"id": entry["id"]} for entry in ytdlp_info["entries"]]
    adapted: PlaylistData = {
        "info": metadata,
        "videos": videos,
    }
    return adapted

@ -5,11 +5,11 @@ import logging
import re import re
from typing import Optional from typing import Optional
from youtubesearchpython import ( # type: ignore from yt_dlp import YoutubeDL # type: ignore
ResultMode,
Video,
)
from ...extras import (
multi_strptime,
)
from ...models import ( from ...models import (
MediaElement, MediaElement,
MediaThumbnail, MediaThumbnail,
@ -31,6 +31,11 @@ from ..generic import (
from .base import MediaExtractor from .base import MediaExtractor
YTDLP_OPTS = {
"check_formats": False,
}
class YoutubeMediaExtractor(MediaExtractor[YoutubeVideoData]): class YoutubeMediaExtractor(MediaExtractor[YoutubeVideoData]):
__uri_regex = re.compile( __uri_regex = re.compile(
r"""^ r"""^
@ -78,10 +83,12 @@ class YoutubeMediaExtractor(MediaExtractor[YoutubeVideoData]):
raise Exception(f"URI not suitable: {uri!r}") raise Exception(f"URI not suitable: {uri!r}")
id = uri_match.group("id") id = uri_match.group("id")
try: try:
vid_data: YoutubeVideoData = Video.getInfo( with YoutubeDL(YTDLP_OPTS) as ydl:
videoLink=f"https://www.youtube.com/watch?v={id}", info = ydl.extract_info(
mode=ResultMode.dict, f"https://www.youtube.com/watch?v={id}",
) download=False,
)
vid_data = self.__adapt_ytdlp_format(ydl.sanitize_info(info))
except Exception as e: except Exception as e:
raise ExtractionError() from e raise ExtractionError() from e
if vid_data["isLiveNow"]: if vid_data["isLiveNow"]:
@ -106,8 +113,12 @@ class YoutubeMediaExtractor(MediaExtractor[YoutubeVideoData]):
key=lambda thumb: thumbnail_sort_key(thumb["width"], thumb["height"]), key=lambda thumb: thumbnail_sort_key(thumb["width"], thumb["height"]),
) )
object.thumbnail = MediaThumbnail.from_uri(best_thumb["url"]) object.thumbnail = MediaThumbnail.from_uri(best_thumb["url"])
object.release_date = datetime.strptime( object.release_date = multi_strptime(
data.get("uploadDate") or data["publishDate"], "%Y-%m-%d" data.get("uploadDate") or data["publishDate"],
"%Y-%m-%dT%H:%M:%S%:z",
"%Y-%m-%dT%H:%M:%S%z",
"%Y-%m-%d",
"%Y%m%d",
) )
object.length = int(data["duration"]["secondsText"]) object.length = int(data["duration"]["secondsText"])
for tag in get_video_tags(data): for tag in get_video_tags(data):
@ -120,3 +131,35 @@ class YoutubeMediaExtractor(MediaExtractor[YoutubeVideoData]):
) )
) )
return ChangedReport.ChangedSome # TODO improve return ChangedReport.ChangedSome # TODO improve
@staticmethod
def __adapt_ytdlp_format(ytdlp_info) -> YoutubeVideoData:
    """Translate a yt-dlp video info dict into the legacy
    youtube-search-python shape the update code still expects.

    NOTE(review): assumes 'average_rating', 'tags', 'channel_url' etc.
    exist in sanitize_info() output — yt-dlp may omit keys per extractor;
    confirm before relying on this for non-standard videos.
    """
    # Drop size-less thumbnail entries; callers sort by width/height.
    # NOTE(review): entries may carry explicit None sizes — verify.
    usable_thumbs = [
        {
            "url": entry["url"],
            "height": entry["height"],
            "width": entry["width"],
        }
        for entry in ytdlp_info["thumbnails"]
        if "width" in entry and "height" in entry
    ]
    return {
        "id": ytdlp_info["id"],
        "title": ytdlp_info["title"],
        # TODO keep as int
        "duration": {"secondsText": str(ytdlp_info["duration"])},
        "viewCount": {"text": str(ytdlp_info["view_count"])},
        "thumbnails": usable_thumbs,
        "description": ytdlp_info["description"],
        "channel": {
            "name": ytdlp_info["channel"],
            "id": ytdlp_info["channel_id"],
            "link": ytdlp_info["channel_url"],
        },
        "allowRatings": False,  # faked, unknown, unimportant, TODO remove
        "averageRating": ytdlp_info["average_rating"],
        "keywords": ytdlp_info["tags"],
        "isLiveContent": ytdlp_info["was_live"],
        "uploadDate": ytdlp_info["upload_date"],
        "isLiveNow": ytdlp_info["is_live"],
        "link": ytdlp_info["webpage_url"],
    }

@ -1,4 +1,7 @@
from .chain import Chain from .chain import Chain
from .datetimes import (
multi_strptime,
)
from .errors import gen_api_error from .errors import gen_api_error
from .strings import remove_common_trails from .strings import remove_common_trails
from .typing import LazyValue from .typing import LazyValue
@ -8,5 +11,6 @@ __all__ = [
"Chain", "Chain",
"LazyValue", "LazyValue",
"gen_api_error", "gen_api_error",
"multi_strptime",
"remove_common_trails", "remove_common_trails",
] ]

@ -0,0 +1,24 @@
from __future__ import annotations

from datetime import (
    datetime,
)
from logging import (
    INFO,
    getLogger,
)

# Attach to the logging hierarchy so application logging configuration
# (handlers, levels) applies. Instantiating Logger(__name__) directly would
# create a detached logger that silently ignores that configuration.
log = getLogger(__name__)


def multi_strptime(text: str, *format: str, log_level: int = INFO) -> datetime:
    """Parse *text* by trying each strptime *format* in order.

    The first format that parses successfully wins; each per-format
    failure is logged at *log_level* before trying the next one.

    Raises:
        ValueError: if none of the given formats match *text*.
    """
    log.debug(f"try multi_strptime on {text!r} with {format!r}")
    for fmt in format:
        try:
            return datetime.strptime(text, fmt)
        # Only ValueError signals a format mismatch; a bare `except:` would
        # also swallow KeyboardInterrupt/SystemExit and genuine bugs.
        except ValueError:
            log.log(
                level=log_level,
                msg=f"failed multi_strptime on {text!r} with {fmt!r}, continue",
            )
    raise ValueError(f"failed all multi_strptime on {text!r} with {format!r}")

@ -10,7 +10,6 @@ rss-parser>=1.1
tmdbsimple>=2.9.1 tmdbsimple>=2.9.1
yt-dlp>=2022.6.29 yt-dlp>=2022.6.29
git+https://git.banananet.work/zocker/python-jsoncache#egg=jsoncache git+https://git.banananet.work/zocker/python-jsoncache#egg=jsoncache
git+https://github.com/Zocker1999NET/youtube-search-python@my-version#egg=youtube-search-python
git+https://github.com/Zocker1999NET/tinytinypy#egg=tinytinypy git+https://github.com/Zocker1999NET/tinytinypy#egg=tinytinypy
# replacement for jinja until https://github.com/pallets/jinja/pull/1712 is merged # replacement for jinja until https://github.com/pallets/jinja/pull/1712 is merged
git+https://github.com/Zocker1999NET/jinja@trailing-comma-3.1.2#egg=Jinja2 git+https://github.com/Zocker1999NET/jinja@trailing-comma-3.1.2#egg=Jinja2

Loading…
Cancel
Save