From b7cbcbd5403d38150a4e4c51ce096bafc1686fac Mon Sep 17 00:00:00 2001 From: Felix Stupp Date: Thu, 18 Aug 2022 10:01:38 +0000 Subject: [PATCH] Reformat models.py --- server/entertainment_decider/models.py | 88 ++++++++++++++++++-------- 1 file changed, 61 insertions(+), 27 deletions(-) diff --git a/server/entertainment_decider/models.py b/server/entertainment_decider/models.py index cea5e28..a718c7e 100644 --- a/server/entertainment_decider/models.py +++ b/server/entertainment_decider/models.py @@ -8,7 +8,17 @@ import gzip import json import math import logging -from typing import Callable, Dict, Iterable, List, Optional, Set, Tuple, TypeVar, Union +from typing import ( + Callable, + Dict, + Iterable, + List, + Optional, + Set, + Tuple, + TypeVar, + Union, +) import magic import requests @@ -149,7 +159,9 @@ class PreferenceScore: return PreferenceScoreAppender(self, other) def __mul__(self, scalar: float) -> PreferenceScore: - return PreferenceScore({tag: score * scalar for tag, score in self.points.items()}) + return PreferenceScore( + {tag: score * scalar for tag, score in self.points.items()} + ) def __neg__(self) -> PreferenceScore: return self * -1 @@ -158,7 +170,9 @@ class PreferenceScore: return (self & tagable.tag_hierachy.share_score(score)).calculate() def calculate_score(self, object: Tagable) -> float: - return math.fsum(self.points[tag] for tag in object.all_tags if tag in self.points) + return math.fsum( + self.points[tag] for tag in object.all_tags if tag in self.points + ) def order_by_score(self, objects: Iterable[T]) -> List[T]: return sorted(objects, key=lambda o: self.calculate_score(o)) @@ -169,9 +183,7 @@ class PreferenceScore: @classmethod def from_json(cls, data: str) -> PreferenceScore: dicts: Dict = json.loads(data) - return cls( - {Tag[id]: score for id, score in dicts.items()} - ) + return cls({Tag[id]: score for id, score in dicts.items()}) @classmethod def from_base64(cls, in_data: str, encoding: str = "utf-8") -> PreferenceScore: @@ -183,9 +195,7 @@ class PreferenceScore: return data def to_json(self) -> str: - return json.dumps( - {tag.id: score for tag, score in self.points.items()} - ) + return json.dumps({tag.id: score for tag, score in self.points.items()}) def to_base64(self, encoding: str = "utf-8") -> str: data = self.to_json() @@ -226,18 +236,22 @@ class PreferenceScoreAppender: if tag not in combined: combined[tag] = [] combined[tag].append(score) - return PreferenceScore({tag: math.fsum(scores) for tag, scores in combined.items()}) + return PreferenceScore( + {tag: math.fsum(scores) for tag, scores in combined.items()} + ) PreferenceScoreCompatibleSimple = Union[PreferenceScore, PreferenceScoreAppender] -PreferenceScoreCompatible = Union[PreferenceScoreCompatibleSimple, Iterable[PreferenceScoreCompatibleSimple]] +PreferenceScoreCompatible = Union[ + PreferenceScoreCompatibleSimple, Iterable[PreferenceScoreCompatibleSimple] +] def generate_preference_list( base: PreferenceScore, object_gen: Callable[[], List[MediaElement]], score_adapt: float, - limit: Optional[int] = None + limit: Optional[int] = None, ) -> List[MediaElement]: res_ids = list[int]() tag_map = dict[MediaCollection, Tag]() @@ -257,7 +271,7 @@ def generate_preference_list( res_ids.append(first_element.id) if limit is not None and limit <= len(res_ids): break - first_element.watched = True # simulative + first_element.watched = True # simulative base = base.adapt_score(first_element, score_adapt) element_list = object_gen() orm.rollback() @@ -268,11 +282,11 @@ def generate_preference_list( class CollectionStats: to_watch_count: int - ignored_count: int # but not watched + ignored_count: int # but not watched watched_count: int to_watch_seconds: int - ignored_seconds: int # but not watched + ignored_seconds: int # but not watched watched_seconds: int @property @@ -403,9 +417,11 @@ class MediaElement(db.Entity, Tagable): progress: int = orm.Required(int, default=0) length: int = orm.Optional(int) - tag_list : Iterable[Tag] = orm.Set(lambda: Tag) + tag_list: Iterable[Tag] = orm.Set(lambda: Tag) uris: Iterable[MediaUriMapping] = orm.Set(lambda: MediaUriMapping) - collection_links: Iterable[MediaCollectionLink] = orm.Set(lambda: MediaCollectionLink) + collection_links: Iterable[MediaCollectionLink] = orm.Set( + lambda: MediaCollectionLink + ) @property def was_extracted(self) -> bool: @@ -417,7 +433,14 @@ class MediaElement(db.Entity, Tagable): @property def ignored_recursive(self) -> bool: - return orm.count(link for link in self.collection_links if link.collection.ignored == True) > 0 + return ( + orm.count( + link + for link in self.collection_links + if link.collection.ignored == True + ) + > 0 + ) @property def ignored_any(self) -> bool: @@ -461,7 +484,7 @@ class MediaElement(db.Entity, Tagable): for link in self.collection_links: if not MediaCollectionLink.get(collection=link.collection, element=other): link.element = other - self.delete() # will also delete still existing uri mappings and collection links + self.delete() # will also delete still existing uri mappings and collection links orm.flush() def add_single_uri(self, uri: str) -> bool: @@ -474,7 +497,9 @@ class MediaElement(db.Entity, Tagable): ) return True if mapping.element != self: - raise Exception(f"URI duplicated for two different media's: {uri}") # TODO may replace with merge call + raise Exception( + f"URI duplicated for two different media's: {uri}" + ) # TODO may replace with merge call return False def add_uris(self, uri_list: Iterable[str]) -> bool: @@ -573,14 +598,17 @@ class MediaCollection(db.Entity, Tagable): return self.last_updated is not None def __to_watch_episodes(self) -> Query | Iterable[MediaCollectionLink]: - return orm.select(link for link in self.media_links if not link.element.skip_over) + return orm.select( + link for link in self.media_links if not link.element.skip_over + ) @property def next_episode(self) -> Optional[MediaCollectionLink]: - return orm \ - .select(link for link in self.media_links if not link.element.skip_over) \ - .order_by(MediaCollectionLink.sort_key) \ + return ( + orm.select(link for link in self.media_links if not link.element.skip_over) + .order_by(MediaCollectionLink.sort_key) .first() + ) @property def completed(self) -> bool: @@ -594,8 +622,12 @@ class MediaCollection(db.Entity, Tagable): def stats(self) -> CollectionStats: return CollectionStats.from_collection(self) - def add_episode(self, media: MediaElement, season: int = 0, episode: int = 0) -> MediaCollectionLink: - link: MediaCollectionLink = MediaCollectionLink.get(collection=self, element=media) + def add_episode( + self, media: MediaElement, season: int = 0, episode: int = 0 + ) -> MediaCollectionLink: + link: MediaCollectionLink = MediaCollectionLink.get( + collection=self, element=media + ) if link is None: link = MediaCollectionLink(collection=self, element=media) link.season, link.episode = season, episode @@ -612,7 +644,9 @@ class MediaCollection(db.Entity, Tagable): ) return True if mapping.element != self: - raise Exception(f"URI duplicated for two different collections's: {uri}") # TODO may replace with merge call + raise Exception( + f"URI duplicated for two different collections's: {uri}" + ) # TODO may replace with merge call return False def add_uris(self, uri_list: Iterable[str]) -> bool: