Reformat code (partially using black)

master
Felix Stupp 3 years ago
parent 4e6e646c87
commit bd72ad77bb
Signed by: zocker
GPG Key ID: 93E1BD26F6B02FB7

@ -1,5 +1,5 @@
#### ####
## Imports # Imports
#### ####
from __future__ import annotations from __future__ import annotations
@ -11,7 +11,16 @@ import logging
import os import os
import random import random
from urllib.parse import urlencode, quote_plus from urllib.parse import urlencode, quote_plus
from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Union from typing import (
Any,
Callable,
Dict,
Iterable,
List,
Optional,
Set,
Union,
)
from flask import Flask, jsonify, make_response, request, redirect from flask import Flask, jsonify, make_response, request, redirect
from flask.templating import render_template from flask.templating import render_template
@ -20,35 +29,48 @@ from pony.flask import Pony
from pony import orm from pony import orm
from entertainment_decider import common from entertainment_decider import common
from entertainment_decider.models import PreferenceScore, Tag, db, MediaCollection, MediaCollectionLink, MediaElement, generate_preference_list from entertainment_decider.models import (
from entertainment_decider.extractors.collection import collection_extract_uri, collection_update PreferenceScore,
Tag,
db,
MediaCollection,
MediaCollectionLink,
MediaElement,
generate_preference_list,
)
from entertainment_decider.extractors.collection import (
collection_extract_uri,
collection_update,
)
from entertainment_decider.extractors.media import media_extract_uri from entertainment_decider.extractors.media import media_extract_uri
#### ####
## Logging Config # Logging Config
#### ####
logging.basicConfig(format = "%(asctime)s === %(message)s", level=logging.DEBUG) logging.basicConfig(format="%(asctime)s === %(message)s", level=logging.DEBUG)
#### ####
## Flask Config # Flask Config
#### ####
flask_app = Flask(__name__) flask_app = Flask(__name__)
flask_app.config.update(dict( flask_app.config.update(
CELERY = dict( dict(
), CELERY=dict(),
DEBUG = True, DEBUG=True,
PONY = dict( PONY=dict(
provider = "sqlite", provider="sqlite",
filename = "./db.sqlite", filename="./db.sqlite",
create_db = True, create_db=True,
) )
)) )
)
def environ_bool(value: Union[str, bool]) -> bool: def environ_bool(value: Union[str, bool]) -> bool:
if type(value) == bool: if type(value) == bool:
@ -60,33 +82,43 @@ ConfigSingleTranslator = Callable[[Any], Any]
ConfigTranslatorIterable = Iterable[ConfigSingleTranslator] ConfigTranslatorIterable = Iterable[ConfigSingleTranslator]
ConfigTranslatorCreator = Callable[[str], ConfigTranslatorIterable] ConfigTranslatorCreator = Callable[[str], ConfigTranslatorIterable]
def config_suffixer(setter: ConfigKeySetter, prefix: str, lower: bool = True) -> ConfigTranslatorCreator:
def config_suffixer(
setter: ConfigKeySetter, prefix: str, lower: bool = True
) -> ConfigTranslatorCreator:
def creator(key: str): def creator(key: str):
if not key.startswith(prefix): if not key.startswith(prefix):
raise Exception(f"Environment key {key!r} is missing suffix {prefix!r}") raise Exception(f"Environment key {key!r} is missing suffix {prefix!r}")
new_key = key[len(prefix):] new_key = key[len(prefix) :]
new_key = new_key.lower() if lower else new_key new_key = new_key.lower() if lower else new_key
return ( return (partial(setter, new_key),)
partial(setter, new_key)
)
return creator return creator
def celery_config_setter(key: str, val: Any): def celery_config_setter(key: str, val: Any):
flask_app.config["CELERY"][key] = val flask_app.config["CELERY"][key] = val
celery_config_same = config_suffixer(celery_config_setter, "CELERY_") celery_config_same = config_suffixer(celery_config_setter, "CELERY_")
def flask_config_setter(key: str, val: Any): def flask_config_setter(key: str, val: Any):
flask_app.config[key] = val flask_app.config[key] = val
flask_config_same = config_suffixer(flask_config_setter, "FLASK_", lower=False) flask_config_same = config_suffixer(flask_config_setter, "FLASK_", lower=False)
def pony_config_setter(key: str, val: Any): def pony_config_setter(key: str, val: Any):
flask_app.config["PONY"][key] = val flask_app.config["PONY"][key] = val
pony_config_same = config_suffixer(pony_config_setter, "PONY_") pony_config_same = config_suffixer(pony_config_setter, "PONY_")
CONFIG_TRANSLATE_TABLE: Dict[str, Union[ConfigTranslatorIterable, ConfigTranslatorCreator]] = { CONFIG_TRANSLATE_TABLE: Dict[
str, Union[ConfigTranslatorIterable, ConfigTranslatorCreator]
] = {
"CELERY_BROKER_URL": celery_config_same, "CELERY_BROKER_URL": celery_config_same,
"CELERY_RESULT_BACKEND": celery_config_same, "CELERY_RESULT_BACKEND": celery_config_same,
"FLASK_DEBUG": ( "FLASK_DEBUG": (
@ -120,7 +152,7 @@ for key, val in os.environ.items():
#### ####
## Pony init # Pony init
#### ####
@ -131,7 +163,7 @@ Pony(flask_app)
#### ####
## Return filters # Return filters
#### ####
@ -144,14 +176,18 @@ def as_link(uri: str):
uri = Markup.escape(uri) uri = Markup.escape(uri)
return Markup(f'<a href="{uri}">{uri}</a>') return Markup(f'<a href="{uri}">{uri}</a>')
@flask_app.template_filter() @flask_app.template_filter()
def tenary(b: bool, true_str: str, false_str: str) -> str: def tenary(b: bool, true_str: str, false_str: str) -> str:
return true_str if b else false_str return true_str if b else false_str
TIMEDELTA_FORMAT = ( TIMEDELTA_FORMAT = (
datetime.timedelta(hours=1), datetime.timedelta(hours=1),
datetime.timedelta(minutes=1), datetime.timedelta(minutes=1),
) )
@flask_app.template_filter() @flask_app.template_filter()
def timedelta(seconds: int) -> str: def timedelta(seconds: int) -> str:
delta = datetime.timedelta(seconds=seconds) delta = datetime.timedelta(seconds=seconds)
@ -169,7 +205,7 @@ def timedelta(seconds: int) -> str:
#### ####
## Routes # Routes
#### ####
@ -179,7 +215,9 @@ def dashboard():
pinned_limit = 10 pinned_limit = 10
media_limit = 10 media_limit = 10
# for links from pinned collections # for links from pinned collections
pinned_collections: Iterable[MediaCollection] = orm.select(m for m in MediaCollection if m.pinned and not m.ignored).order_by(MediaCollection.release_date, MediaCollection.title, MediaCollection.id) pinned_collections: Iterable[MediaCollection] = orm.select(
m for m in MediaCollection if m.pinned and not m.ignored
).order_by(MediaCollection.release_date, MediaCollection.title, MediaCollection.id)
links_from_pinned_collections: List[MediaCollectionLink] = list() links_from_pinned_collections: List[MediaCollectionLink] = list()
episodes_from_pinned_collections: Set[MediaElement] = set() episodes_from_pinned_collections: Set[MediaElement] = set()
for coll in pinned_collections: for coll in pinned_collections:
@ -224,9 +262,16 @@ def list_collections_with_unwatched():
@flask_app.route("/collection/pinned") @flask_app.route("/collection/pinned")
def list_pinned_collection(): def list_pinned_collection():
collection_list: Iterable[MediaCollection] = orm.select(c for c in MediaCollection if c.pinned).order_by(orm.desc(MediaCollection.release_date), MediaCollection.title, MediaCollection.id) collection_list: Iterable[MediaCollection] = orm.select(
c for c in MediaCollection if c.pinned
).order_by(
orm.desc(MediaCollection.release_date),
MediaCollection.title,
MediaCollection.id,
)
return render_template("collection_list.htm", collection_list=collection_list) return render_template("collection_list.htm", collection_list=collection_list)
@flask_app.route("/collection/<int:collection_id>") @flask_app.route("/collection/<int:collection_id>")
def show_collection(collection_id): def show_collection(collection_id):
collection: MediaCollection = MediaCollection.get(id=collection_id) collection: MediaCollection = MediaCollection.get(id=collection_id)
@ -238,6 +283,7 @@ def show_collection(collection_id):
media_links=MediaCollectionLink.sorted(MediaCollectionLink.select(lambda l: l.collection == collection)) if orm.count(collection.media_links) <= 100 else None, media_links=MediaCollectionLink.sorted(MediaCollectionLink.select(lambda l: l.collection == collection)) if orm.count(collection.media_links) <= 100 else None,
) )
@flask_app.route("/collection/<int:collection_id>/episodes") @flask_app.route("/collection/<int:collection_id>/episodes")
def show_collection_episodes(collection_id): def show_collection_episodes(collection_id):
collection: MediaCollection = MediaCollection.get(id=collection_id) collection: MediaCollection = MediaCollection.get(id=collection_id)
@ -262,9 +308,10 @@ def list_media():
media_list=list(itertools.islice(get_considerable(), 100)) media_list=list(itertools.islice(get_considerable(), 100))
) )
@flask_app.route("/media/short") @flask_app.route("/media/short")
@flask_app.route("/media/short/<int:seconds>") @flask_app.route("/media/short/<int:seconds>")
def list_short_media(seconds: int = 10*60): def list_short_media(seconds: int = 10 * 60):
media_list: Iterable[MediaElement] = orm.select(m for m in MediaElement).order_by(orm.desc(MediaElement.release_date), MediaElement.id) media_list: Iterable[MediaElement] = orm.select(m for m in MediaElement).order_by(orm.desc(MediaElement.release_date), MediaElement.id)
def get_considerable(): def get_considerable():
for element in media_list: for element in media_list:
@ -277,16 +324,20 @@ def list_short_media(seconds: int = 10*60):
@flask_app.route("/media/unsorted") @flask_app.route("/media/unsorted")
def list_unsorted_media(): def list_unsorted_media():
media_list: Iterable[MediaElement] = orm.select(m for m in MediaElement if len(m.collection_links) == 0).order_by(orm.desc(MediaElement.release_date), MediaElement.id) media_list: Iterable[MediaElement] = orm.select(
m for m in MediaElement if len(m.collection_links) == 0
).order_by(orm.desc(MediaElement.release_date), MediaElement.id)
return render_template( return render_template(
"media_list.htm", "media_list.htm",
media_list=media_list, media_list=media_list,
) )
@flask_app.route("/media/extract") @flask_app.route("/media/extract")
def extract_media(): def extract_media():
return render_template("media_extract.htm") return render_template("media_extract.htm")
@flask_app.route("/media/<int:media_id>") @flask_app.route("/media/<int:media_id>")
def show_media(media_id): def show_media(media_id):
element: MediaElement = MediaElement.get(id=media_id) element: MediaElement = MediaElement.get(id=media_id)
@ -341,6 +392,7 @@ def refresh_collections():
collection_update(coll) collection_update(coll)
return redirect_back_or_okay() return redirect_back_or_okay()
@flask_app.route("/api/refresh/collection/<int:collection_id>", methods=["POST"]) @flask_app.route("/api/refresh/collection/<int:collection_id>", methods=["POST"])
def force_refresh_collection(collection_id: int): def force_refresh_collection(collection_id: int):
coll: MediaCollection = MediaCollection.get(id=collection_id) coll: MediaCollection = MediaCollection.get(id=collection_id)
@ -360,15 +412,28 @@ def show_stats():
"known": orm.count(elements), "known": orm.count(elements),
"known_seconds": orm.sum(m.length for m in elements), "known_seconds": orm.sum(m.length for m in elements),
"watched": orm.count(m for m in elements if m.watched), "watched": orm.count(m for m in elements if m.watched),
"watched_seconds": orm.sum((m.length if m.watched else m.progress) for m in elements if m.watched), "watched_seconds": orm.sum(
(m.length if m.watched else m.progress)
for m in elements
if m.watched
),
"ignored": orm.count(m for m in elements if m.ignored), "ignored": orm.count(m for m in elements if m.ignored),
"ignored_seconds": orm.sum(m.length - m.progress for m in elements if m.ignored), "ignored_seconds": orm.sum(
"to_watch": orm.count(m for m in elements if not m.ignored and not m.watched), m.length - m.progress for m in elements if m.ignored
"to_watch_seconds": orm.sum(m.length - m.progress for m in elements if not m.ignored and not m.watched) ),
} "to_watch": orm.count(
} m for m in elements if not m.ignored and not m.watched
),
"to_watch_seconds": orm.sum(
m.length - m.progress
for m in elements
if not m.ignored and not m.watched
),
},
},
) )
@flask_app.route("/tag") @flask_app.route("/tag")
def show_tag(): def show_tag():
tag_list: List[Tag] = Tag.select() tag_list: List[Tag] = Tag.select()
@ -385,6 +450,7 @@ def test():
"data": first.to_dict(), "data": first.to_dict(),
}, 200 }, 200
# TODO add table for failed attempts so these may be resolved afterwards with increasing delays (add to MediaElement with flag "retrieved" and "last_updated" as date to resolve last try) # TODO add table for failed attempts so these may be resolved afterwards with increasing delays (add to MediaElement with flag "retrieved" and "last_updated" as date to resolve last try)
@ -398,20 +464,25 @@ def redirect_back_or_okay():
return "400 Bad Request : Invalid Redirect Specified", 400 return "400 Bad Request : Invalid Redirect Specified", 400
return redirect(uri) return redirect(uri)
@flask_app.route("/api/collection/list") @flask_app.route("/api/collection/list")
def api_collection_list(): def api_collection_list():
collection_list: Iterable[MediaCollection] = MediaCollection.select() collection_list: Iterable[MediaCollection] = MediaCollection.select()
return { return {
"status": True, "status": True,
"data": [{ "data": [
{
"id": collection.id, "id": collection.id,
"title": collection.title, "title": collection.title,
"release_date": collection.release_date, "release_date": collection.release_date,
"length": collection.length, "length": collection.length,
"progress": collection.progress, "progress": collection.progress,
} for collection in collection_list], }
for collection in collection_list
],
}, 200 }, 200
@flask_app.route("/api/collection/extract", methods=["POST"]) @flask_app.route("/api/collection/extract", methods=["POST"])
def api_collection_extract(): def api_collection_extract():
data = request.form.to_dict() data = request.form.to_dict()
@ -443,15 +514,18 @@ def api_collection_element(collection_id: int):
"notes": collection.notes, "notes": collection.notes,
"release_date": collection.release_date, "release_date": collection.release_date,
"ignored": collection.ignored, "ignored": collection.ignored,
"media_links": [{ "media_links": [
{
"media": { "media": {
"id": link.element.id, "id": link.element.id,
"title": link.element.title, "title": link.element.title,
}, },
"season": link.season, "season": link.season,
"episode": link.episode, "episode": link.episode,
} for link in collection.media_links]
} }
for link in collection.media_links
],
},
}, 200 }, 200
elif request.method == "POST": elif request.method == "POST":
data = request.form.to_dict() data = request.form.to_dict()
@ -481,20 +555,45 @@ def api_collection_element(collection_id: int):
"error": "405 Method Not Allowed", "error": "405 Method Not Allowed",
}, 405 }, 405
@flask_app.route("/api/collection/<int:collection_id>", methods=["DELETE"])
@flask_app.route("/api/collection/<int:collection_id>/delete", methods=["POST"])
def api_collection_delete(collection_id: int):
collection: MediaCollection = MediaCollection.get(id=collection_id)
if collection is None:
return {
"status": False,
"error": f"Object not found",
}, 404
collection.delete()
update_element_lookup_cache([collection_id])
return redirect_back_or_okay()
@flask_app.route("/api/element_lookup_cache/update", methods=["POST"])
def api_element_lookup_cache_rebuild():
update_element_lookup_cache()
return redirect_back_or_okay()
@flask_app.route("/api/media/list") @flask_app.route("/api/media/list")
def api_media_list(): def api_media_list():
media_list: Iterable[MediaElement] = MediaElement.select() media_list: Iterable[MediaElement] = MediaElement.select()
return { return {
"status": True, "status": True,
"data": [{ "data": [
{
"id": media.id, "id": media.id,
"title": media.title, "title": media.title,
"release_date": media.release_date, "release_date": media.release_date,
"length": media.length, "length": media.length,
"progress": media.progress, "progress": media.progress,
} for media in media_list], }
for media in media_list
],
}, 200 }, 200
@flask_app.route("/api/media/extract", methods=["POST"]) @flask_app.route("/api/media/extract", methods=["POST"])
def api_media_extract(): def api_media_extract():
data = request.form.to_dict() data = request.form.to_dict()
@ -530,15 +629,18 @@ def api_media_element(media_id: int):
"ignored": element.ignored, "ignored": element.ignored,
"watched": element.watched, "watched": element.watched,
"can_considered": element.can_considered, "can_considered": element.can_considered,
"collection_links": [{ "collection_links": [
{
"collection": { "collection": {
"id": link.collection.id, "id": link.collection.id,
"title": link.collection.title, "title": link.collection.title,
}, },
"season": link.season, "season": link.season,
"episode": link.episode, "episode": link.episode,
} for link in element.collection_links]
} }
for link in element.collection_links
],
},
}, 200 }, 200
elif request.method == "POST": elif request.method == "POST":
data = request.form.to_dict() data = request.form.to_dict()

@ -48,7 +48,14 @@ class YoutubeVideoData(TypedDict):
class YoutubeMediaExtractor(MediaExtractor[YoutubeVideoData]): class YoutubeMediaExtractor(MediaExtractor[YoutubeVideoData]):
__uri_regex = re.compile(r"^https?://((www\.)?youtube\.com/watch\?v=|youtu\.be/)(?P<id>[^/&?]+)") __uri_regex = re.compile(r"""^
https?://(
(www\.)?youtube\.com/(
watch\?v=
)|
youtu\.be/
)(?P<id>[^/&?]+)
$""", re.VERBOSE)
def __init__(self): def __init__(self):
super().__init__("ytdl") super().__init__("ytdl")

Loading…
Cancel
Save