From 328f234711c28fb62fdb6286f568815df4145df1 Mon Sep 17 00:00:00 2001 From: Felix Stupp Date: Tue, 6 Dec 2022 12:01:07 +0100 Subject: [PATCH] Refactor models.py into 2 submodules --- server/app.py | 3 +- .../entertainment_decider/models/__init__.py | 26 + .../models/custom_types.py | 29 ++ .../{models.py => models/entities.py} | 460 +----------------- .../models/extras/__init__.py | 1 + .../models/extras/uris.py | 70 +++ .../models/thumbnails.py | 21 + .../preferences/__init__.py | 10 + .../preferences/elem_scoring.py | 171 +++++++ .../preferences/tag_scoring.py | 147 ++++++ 10 files changed, 481 insertions(+), 457 deletions(-) create mode 100644 server/entertainment_decider/models/__init__.py create mode 100644 server/entertainment_decider/models/custom_types.py rename server/entertainment_decider/{models.py => models/entities.py} (65%) create mode 100644 server/entertainment_decider/models/extras/__init__.py create mode 100644 server/entertainment_decider/models/extras/uris.py create mode 100644 server/entertainment_decider/models/thumbnails.py create mode 100644 server/entertainment_decider/preferences/__init__.py create mode 100644 server/entertainment_decider/preferences/elem_scoring.py create mode 100644 server/entertainment_decider/preferences/tag_scoring.py diff --git a/server/app.py b/server/app.py index f68cd4d..feac946 100644 --- a/server/app.py +++ b/server/app.py @@ -45,7 +45,6 @@ from pony import orm from entertainment_decider import common from entertainment_decider.models import ( - PreferenceScore, Query, Tag, are_multiple_considered, @@ -53,11 +52,11 @@ from entertainment_decider.models import ( MediaCollection, MediaCollectionLink, MediaElement, - generate_preference_list, get_all_considered, setup_custom_tables, update_element_lookup_cache, ) +from entertainment_decider.preferences import PreferenceScore, generate_preference_list from entertainment_decider.extractors.collection import ( collection_extract_uri, collection_update, diff --git 
a/server/entertainment_decider/models/__init__.py b/server/entertainment_decider/models/__init__.py new file mode 100644 index 0000000..42ae5c9 --- /dev/null +++ b/server/entertainment_decider/models/__init__.py @@ -0,0 +1,26 @@ +from .custom_types import ( + Query, + SafeStr, +) + +from .entities import ( + CollectionStats, + CollectionUriMapping, + MediaCollection, + MediaCollectionLink, + MediaElement, + MediaThumbnail, + MediaUriMapping, + Tag, + Tagable, + are_multiple_considered, + db, + get_all_considered, + get_all_elements_tags_recursive, + setup_custom_tables, + update_element_lookup_cache, +) + +from .thumbnails import ( + thumbnail_sort_key, +) diff --git a/server/entertainment_decider/models/custom_types.py b/server/entertainment_decider/models/custom_types.py new file mode 100644 index 0000000..b0a6f28 --- /dev/null +++ b/server/entertainment_decider/models/custom_types.py @@ -0,0 +1,29 @@ +from __future__ import annotations + +from typing import List, NewType, TypeVar + +from pony.orm.core import Query as PonyQuery + + +SafeStr = NewType("SafeStr", str) +""" +Use this type for strings which are expected to be safe to insert into SQL statements. +They may be included into a SQL statement by quoting them manually: f"SELECT * FROM '{safe_str}'" + +DO NOT CAST STRINGS WHICH MAY BE SET BY USERS TO PREVENT SQL INJECTION ATTACKS. +""" + + +T = TypeVar("T") + + +class Query( + List[T], + PonyQuery, +): + """ + This class may be used to reflect PonyQuerys with all their "kind of" list behavior. + Only use it for type hintings. 
+ """ + + pass diff --git a/server/entertainment_decider/models.py b/server/entertainment_decider/models/entities.py similarity index 65% rename from server/entertainment_decider/models.py rename to server/entertainment_decider/models/entities.py index 9f7f3d7..64679cc 100644 --- a/server/entertainment_decider/models.py +++ b/server/entertainment_decider/models/entities.py @@ -1,195 +1,43 @@ from __future__ import annotations -from abc import abstractmethod, abstractproperty -import base64 -import dataclasses +from abc import abstractproperty from dataclasses import dataclass from datetime import datetime, timedelta from functools import cache -import gzip -from itertools import chain import itertools -import json -import math import logging import re from typing import ( - Callable, - Dict, Iterable, List, Mapping, - NewType, Optional, Sequence, Set, Tuple, - TypeAlias, TypeVar, - Union, ) import magic import requests from pony import orm -from pony.orm.core import Query as PonyQuery -from .common import trim -from .extras import Chain +from .custom_types import Query, SafeStr +from .thumbnails import THUMBNAIL_ALLOWED_TYPES, THUMBNAIL_HEADERS +from .extras import UriHolder +from ..common import trim db = orm.Database() -SafeStr = NewType("SafeStr", str) -""" -Use this type for strings which are expected to be safe to insert into SQL statements. -They may be included into a SQL statement by quoting them manually: f"SELECT * FROM '{safe_str}'" - -DO NOT CAST STRINGS WHICH MAY BE SET BY USERS TO PREVENT SQL INJECTION ATTACKS. -""" - - T = TypeVar("T") -class Query( - List[T], - PonyQuery, -): - """ - This class may be used to reflect PonyQuerys with all their "kind of" list behavior. - Only use it for type hintings. 
- """ - - pass - - -THUMBNAIL_ALLOWED_TYPES = [ - "image/avif", - "image/jpeg", - "image/png", - "image/webp", -] -THUMBNAIL_HEADERS = { - "Accept": ",".join(THUMBNAIL_ALLOWED_TYPES) + ",*/*;q=0.9", - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; rv:91.0) Gecko/20100101 Firefox/91.0", -} -THUMBNAIL_TARGET = 16 / 9 - - -def thumbnail_sort_key(width: int, height: int) -> Tuple: - return ( - abs((width / height) - THUMBNAIL_TARGET), - width * height, - ) - - #### ## Model Extensions #### -class UriHolder: - - ### abstracted - - @abstractproperty - def _primary_uri(self) -> str: - """Returns the primary uri of this object in a naive way.""" - - @abstractmethod - def _set_primary_uri(self, uri: str) -> None: - """Sets the primary uri of this object in a naive way.""" - - @abstractproperty - def _get_uri_set(self) -> Set[str]: - """Returns the uri set of this object in a naive way.""" - - @abstractmethod - def _set_uri_set(self, uri_set: Set[str]) -> None: - """Sets the uri set of this object in a naive way.""" - - @abstractmethod - def _add_uri_to_set(self, uri: str) -> bool: - """Adds a uri to the uri set of this object in a naive way. - - Returns True if the uri was not in the uri set before. - """ - - @abstractmethod - def _remove_uri_from_set(self, uri: str) -> bool: - """Removes a uri to the uri set of this object in a naive way. - - Returns True if the uri was in the uri set before. - """ - - ### implemented - - @property - def primary_uri(self) -> str: - """Returns the current primary uri of this object.""" - return self._primary_uri - - def is_primary_uri(self, compare_uri: str) -> bool: - """Returns True if the given uri is equal to the current primary uri.""" - return self.primary_uri == compare_uri - - def set_primary_uri(self, uri: str) -> bool: - """Sets the current primary of this object. - - It will also add the uri to the uri set. - Returns True if the uri was not in the uri set before. 
- """ - ret = self._add_uri_to_set(uri) # might fail, so try first - self._set_primary_uri(uri) - return ret - - def set_as_only_uri(self, uri: str) -> None: - self._set_uri_set({uri}) # might fail, so try first - self._set_primary_uri(uri) - - def add_single_uri(self, uri: str) -> bool: - return self._add_uri_to_set(uri) - - def add_uris(self, uri_list: Iterable[Optional[str]]) -> bool: - return any([self.add_single_uri(uri) for uri in set(uri_list) if uri]) - - -@dataclass -class TagRootElement: - base: Tagable - children: List[TagTreeElement] = dataclasses.field(default_factory=lambda: []) - - def share_score(self, points: float) -> PreferenceScoreAppender: - # influences PreferenceScore.max_score_increase - if points == 0 or len(self.children) <= 0: - return PreferenceScoreAppender() - single_share = points / len(self.children) - shares = (child.share_score(single_share) for child in self.children) - return PreferenceScoreAppender(shares) - - -@dataclass -class TagTreeElement: - base: Tag - children: List[TagTreeElement] = dataclasses.field(default_factory=lambda: []) - - def share_score(self, points: float) -> PreferenceScoreAppender: - # influences PreferenceScore.max_score_increase - children = [elem for elem in self.children if elem.base.use_for_preferences] - if len(children) <= 0: - return PreferenceScoreAppender(PreferenceScore({self.base: points})) - children_fraction = len(children) - base_fraction = children_fraction + 1 - single_share = points / (base_fraction + children_fraction) - base_share = PreferenceScore({self.base: single_share * base_fraction}) - shares = (child.share_score(single_share) for child in children) - return base_share & shares - - -TagElement = Union[TagRootElement, TagTreeElement] - - class Tagable: ## abstracted @@ -238,24 +86,6 @@ class Tagable: def direct_tags(self) -> Set[Tag]: return set(self.orm_direct_tags) - @property - def tag_hierachy(self) -> TagRootElement: - root = TagRootElement( - base=self, - 
children=[TagTreeElement(tag) for tag in self.direct_tags], - ) - stack: List[TagTreeElement] = root.children[:] - used: Set[Tag] = self.direct_tags - while len(stack) > 0: - cur = stack.pop(0) - for tag in cur.base.super_tags: - if tag not in used: - elem = TagTreeElement(tag) - cur.children.append(elem) - stack.append(elem) - used.add(tag) - return root - @property def all_tags(self) -> Set[Tag]: queue: List[Tag] = list(self.direct_tags) @@ -267,286 +97,6 @@ class Tagable: used |= new_tags return used - def share_score_flat(self, score: float) -> PreferenceScoreAppender: - # influences PreferenceScore.max_score_increase - direct_tags = [tag for tag in self.direct_tags if tag.use_for_preferences] - if len(direct_tags) <= 0: - return PreferenceScoreAppender() - return PreferenceScoreAppender( - PreferenceScore({tag: score / len(direct_tags) for tag in direct_tags}) - ) - - -T_tagged = TypeVar("T_tagged", bound=Tagable) - - -@dataclass -class PreferenceScore: - points: Dict[Tag, float] = dataclasses.field(default_factory=lambda: {}) - - def __add__(self, other: PreferenceScoreCompatible) -> PreferenceScore: - return (self & other).calculate() - - def __and__(self, other: PreferenceScoreCompatible) -> PreferenceScoreAppender: - return PreferenceScoreAppender(self, other) - - def __mul__(self, scalar: float) -> PreferenceScore: - return PreferenceScore( - {tag: score * scalar for tag, score in self.points.items()} - ) - - def __neg__(self) -> PreferenceScore: - return self * -1 - - @staticmethod - def max_score_increase(score: float, adapt_count: int) -> float: - # depends on Tag(Root|Tree)Element.share_score / Tagable.share_score_flat - return score * adapt_count - - def adapt_score( - self, - tagable: Tagable, - score: float, - on_hierachy: bool = True, - ) -> PreferenceScore: - addition = ( - tagable.tag_hierachy.share_score(score) - if on_hierachy - else tagable.share_score_flat(score) - ) - return (self & addition).calculate() - - def calculate_score(self, 
object: Tagable) -> float: - return self.calculate_iter_score(object.all_tags) - - def calculate_iter_score(self, tag_iter: Iterable[Tag]) -> float: - return math.fsum(self.points.get(tag, 0) for tag in tag_iter) - - @classmethod - def from_json(cls, data: str) -> PreferenceScore: - dicts: Dict = json.loads(data) - return cls({Tag[id]: score for id, score in dicts.items()}) - - @classmethod - def from_base64(cls, in_data: str, encoding: str = "utf-8") -> PreferenceScore: - return ( - Chain(in_data) - | (lambda d: d.encode(encoding=encoding)) - | base64.decodebytes - | gzip.decompress - | (lambda d: d.decode(encoding=encoding)) - | PreferenceScore.from_json - ).get() - - def to_json(self) -> str: - return json.dumps({tag.id: score for tag, score in self.points.items()}) - - def to_base64(self, encoding: str = "utf-8") -> str: - return ( - Chain(self) - | PreferenceScore.to_json - | (lambda d: d.encode(encoding=encoding)) - | ( - lambda d: gzip.compress( - data=d, - compresslevel=9, - ) - ) - | base64.encodebytes - | (lambda d: d.decode(encoding=encoding)) - ).get() - - -class PreferenceScoreAppender: - points_list: List[PreferenceScore] - - def __init__(self, *args: PreferenceScoreCompatible): - self.points_list = [] - for preference in args: - self.__append(preference) - - def __append(self, preference: PreferenceScoreCompatible) -> None: - if isinstance(preference, PreferenceScore): - self.points_list.append(preference) - elif isinstance(preference, PreferenceScoreAppender): - self.points_list.extend(preference.points_list) - else: - for sub_pref in preference: - self.__append(sub_pref) - - def __and__(self, other: PreferenceScoreCompatible) -> PreferenceScoreAppender: - return PreferenceScoreAppender(self, other) - - def calculate(self) -> PreferenceScore: - combined: Dict[Tag, List[float]] = {} - for preference in self.points_list: - for tag, score in preference.points.items(): - if tag not in combined: - combined[tag] = [] - combined[tag].append(score) - return 
PreferenceScore( - {tag: math.fsum(scores) for tag, scores in combined.items()} - ) - - -PreferenceScoreCompatibleSimple: TypeAlias = Union[ - PreferenceScore, PreferenceScoreAppender -] -PreferenceScoreCompatible: TypeAlias = Union[ - PreferenceScoreCompatibleSimple, Iterable[PreferenceScoreCompatibleSimple] -] - -ScoreCalc: TypeAlias = Callable[["MediaElement"], float] - - -def generate_preference_list( - object_gen: Callable[[], List[MediaElement]], - score_adapt: float, - base: Optional[PreferenceScore] = None, - limit: Optional[int] = None, -) -> List[MediaElement]: - element_list = set(object_gen()) - preference = base if base is not None else PreferenceScore() - now = datetime.now() # reference time - - def add_tags_for_collections(): - collections: Iterable[MediaCollection] = MediaCollection.select() - for coll in collections: - coll.tag_list.add( - Tag( - title="Automatic", - use_for_preferences=True, - ) - ) - - def add_tags_for_extractor_names(): - @cache - def get_extractor_tag(extractor_name: str) -> Tag: - return Tag( - title=f"Automatic for extractor {extractor_name}", - use_for_preferences=True, - ) - - for element in element_list: - element.tag_list.add(get_extractor_tag(element.extractor_name)) - - add_tags_for_collections() - add_tags_for_extractor_names() - orm.flush() # flush after custom tags - - # score calc - - elem_tag_map = get_all_elements_tags_recursive() - - def all_tags(element: MediaElement) -> List[Tag]: - return elem_tag_map.get(element.id, []) - - # TODO prepare static score in parallel (or cache it in DB for longer) - @cache - def gen_statis_score(element: MediaElement) -> float: - pinned_collections = orm.count( - link for link in element.collection_links if link.collection.pinned - ) - # reference_date = orm.max((elem_link.element.release_date for coll_link in element.collection_links for elem_link in coll_link.collection.media_links if coll_link.collection.watch_in_order and not elem_link.element.skip_over), 
default=element.release_date) - # reference_date = max((l.collection.last_release_date_to_watch for l in element.collection_links if l.collection.watch_in_order), default=element.release_date) - reference_date = element.release_date - age_nerf = ( - ( - max(-0.5, math.log((now - reference_date) / timedelta(days=14)) - 1) - if reference_date < now # possible on yet to release media - else -0.5 - ) - # nerf the nerf when pinned or started to prevent hiding - * 0.1 - if (pinned_collections > 0) or element.started - else 1 - ) - # avg_rel = element.average_release_per_week or element.left_length - # avg_rel = element.left_length - all_nerfs = ( - # by id to make sorting consistent - (10**-8) * math.log(element.id + 1000), - # for age of media (newer is better) - age_nerf, - # for average length in relevant collections / length of video itself - # max(0, (math.log(avg_rel + 1) - 5) / 2) if avg_rel else 0 - ) - all_buffs = ( - # for already began to watch - 2 if element.started else 0, - # for count of tags (more is better) - 0.5 * math.log(len(all_tags(element)) + 1), - # for being in pinned collections - 3 * math.log(pinned_collections + 1), - ) - return math.fsum(chain(all_nerfs, (-val for val in all_buffs))) - - def gen_score(element: MediaElement) -> float: - static_score = gen_statis_score(element) - pref_score = preference.calculate_iter_score(all_tags(element)) - return static_score + pref_score - - # pre filter list - # - elements which have a too low current score may never possible appear - # TODO add test that this does not change end result - def pre_filter_list_by_score(elem_list: Set[MediaElement]) -> Set[MediaElement]: - elem_count = len(elem_list) - if limit is None or elem_count <= limit: - return elem_list - # cache pref score for this - gen_pre_score = cache(gen_score) - # biggest possible score increase by adaption - max_score_inc = preference.max_score_increase( - score=score_adapt, - adapt_count=limit, - ) - logging.debug(f"Max adaption possible: 
{max_score_inc}") - # differenciate adapted buffing and adapted nerfing - without_max_adapt: ScoreCalc = lambda elem: gen_pre_score(elem) - with_max_adapt: ScoreCalc = lambda elem: without_max_adapt(elem) + max_score_inc - is_nerfing = score_adapt >= 0 - if is_nerfing: - best_case = without_max_adapt - worst_case = with_max_adapt - else: # is buffing - best_case = with_max_adapt - worst_case = without_max_adapt - # (limit)ths best's score in the worst adaption for it - limitths_best_worst = sorted(worst_case(elem) for elem in elem_list)[limit] - logging.debug(f"(limit)ths best's worst case score: {limitths_best_worst}") - # extract worst's element's score in best case as well - worsts_best = best_case(max(elem_list, key=gen_pre_score)) - logging.debug(f"Worsts best case score is {worsts_best}") - # check if reducing element count is possible - if limitths_best_worst < worsts_best: - # throw away all elements which's best adaption is not better than the (limit)ths one - ret = {elem for elem in elem_list if best_case(elem) < limitths_best_worst} - logging.debug( - f"Prefilter reduced set from {elem_count} to {len(ret)} elements" - ) - return ret - logging.debug(f"Prefilter couldn't reduce the element count ({elem_count})") - return elem_list - - element_list = pre_filter_list_by_score(element_list) - - # gen elements - res_ids = list[int]() - while 0 < len(element_list): - first_element = min(element_list, key=gen_score) - res_ids.append(first_element.id) - if limit is not None and limit <= len(res_ids): - break - element_list.remove(first_element) - preference = preference.adapt_score(first_element, score_adapt) - - # revert any changes on DB - orm.rollback() - db.execute(f"ALTER TABLE {Tag._table_} AUTO_INCREMENT = 1;") - - # return MediaElements - return [MediaElement[i] for i in res_ids] - @dataclass class CollectionStats: diff --git a/server/entertainment_decider/models/extras/__init__.py b/server/entertainment_decider/models/extras/__init__.py new file mode 
100644 index 0000000..09435b7 --- /dev/null +++ b/server/entertainment_decider/models/extras/__init__.py @@ -0,0 +1 @@ +from .uris import UriHolder diff --git a/server/entertainment_decider/models/extras/uris.py b/server/entertainment_decider/models/extras/uris.py new file mode 100644 index 0000000..166813f --- /dev/null +++ b/server/entertainment_decider/models/extras/uris.py @@ -0,0 +1,70 @@ +from __future__ import annotations + +from abc import abstractmethod, abstractproperty +from typing import Iterable, Optional, Set + + +class UriHolder: + + ### abstracted + + @abstractproperty + def _primary_uri(self) -> str: + """Returns the primary uri of this object in a naive way.""" + + @abstractmethod + def _set_primary_uri(self, uri: str) -> None: + """Sets the primary uri of this object in a naive way.""" + + @abstractproperty + def _get_uri_set(self) -> Set[str]: + """Returns the uri set of this object in a naive way.""" + + @abstractmethod + def _set_uri_set(self, uri_set: Set[str]) -> None: + """Sets the uri set of this object in a naive way.""" + + @abstractmethod + def _add_uri_to_set(self, uri: str) -> bool: + """Adds a uri to the uri set of this object in a naive way. + + Returns True if the uri was not in the uri set before. + """ + + @abstractmethod + def _remove_uri_from_set(self, uri: str) -> bool: + """Removes a uri to the uri set of this object in a naive way. + + Returns True if the uri was in the uri set before. + """ + + ### implemented + + @property + def primary_uri(self) -> str: + """Returns the current primary uri of this object.""" + return self._primary_uri + + def is_primary_uri(self, compare_uri: str) -> bool: + """Returns True if the given uri is equal to the current primary uri.""" + return self.primary_uri == compare_uri + + def set_primary_uri(self, uri: str) -> bool: + """Sets the current primary of this object. + + It will also add the uri to the uri set. + Returns True if the uri was not in the uri set before. 
+ """ + ret = self._add_uri_to_set(uri) # might fail, so try first + self._set_primary_uri(uri) + return ret + + def set_as_only_uri(self, uri: str) -> None: + self._set_uri_set({uri}) # might fail, so try first + self._set_primary_uri(uri) + + def add_single_uri(self, uri: str) -> bool: + return self._add_uri_to_set(uri) + + def add_uris(self, uri_list: Iterable[Optional[str]]) -> bool: + return any([self.add_single_uri(uri) for uri in set(uri_list) if uri]) diff --git a/server/entertainment_decider/models/thumbnails.py b/server/entertainment_decider/models/thumbnails.py new file mode 100644 index 0000000..e6d7edd --- /dev/null +++ b/server/entertainment_decider/models/thumbnails.py @@ -0,0 +1,21 @@ +from typing import Tuple + + +THUMBNAIL_ALLOWED_TYPES = [ + "image/avif", + "image/jpeg", + "image/png", + "image/webp", +] +THUMBNAIL_HEADERS = { + "Accept": ",".join(THUMBNAIL_ALLOWED_TYPES) + ",*/*;q=0.9", + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; rv:91.0) Gecko/20100101 Firefox/91.0", +} +THUMBNAIL_TARGET = 16 / 9 + + +def thumbnail_sort_key(width: int, height: int) -> Tuple: + return ( + abs((width / height) - THUMBNAIL_TARGET), + width * height, + ) diff --git a/server/entertainment_decider/preferences/__init__.py b/server/entertainment_decider/preferences/__init__.py new file mode 100644 index 0000000..e094d79 --- /dev/null +++ b/server/entertainment_decider/preferences/__init__.py @@ -0,0 +1,10 @@ +from .elem_scoring import ( + generate_preference_list, +) + +from .tag_scoring import ( + PreferenceScore, + PreferenceScoreAppender, + PreferenceScoreCompatible, + PreferenceScoreSuper, +) diff --git a/server/entertainment_decider/preferences/elem_scoring.py b/server/entertainment_decider/preferences/elem_scoring.py new file mode 100644 index 0000000..a963e9d --- /dev/null +++ b/server/entertainment_decider/preferences/elem_scoring.py @@ -0,0 +1,171 @@ +from __future__ import annotations + +from datetime import datetime, timedelta +from functools import cache 
+from itertools import chain +import logging +import math +from typing import Callable, Iterable, List, Optional, Sequence, Set, TypeAlias + +from pony import orm + +from .tag_scoring import PreferenceScore +from ..models import ( + MediaCollection, + MediaElement, + Tag, + db, + get_all_elements_tags_recursive, +) + + +ScoreCalc: TypeAlias = Callable[["MediaElement"], float] + + +def generate_preference_list( + object_gen: Callable[[], List[MediaElement]], + score_adapt: float, + base: Optional[PreferenceScore] = None, + limit: Optional[int] = None, +) -> List[MediaElement]: + element_list: Set[MediaElement] = set(object_gen()) + preference = base if base is not None else PreferenceScore() + now = datetime.now() # reference time + + def add_tags_for_collections() -> None: + collections: Iterable[MediaCollection] = MediaCollection.select() + for coll in collections: + coll.tag_list.add( + Tag( + title="Automatic", + use_for_preferences=True, + ) + ) + + def add_tags_for_extractor_names() -> None: + @cache + def get_extractor_tag(extractor_name: str) -> Tag: + return Tag( + title=f"Automatic for extractor {extractor_name}", + use_for_preferences=True, + ) + + for element in element_list: + element.tag_list.add(get_extractor_tag(element.extractor_name)) + + add_tags_for_collections() + add_tags_for_extractor_names() + orm.flush() # flush after custom tags + + # score calc + + elem_tag_map = get_all_elements_tags_recursive() + + def all_tags(element: MediaElement) -> Sequence[Tag]: + return elem_tag_map.get(element.id, []) + + # TODO prepare static score in parallel (or cache it in DB for longer) + @cache + def gen_statis_score(element: MediaElement) -> float: + pinned_collections = orm.count( + link for link in element.collection_links if link.collection.pinned + ) + # reference_date = orm.max((elem_link.element.release_date for coll_link in element.collection_links for elem_link in coll_link.collection.media_links if coll_link.collection.watch_in_order and not 
elem_link.element.skip_over), default=element.release_date) + # reference_date = max((l.collection.last_release_date_to_watch for l in element.collection_links if l.collection.watch_in_order), default=element.release_date) + reference_date = element.release_date + age_nerf = ( + ( + max(-0.5, math.log((now - reference_date) / timedelta(days=14)) - 1) + if reference_date < now # possible on yet to release media + else -0.5 + ) + # nerf the nerf when pinned or started to prevent hiding + * 0.1 + if (pinned_collections > 0) or element.started + else 1 + ) + # avg_rel = element.average_release_per_week or element.left_length + # avg_rel = element.left_length + all_nerfs = ( + # by id to make sorting consistent + (10**-8) * math.log(element.id + 1000), + # for age of media (newer is better) + age_nerf, + # for average length in relevant collections / length of video itself + # max(0, (math.log(avg_rel + 1) - 5) / 2) if avg_rel else 0 + ) + all_buffs = ( + # for already began to watch + 2 if element.started else 0, + # for count of tags (more is better) + 0.5 * math.log(len(all_tags(element)) + 1), + # for being in pinned collections + 3 * math.log(pinned_collections + 1), + ) + return math.fsum(chain(all_nerfs, (-val for val in all_buffs))) + + def gen_score(element: MediaElement) -> float: + static_score = gen_statis_score(element) + pref_score = preference.calculate_iter_score(all_tags(element)) + return static_score + pref_score + + # pre filter list + # - elements which have a too low current score may never possible appear + # TODO add test that this does not change end result + def pre_filter_list_by_score(elem_list: Set[MediaElement]) -> Set[MediaElement]: + elem_count = len(elem_list) + if limit is None or elem_count <= limit: + return elem_list + # cache pref score for this + gen_pre_score = cache(gen_score) + # biggest possible score increase by adaption + max_score_inc = preference.max_score_increase( + score=score_adapt, + adapt_count=limit, + ) + 
logging.debug(f"Max adaption possible: {max_score_inc}") + # differenciate adapted buffing and adapted nerfing + without_max_adapt: ScoreCalc = lambda elem: gen_pre_score(elem) + with_max_adapt: ScoreCalc = lambda elem: without_max_adapt(elem) + max_score_inc + is_nerfing = score_adapt >= 0 + if is_nerfing: + best_case = without_max_adapt + worst_case = with_max_adapt + else: # is buffing + best_case = with_max_adapt + worst_case = without_max_adapt + # (limit)ths best's score in the worst adaption for it + limitths_best_worst = sorted(worst_case(elem) for elem in elem_list)[limit] + logging.debug(f"(limit)ths best's worst case score: {limitths_best_worst}") + # extract worst's element's score in best case as well + worsts_best = best_case(max(elem_list, key=gen_pre_score)) + logging.debug(f"Worsts best case score is {worsts_best}") + # check if reducing element count is possible + if limitths_best_worst < worsts_best: + # throw away all elements which's best adaption is not better than the (limit)ths one + ret = {elem for elem in elem_list if best_case(elem) < limitths_best_worst} + logging.debug( + f"Prefilter reduced set from {elem_count} to {len(ret)} elements" + ) + return ret + logging.debug(f"Prefilter couldn't reduce the element count ({elem_count})") + return elem_list + + element_list = pre_filter_list_by_score(element_list) + + # gen elements + res_ids = list[int]() + while 0 < len(element_list): + first_element = min(element_list, key=gen_score) + res_ids.append(first_element.id) + if limit is not None and limit <= len(res_ids): + break + element_list.remove(first_element) + preference = preference.adapt_score(first_element, score_adapt) + + # revert any changes on DB + orm.rollback() + db.execute(f"ALTER TABLE {Tag._table_} AUTO_INCREMENT = 1;") + + # return MediaElements + return [MediaElement[i] for i in res_ids] diff --git a/server/entertainment_decider/preferences/tag_scoring.py b/server/entertainment_decider/preferences/tag_scoring.py new file 
mode 100644 index 0000000..c9d7a23 --- /dev/null +++ b/server/entertainment_decider/preferences/tag_scoring.py @@ -0,0 +1,147 @@ +from __future__ import annotations + +import base64 +import dataclasses +from dataclasses import dataclass +import gzip +import json +import math +from typing import Dict, Iterable, List, TypeAlias, Union + +from ..extras import Chain +from ..models import Tag, Tagable + + +@dataclass +class PreferenceScore: + points: Dict[Tag, float] = dataclasses.field(default_factory=lambda: {}) + + def __add__(self, other: PreferenceScoreCompatible) -> PreferenceScore: + return (self & other).calculate() + + def __and__(self, other: PreferenceScoreCompatible) -> PreferenceScoreAppender: + return PreferenceScoreAppender(self, other) + + def __mul__(self, scalar: float) -> PreferenceScore: + return PreferenceScore( + {tag: score * scalar for tag, score in self.points.items()} + ) + + def __neg__(self) -> PreferenceScore: + return self * -1 + + @staticmethod + def max_score_increase(score: float, adapt_count: int) -> float: + # depends on PreferenceScoreAppender.share_score(_flat) + return score * adapt_count + + def adapt_score( + self, + tagable: Tagable, + score: float, + on_hierachy: bool = True, + ) -> PreferenceScore: + addition = ( + PreferenceScoreAppender.share_score + if on_hierachy + else PreferenceScoreAppender.share_score_flat + )(tagable, score) + return (self & addition).calculate() + + def calculate_score(self, object: Tagable) -> float: + return self.calculate_iter_score(object.all_tags) + + def calculate_iter_score(self, tag_iter: Iterable[Tag]) -> float: + return math.fsum(self.points.get(tag, 0) for tag in tag_iter) + + @classmethod + def from_json(cls, data: str) -> PreferenceScore: + dicts: Dict = json.loads(data) + return cls({Tag[id]: score for id, score in dicts.items()}) + + @classmethod + def from_base64(cls, in_data: str, encoding: str = "utf-8") -> PreferenceScore: + return ( + Chain(in_data) + | (lambda d: 
d.encode(encoding=encoding)) + | base64.decodebytes + | gzip.decompress + | (lambda d: d.decode(encoding=encoding)) + | PreferenceScore.from_json + ).get() + + def to_json(self) -> str: + return json.dumps({tag.id: score for tag, score in self.points.items()}) + + def to_base64(self, encoding: str = "utf-8") -> str: + return ( + Chain(self) + | PreferenceScore.to_json + | (lambda d: d.encode(encoding=encoding)) + | ( + lambda d: gzip.compress( + data=d, + compresslevel=9, + ) + ) + | base64.encodebytes + | (lambda d: d.decode(encoding=encoding)) + ).get() + + +class PreferenceScoreAppender: + points_list: List[PreferenceScore] + + @staticmethod + def share_score_flat(obj: Tagable, score: float) -> PreferenceScoreSuper: + # influences PreferenceScore.max_score_increase + direct_tags = [tag for tag in obj.direct_tags if tag.use_for_preferences] + if len(direct_tags) <= 0: + return PreferenceScoreAppender() + return PreferenceScore({tag: score / len(direct_tags) for tag in direct_tags}) + + @classmethod + def share_score(cls, obj: Tagable, score: float) -> PreferenceScoreSuper: + # influences PreferenceScore.max_score_increase + super_tags = [tag for tag in obj.super_tags if tag.use_for_preferences] + super_fraction = len(super_tags) + direct_fraction = super_fraction + 1 + full_fraction = super_fraction + direct_fraction + single_share = score / full_fraction + direct_share = cls.share_score_flat(obj, single_share * direct_fraction) + super_shares = (super_tag.share_score(single_share) for super_tag in super_tags) + return direct_share & super_shares + + def __init__(self, *args: PreferenceScoreCompatible): + self.points_list = [] + for preference in args: + self.__append(preference) + + def __append(self, preference: PreferenceScoreCompatible) -> None: + if isinstance(preference, PreferenceScore): + self.points_list.append(preference) + elif isinstance(preference, PreferenceScoreAppender): + self.points_list.extend(preference.points_list) + else: + for sub_pref in 
preference: + self.__append(sub_pref) + + def __and__(self, other: PreferenceScoreCompatible) -> PreferenceScoreAppender: + return PreferenceScoreAppender(self, other) + + def calculate(self) -> PreferenceScore: + combined: Dict[Tag, List[float]] = {} + for preference in self.points_list: + for tag, score in preference.points.items(): + if tag not in combined: + combined[tag] = [] + combined[tag].append(score) + return PreferenceScore( + {tag: math.fsum(scores) for tag, scores in combined.items()} + ) + + +PreferenceScoreSuper: TypeAlias = Union[PreferenceScore, PreferenceScoreAppender] +PreferenceScoreCompatible: TypeAlias = Union[ + PreferenceScoreSuper, Iterable[PreferenceScoreSuper] +]