app.py: Rewrite & add type hints and guards for mypy

master
Felix Stupp 2 years ago
parent 487b126fe1
commit bba57e82d8
Signed by: zocker
GPG Key ID: 93E1BD26F6B02FB7

@ -30,10 +30,12 @@ from flask import (
make_response,
redirect,
request,
Response,
send_file,
url_for,
)
from flask.templating import render_template
from flask.typing import ResponseReturnValue
from markupsafe import Markup
from pony.flask import Pony
from pony import orm
@ -137,16 +139,18 @@ def environ_timedelta_seconds(value: Union[int, str, timedelta]) -> int:
return environ_timedelta(value) // timedelta(seconds=1)
ConfigKeySetter: Callable[[str, Any], Any]
ConfigKeySetter = Callable[[str, Any], None]
ConfigSingleTranslator = Callable[[Any], Any]
ConfigTranslatorIterable = Iterable[ConfigSingleTranslator]
ConfigTranslatorCreator = Callable[[str], ConfigTranslatorIterable]
def config_suffixer(
setter: ConfigKeySetter, prefix: str, lower: bool = True
setter: ConfigKeySetter,
prefix: str,
lower: bool = True,
) -> ConfigTranslatorCreator:
def creator(key: str):
def creator(key: str) -> ConfigTranslatorIterable:
if not key.startswith(prefix):
raise Exception(f"Environment key {key!r} is missing suffix {prefix!r}")
new_key = key[len(prefix) :]
@ -156,21 +160,21 @@ def config_suffixer(
return creator
def celery_config_setter(key: str, val: Any):
def celery_config_setter(key: str, val: Any) -> None:
flask_app.config["CELERY"][key] = val
celery_config_same = config_suffixer(celery_config_setter, "CELERY_")
def flask_config_setter(key: str, val: Any):
def flask_config_setter(key: str, val: Any) -> None:
flask_app.config[key] = val
flask_config_same = config_suffixer(flask_config_setter, "FLASK_", lower=False)
def pony_config_setter(key: str, val: Any):
def pony_config_setter(key: str, val: Any) -> None:
flask_app.config["PONY"][key] = val
@ -238,8 +242,12 @@ def encode_options(opts: Mapping[str, Any]) -> str:
@flask_app.template_global()
def this_url(changed_args: Mapping[str, str] = {}):
view_args = dict(request.view_args)
def this_url(changed_args: Mapping[str, str] = {}) -> str:
if request.endpoint is None:
raise Exception(
"this_url can only be used after successful endpoint matching (request.endpoint is still None)"
)
view_args = dict(request.view_args or {})
get_args = request.args.to_dict()
get_args.update(changed_args)
return url_for(request.endpoint, **view_args) + (
@ -248,7 +256,7 @@ def this_url(changed_args: Mapping[str, str] = {}):
@flask_app.template_filter()
def as_link(uri: str):
def as_link(uri: str) -> Markup:
uri = Markup.escape(uri)
return Markup(f'<a href="{uri}">{uri}</a>')
@ -265,7 +273,7 @@ TIMEDELTA_FORMAT = (
@flask_app.template_filter("timedelta")
def _filter_timedelta(seconds: Optional[int]) -> str:
def _filter_timedelta(seconds: Optional[int]) -> Optional[str]:
if seconds is None:
return None
delta = timedelta(seconds=seconds)
@ -292,12 +300,12 @@ def _select_ids(cls: Type[T], ids: Iterable[int]) -> Query[T]:
@flask_app.teardown_request
def merge_query_stats(*_, **__):
def merge_query_stats(*_: Any, **__: Any) -> None:
db.merge_local_stats()
@flask_app.route("/")
def dashboard():
def dashboard() -> ResponseReturnValue:
# config
began_limit = 8
pinned_limit = 16
@ -313,18 +321,19 @@ def dashboard():
pinned_collections: Iterable[MediaCollection] = orm.select(
m for m in MediaCollection if m.pinned and not m.ignored
).order_by(MediaCollection.release_date, MediaCollection.title, MediaCollection.id)
links_from_pinned_collections = set[MediaCollectionLink]()
links_from_pinned_collections_set = set[MediaCollectionLink]()
for coll in pinned_collections:
next_link = coll.next_episode
if (
next_link is not None
and next_link not in links_from_pinned_collections
and next_link not in links_from_pinned_collections_set
and next_link.element not in already_listed
and next_link.element.can_considered
):
links_from_pinned_collections.add(next_link)
links_from_pinned_collections_set.add(next_link)
links_from_pinned_collections = sorted(
links_from_pinned_collections, key=lambda l: l.element.release_date
links_from_pinned_collections_set,
key=lambda l: l.element.release_date,
)[:pinned_limit]
already_listed.update(link.element for link in links_from_pinned_collections)
# for media
@ -341,7 +350,7 @@ def dashboard():
)
def _list_collections(collections: Iterable[MediaCollection]):
def _list_collections(collections: Iterable[MediaCollection]) -> ResponseReturnValue:
return render_template(
"collection_list.htm",
collection_list=list(collections),
@ -350,7 +359,7 @@ def _list_collections(collections: Iterable[MediaCollection]):
def _list_collections_by_filter(
filter: Callable[[MediaCollection], bool] = lambda _: True,
):
) -> ResponseReturnValue:
collection_list: Iterable[MediaCollection] = orm.select(
c for c in MediaCollection if filter(c)
).order_by(
@ -362,22 +371,22 @@ def _list_collections_by_filter(
@flask_app.route("/collection")
def list_collection():
def list_collection() -> ResponseReturnValue:
return _list_collections_by_filter(lambda coll: coll.is_root_collection)
@flask_app.route("/collection/all")
def list_collection_all():
def list_collection_all() -> ResponseReturnValue:
return _list_collections_by_filter()
@flask_app.route("/collection/extract")
def extract_collection():
def extract_collection() -> ResponseReturnValue:
return render_template("collection_extract.htm")
@flask_app.route("/collection/overview")
def list_collection_overview():
def list_collection_overview() -> ResponseReturnValue:
data = request.args.to_dict()
ids = _parse_cs_ids(data.get("ids", "NULL"))
if not ids:
@ -394,14 +403,14 @@ def list_collection_overview():
@flask_app.route("/collection/to_watch")
def list_collections_with_unwatched():
def list_collections_with_unwatched() -> ResponseReturnValue:
return _list_collections_by_filter(
lambda coll: coll.is_root_collection and not coll.ignored and not coll.completed
)
@flask_app.route("/collection/pinned")
def list_pinned_collection():
def list_pinned_collection() -> ResponseReturnValue:
collection_list: Iterable[MediaCollection] = orm.select(
c for c in MediaCollection if c.pinned
).order_by(
@ -413,7 +422,7 @@ def list_pinned_collection():
@flask_app.route("/collection/<int:collection_id>")
def show_collection(collection_id):
def show_collection(collection_id: int) -> ResponseReturnValue:
SMALL_COLLECTION_MAX_COUNT = 100
collection: MediaCollection = MediaCollection.get(id=collection_id)
if collection is None:
@ -433,7 +442,7 @@ def show_collection(collection_id):
@flask_app.route("/collection/<int:collection_id>/episodes")
def show_collection_episodes(collection_id):
def show_collection_episodes(collection_id: int) -> ResponseReturnValue:
collection: MediaCollection = MediaCollection.get(id=collection_id)
if collection is None:
return make_response(f"Not found", 404)
@ -448,7 +457,7 @@ def show_collection_episodes(collection_id):
@flask_app.route("/media")
def list_media():
def list_media() -> ResponseReturnValue:
media_list: Iterable[MediaElement] = get_all_considered(
"elem.release_date DESC, elem.id"
)
@ -460,7 +469,7 @@ def list_media():
@flask_app.route("/media/short")
@flask_app.route("/media/short/<int:seconds>")
def list_short_media(seconds: int = 10 * 60):
def list_short_media(seconds: int = 10 * 60) -> ResponseReturnValue:
media_list: Iterable[MediaElement] = get_all_considered(
filter_by=f"(length - progress) <= {seconds}",
order_by="elem.release_date DESC, elem.id",
@ -468,12 +477,13 @@ def list_short_media(seconds: int = 10 * 60):
return render_template(
"media_list.htm",
media_list=list(itertools.islice(media_list, 100)),
check_considered=False,
)
@flask_app.route("/media/long")
@flask_app.route("/media/long/<int:seconds>")
def list_long_media(seconds: int = 10 * 60):
def list_long_media(seconds: int = 10 * 60) -> ResponseReturnValue:
media_list: Iterable[MediaElement] = get_all_considered(
filter_by=f"{seconds} <= (length - progress)",
order_by="elem.release_date DESC, elem.id",
@ -485,7 +495,7 @@ def list_long_media(seconds: int = 10 * 60):
@flask_app.route("/media/unsorted")
def list_unsorted_media():
def list_unsorted_media() -> ResponseReturnValue:
media_list: Iterable[MediaElement] = orm.select(
m for m in MediaElement if len(m.collection_links) == 0
).order_by(orm.desc(MediaElement.release_date), MediaElement.id)
@ -496,12 +506,12 @@ def list_unsorted_media():
@flask_app.route("/media/extract")
def extract_media():
def extract_media() -> ResponseReturnValue:
return render_template("media_extract.htm")
@flask_app.route("/media/<int:media_id>")
def show_media(media_id):
def show_media(media_id: int) -> ResponseReturnValue:
element: MediaElement = MediaElement.get(id=media_id)
if element is None:
return make_response(f"Not found", 404)
@ -509,7 +519,7 @@ def show_media(media_id):
@flask_app.route("/media/<int:media_id>/thumbnail")
def show_media_thumb(media_id: int):
def show_media_thumb(media_id: int) -> ResponseReturnValue:
element: MediaElement = MediaElement.get(id=media_id)
if element is None:
# TODO add default thumbnail if not found
@ -529,7 +539,7 @@ def show_media_thumb(media_id: int):
@flask_app.route("/api/refresh/collections", methods=["POST"])
def refresh_collections():
def refresh_collections() -> ResponseReturnValue:
collection_ids = set[int](
orm.select(c.id for c in MediaCollection if c.keep_updated)
)
@ -575,7 +585,7 @@ def refresh_collections():
@flask_app.route("/api/refresh/collection/<int:collection_id>", methods=["POST"])
def force_refresh_collection(collection_id: int):
def force_refresh_collection(collection_id: int) -> ResponseReturnValue:
coll: MediaCollection = MediaCollection.get(id=collection_id)
if coll is None:
return "404 Not Found", 404
@ -584,7 +594,7 @@ def force_refresh_collection(collection_id: int):
@flask_app.route("/api/refresh/media/<int:media_id>", methods=["POST"])
def force_refresh_media(media_id: int):
def force_refresh_media(media_id: int) -> ResponseReturnValue:
elem: MediaElement = MediaElement.get(id=media_id)
if elem is None:
return "404 Not Found", 404
@ -593,7 +603,7 @@ def force_refresh_media(media_id: int):
@flask_app.route("/stats")
def show_stats():
def show_stats() -> ResponseReturnValue:
elements: List[MediaElement] = MediaElement.select()
collections: List[MediaCollection] = MediaCollection.select()
return render_template(
@ -627,7 +637,7 @@ def show_stats():
@flask_app.route("/stats/queries")
def show_stats_queries():
def show_stats_queries() -> ResponseReturnValue:
stats = sorted(db.global_stats.values(), key=lambda s: s.sum_time, reverse=True)
return render_template(
"stats/queries.htm",
@ -636,7 +646,7 @@ def show_stats_queries():
@flask_app.route("/tag")
def show_tag():
def show_tag() -> ResponseReturnValue:
tag_list: List[Tag] = Tag.select()
return render_template(
"tag_list.htm",
@ -645,7 +655,7 @@ def show_tag():
@flask_app.route("/debug/test")
def test():
def test() -> ResponseReturnValue:
first: MediaElement = MediaElement.select().first()
return {
"data": first.to_dict(),
@ -655,7 +665,7 @@ def test():
# TODO add table for failed attempts so these may be resolved afterwards with increasing delays (add to MediaElement with flag "retrieved" and "last_updated" as date to resolve last try)
def redirect_back_or_okay():
def redirect_back_or_okay() -> Response:
if "redirect" not in request.form:
return make_response(
{
@ -664,13 +674,18 @@ def redirect_back_or_okay():
200,
)
uri = request.form.get("redirect", type=str)
if uri is None:
raise Exception(f"IllegalState: uri should be set as checked before, but isn't")
if not uri.startswith("/"):
return "400 Bad Request : Invalid Redirect Specified", 400
return make_response(
"400 Bad Request : Invalid Redirect Specified",
400,
)
return redirect(uri)
@flask_app.route("/api/collection/list")
def api_collection_list():
def api_collection_list() -> ResponseReturnValue:
collection_list: Iterable[MediaCollection] = MediaCollection.select()
return {
"status": True,
@ -688,7 +703,7 @@ def api_collection_list():
@flask_app.route("/api/collection/extract", methods=["POST"])
def api_collection_extract():
def api_collection_extract() -> ResponseReturnValue:
data = request.form.to_dict()
if "uri" not in data:
return {
@ -704,7 +719,7 @@ def api_collection_extract():
return redirect_back_or_okay()
@flask_app.route("/api/collection/<int:collection_id>", methods=["GET", "POST"])
def api_collection_element(collection_id: int):
def api_collection_element(collection_id: int) -> ResponseReturnValue:
collection: MediaCollection = MediaCollection.get(id=collection_id)
if collection is None:
return {
@ -763,7 +778,7 @@ def api_collection_element(collection_id: int):
for m in query:
m.ignored = True
del data["mark_unmarked_as"]
KEY_CONVERTER = {
KEY_CONVERTER: Mapping[str, Callable[[str], Any]] = {
"title": str,
"notes": str,
"pinned": environ_bool,
@ -791,7 +806,7 @@ def api_collection_element(collection_id: int):
@flask_app.route("/api/collection/<int:collection_id>", methods=["DELETE"])
@flask_app.route("/api/collection/<int:collection_id>/delete", methods=["POST"])
def api_collection_delete(collection_id: int):
def api_collection_delete(collection_id: int) -> ResponseReturnValue:
collection: MediaCollection = MediaCollection.get(id=collection_id)
if collection is None:
return {
@ -803,7 +818,7 @@ def api_collection_delete(collection_id: int):
@flask_app.route("/api/media/list")
def api_media_list():
def api_media_list() -> ResponseReturnValue:
media_list: Iterable[MediaElement] = MediaElement.select()
return {
"status": True,
@ -821,7 +836,7 @@ def api_media_list():
@flask_app.route("/api/media/extract", methods=["POST"])
def api_media_extract():
def api_media_extract() -> ResponseReturnValue:
data = request.form.to_dict()
if "uri" not in data:
return {
@ -835,7 +850,7 @@ def api_media_extract():
return redirect_back_or_okay()
@flask_app.route("/api/media/<int:media_id>", methods=["GET", "POST"])
def api_media_element(media_id: int):
def api_media_element(media_id: int) -> ResponseReturnValue:
element: MediaElement = MediaElement.get(id=media_id)
if element is None:
return {
@ -872,7 +887,7 @@ def api_media_element(media_id: int):
data = request.form.to_dict()
if "redirect" in data:
del data["redirect"]
KEY_CONVERTER = {
KEY_CONVERTER: Mapping[str, Callable[[str], Any]] = {
"title": str,
"notes": str,
"progress": environ_timedelta_seconds,

Loading…
Cancel
Save