Reformat models.py

master
Felix Stupp 2 years ago
parent a76eb796ee
commit b7cbcbd540
Signed by: zocker
GPG Key ID: 93E1BD26F6B02FB7

@ -8,7 +8,17 @@ import gzip
import json
import math
import logging
from typing import Callable, Dict, Iterable, List, Optional, Set, Tuple, TypeVar, Union
from typing import (
Callable,
Dict,
Iterable,
List,
Optional,
Set,
Tuple,
TypeVar,
Union,
)
import magic
import requests
@ -149,7 +159,9 @@ class PreferenceScore:
return PreferenceScoreAppender(self, other)
def __mul__(self, scalar: float) -> PreferenceScore:
return PreferenceScore({tag: score * scalar for tag, score in self.points.items()})
return PreferenceScore(
{tag: score * scalar for tag, score in self.points.items()}
)
def __neg__(self) -> PreferenceScore:
return self * -1
@ -158,7 +170,9 @@ class PreferenceScore:
return (self & tagable.tag_hierachy.share_score(score)).calculate()
def calculate_score(self, object: Tagable) -> float:
return math.fsum(self.points[tag] for tag in object.all_tags if tag in self.points)
return math.fsum(
self.points[tag] for tag in object.all_tags if tag in self.points
)
def order_by_score(self, objects: Iterable[T]) -> List[T]:
return sorted(objects, key=lambda o: self.calculate_score(o))
@ -169,9 +183,7 @@ class PreferenceScore:
@classmethod
def from_json(cls, data: str) -> PreferenceScore:
dicts: Dict = json.loads(data)
return cls(
{Tag[id]: score for id, score in dicts.items()}
)
return cls({Tag[id]: score for id, score in dicts.items()})
@classmethod
def from_base64(cls, in_data: str, encoding: str = "utf-8") -> PreferenceScore:
@ -183,9 +195,7 @@ class PreferenceScore:
return data
def to_json(self) -> str:
return json.dumps(
{tag.id: score for tag, score in self.points.items()}
)
return json.dumps({tag.id: score for tag, score in self.points.items()})
def to_base64(self, encoding: str = "utf-8") -> str:
data = self.to_json()
@ -226,18 +236,22 @@ class PreferenceScoreAppender:
if tag not in combined:
combined[tag] = []
combined[tag].append(score)
return PreferenceScore({tag: math.fsum(scores) for tag, scores in combined.items()})
return PreferenceScore(
{tag: math.fsum(scores) for tag, scores in combined.items()}
)
PreferenceScoreCompatibleSimple = Union[PreferenceScore, PreferenceScoreAppender]
PreferenceScoreCompatible = Union[PreferenceScoreCompatibleSimple, Iterable[PreferenceScoreCompatibleSimple]]
PreferenceScoreCompatible = Union[
PreferenceScoreCompatibleSimple, Iterable[PreferenceScoreCompatibleSimple]
]
def generate_preference_list(
base: PreferenceScore,
object_gen: Callable[[], List[MediaElement]],
score_adapt: float,
limit: Optional[int] = None
limit: Optional[int] = None,
) -> List[MediaElement]:
res_ids = list[int]()
tag_map = dict[MediaCollection, Tag]()
@ -257,7 +271,7 @@ def generate_preference_list(
res_ids.append(first_element.id)
if limit is not None and limit <= len(res_ids):
break
first_element.watched = True # simulative
first_element.watched = True # simulative
base = base.adapt_score(first_element, score_adapt)
element_list = object_gen()
orm.rollback()
@ -268,11 +282,11 @@ def generate_preference_list(
class CollectionStats:
to_watch_count: int
ignored_count: int # but not watched
ignored_count: int # but not watched
watched_count: int
to_watch_seconds: int
ignored_seconds: int # but not watched
ignored_seconds: int # but not watched
watched_seconds: int
@property
@ -403,9 +417,11 @@ class MediaElement(db.Entity, Tagable):
progress: int = orm.Required(int, default=0)
length: int = orm.Optional(int)
tag_list : Iterable[Tag] = orm.Set(lambda: Tag)
tag_list: Iterable[Tag] = orm.Set(lambda: Tag)
uris: Iterable[MediaUriMapping] = orm.Set(lambda: MediaUriMapping)
collection_links: Iterable[MediaCollectionLink] = orm.Set(lambda: MediaCollectionLink)
collection_links: Iterable[MediaCollectionLink] = orm.Set(
lambda: MediaCollectionLink
)
@property
def was_extracted(self) -> bool:
@ -417,7 +433,14 @@ class MediaElement(db.Entity, Tagable):
@property
def ignored_recursive(self) -> bool:
return orm.count(link for link in self.collection_links if link.collection.ignored == True) > 0
return (
orm.count(
link
for link in self.collection_links
if link.collection.ignored == True
)
> 0
)
@property
def ignored_any(self) -> bool:
@ -461,7 +484,7 @@ class MediaElement(db.Entity, Tagable):
for link in self.collection_links:
if not MediaCollectionLink.get(collection=link.collection, element=other):
link.element = other
self.delete() # will also delete still existing uri mappings and collection links
self.delete() # will also delete still existing uri mappings and collection links
orm.flush()
def add_single_uri(self, uri: str) -> bool:
@ -474,7 +497,9 @@ class MediaElement(db.Entity, Tagable):
)
return True
if mapping.element != self:
raise Exception(f"URI duplicated for two different media's: {uri}") # TODO may replace with merge call
raise Exception(
f"URI duplicated for two different media's: {uri}"
) # TODO may replace with merge call
return False
def add_uris(self, uri_list: Iterable[str]) -> bool:
@ -573,14 +598,17 @@ class MediaCollection(db.Entity, Tagable):
return self.last_updated is not None
def __to_watch_episodes(self) -> Query | Iterable[MediaCollectionLink]:
return orm.select(link for link in self.media_links if not link.element.skip_over)
return orm.select(
link for link in self.media_links if not link.element.skip_over
)
@property
def next_episode(self) -> Optional[MediaCollectionLink]:
return orm \
.select(link for link in self.media_links if not link.element.skip_over) \
.order_by(MediaCollectionLink.sort_key) \
return (
orm.select(link for link in self.media_links if not link.element.skip_over)
.order_by(MediaCollectionLink.sort_key)
.first()
)
@property
def completed(self) -> bool:
@ -594,8 +622,12 @@ class MediaCollection(db.Entity, Tagable):
def stats(self) -> CollectionStats:
return CollectionStats.from_collection(self)
def add_episode(self, media: MediaElement, season: int = 0, episode: int = 0) -> MediaCollectionLink:
link: MediaCollectionLink = MediaCollectionLink.get(collection=self, element=media)
def add_episode(
self, media: MediaElement, season: int = 0, episode: int = 0
) -> MediaCollectionLink:
link: MediaCollectionLink = MediaCollectionLink.get(
collection=self, element=media
)
if link is None:
link = MediaCollectionLink(collection=self, element=media)
link.season, link.episode = season, episode
@ -612,7 +644,9 @@ class MediaCollection(db.Entity, Tagable):
)
return True
if mapping.element != self:
raise Exception(f"URI duplicated for two different collections's: {uri}") # TODO may replace with merge call
raise Exception(
f"URI duplicated for two different collections's: {uri}"
) # TODO may replace with merge call
return False
def add_uris(self, uri_list: Iterable[str]) -> bool:

Loading…
Cancel
Save