From 6312143abb8324a6abcb46c9a27175cf0caed56c Mon Sep 17 00:00:00 2001 From: Felix Stupp Date: Fri, 15 Oct 2021 16:19:45 +0200 Subject: [PATCH] Refractor/Extend tag system and add auto recommendation algorithm --- server/entertainment_decider/models.py | 217 +++++++++++++++++++++++-- 1 file changed, 206 insertions(+), 11 deletions(-) diff --git a/server/entertainment_decider/models.py b/server/entertainment_decider/models.py index 99071cd..a552ccc 100644 --- a/server/entertainment_decider/models.py +++ b/server/entertainment_decider/models.py @@ -1,9 +1,11 @@ from __future__ import annotations +import dataclasses from dataclasses import dataclass from datetime import datetime +import math import logging -from typing import Dict, Iterable, List, Optional, Set +from typing import Callable, Dict, Iterable, List, Optional, Set, Tuple, TypeVar, Union from pony import orm @@ -15,6 +17,192 @@ db = orm.Database() #### +@dataclass +class TagRootElement: + base: Tagable + children: List[TagTreeElement] = dataclasses.field(default_factory=lambda: []) + + def share_score(self, points: int) -> PreferenceScoreAppender: + if len(self.children) <= 0: + return PreferenceScoreAppender() + single_share = points / len(self.children) + shares = (child.share_score(single_share) for child in self.children) + return PreferenceScoreAppender(shares) + + +@dataclass +class TagTreeElement: + base: Tag + children: List[TagTreeElement] = dataclasses.field(default_factory=lambda: []) + + def share_score(self, points: int) -> PreferenceScoreAppender: + if len(self.children) <= 0: + return PreferenceScoreAppender(PreferenceScore({self.base: points})) + children_fraction = len(self.children) + base_fraction = children_fraction + 1 + single_share = points / (base_fraction + children_fraction) + base_share = PreferenceScore({self.base: single_share * base_fraction}) + shares = (child.share_score(single_share) for child in self.children) + return base_share & shares + + +TagElement = Union[TagRootElement, TagTreeElement] + + +class Tagable: + + ## abstracted + + @property + def assigned_tags(self) -> Set[Tag]: + """ + Tags which are directly assigned to this object by the user or automatic actions. + """ + return NotImplementedError("") + + @property + def inherited_tags(self) -> Set[Tag]: + """ + Tags, which are inherited by any other means than super/sub-tag relationships. + This relationship does not declare a distance between this tags and assigned tags. + """ + return set() + + @property + def super_tags(self) -> Set[Tag]: + """ + Tags, which are inherited only by super/sub-tag relationships. + This relationship does declare a distance between this tags and assigned tags. + """ + return set() + + ## implemented + + @property + def direct_tags(self) -> Set[Tag]: + return self.assigned_tags | self.inherited_tags + + @property + def __tag_hierachy(self) -> Tuple[TagRootElement, Set[Tag]]: + root = TagRootElement( + base=self, + children=[TagTreeElement(tag) for tag in self.direct_tags], + ) + stack: List[TagTreeElement] = root.children[:] + used: Set[Tag] = self.direct_tags + while len(stack) > 0: + cur = stack.pop(0) + for tag in cur.base.super_tags: + if tag not in used: + elem = TagTreeElement(tag) + cur.children.append(tag) + stack.append(elem) + used.add(tag) + return root, used + + @property + def tag_hierachy(self) -> TagRootElement: + return self.__tag_hierachy[0] + + @property + def all_tags(self) -> Set[Tag]: + return self.__tag_hierachy[1] + + +T = TypeVar("T", bound=Tagable) + + +@dataclass +class PreferenceScore: + points: Dict[Tag, float] = dataclasses.field(default_factory=lambda: {}) + + def __add__(self, other: PreferenceScoreCompatible) -> PreferenceScore: + return (self & other).calculate() + + def __and__(self, other: PreferenceScoreCompatible) -> PreferenceScoreAppender: + return PreferenceScoreAppender(self, other) + + def __mul__(self, scalar: float) -> PreferenceScore: + return PreferenceScore({tag: score * scalar for tag, score in self.points.items()}) + + def __neg__(self) -> PreferenceScore: + return self * -1 + + def adapt_score(self, tagable: Tagable, score: float) -> PreferenceScore: + return (self & tagable.tag_hierachy.share_score(score)).calculate() + + def calculate_score(self, object: Tagable) -> float: + return math.fsum(self.points[tag] for tag in object.all_tags if tag in self.points) + + def order_by_score(self, objects: Iterable[T]) -> List[T]: + return sorted(objects, key=lambda o: self.calculate_score(o)) + + +class PreferenceScoreAppender: + points_list: List[PreferenceScore] + + def __init__(self, *args: PreferenceScoreCompatible): + self.points_list = [] + for preference in args: + self.__append(preference) + + def __append(self, preference: PreferenceScoreCompatible): + if isinstance(preference, PreferenceScore): + self.points_list.append(preference) + elif isinstance(preference, PreferenceScoreAppender): + self.points_list.extend(preference.points_list) + else: + for sub_pref in preference: + self.__append(sub_pref) + + def __and__(self, other: PreferenceScoreCompatible) -> PreferenceScoreAppender: + return PreferenceScoreAppender(self, other) + + def calculate(self) -> PreferenceScore: + combined: Dict[Tag, List[float]] = {} + for preference in self.points_list: + for tag, score in preference.points.items(): + if tag not in combined: + combined[tag] = [] + combined[tag].append(score) + return PreferenceScore({tag: math.fsum(scores) for tag, scores in combined.items()}) + + +PreferenceScoreCompatibleSimple = Union[PreferenceScore, PreferenceScoreAppender] +PreferenceScoreCompatible = Union[PreferenceScoreCompatibleSimple, Iterable[PreferenceScoreCompatibleSimple]] + + +def generate_preference_list( + base: PreferenceScore, + object_gen: Callable[[], List[MediaElement]], + score_adapt: float, + limit: Optional[int] = None +) -> List[MediaElement]: + res_ids = list[int]() + tag_map = dict[MediaCollection, Tag]() + element_list = object_gen() + for element in element_list: + for link in element.collection_links: + if link.collection not in tag_map: + tag = Tag(title="Automatic") + tag.use_for_preferences = True + link.collection.tag_list.add(tag) + tag_map[link.collection] = tag + orm.flush() + while True: + if len(element_list) <= 0: + break + first_element = base.order_by_score(element_list)[0] + res_ids.append(first_element.id) + if limit is not None and limit <= len(res_ids): + break + first_element.watched = True # simulative + base = base.adapt_score(first_element, score_adapt) + element_list = object_gen() + orm.rollback() + return [MediaElement[i] for i in res_ids] + + @dataclass class CollectionStats: @@ -73,7 +261,7 @@ class CollectionStats: ## Tag & Selection Score's -class Tag(db.Entity): +class Tag(db.Entity, Tagable): id: int = orm.PrimaryKey(int, auto=True) @@ -82,9 +270,16 @@ class Tag(db.Entity): use_for_preferences: bool = orm.Required(bool, default=True) + super_tag_list: Iterable[Tag] = orm.Set(lambda: Tag, reverse="sub_tag_list") + sub_tag_list: Iterable[Tag] = orm.Set(lambda: Tag, reverse="super_tag_list") + _collection_list: Iterable[MediaCollection] = orm.Set(lambda: MediaCollection) _media_list: Iterable[MediaElement] = orm.Set(lambda: MediaElement) + @property + def assigned_tags(self) -> Set[Tag]: + return {self} + ## Element <-> Collection Linking @@ -115,7 +310,7 @@ class MediaCollectionLink(db.Entity): ## Media Elements -class MediaElement(db.Entity): +class MediaElement(db.Entity, Tagable): id: int = orm.PrimaryKey(int, auto=True) uri: str = orm.Required(str, unique=True) @@ -168,17 +363,17 @@ class MediaElement(db.Entity): return False return True + @property + def assigned_tags(self) -> Set[Tag]: + return set(self.tag_list) + @property def inherited_tags(self) -> Set[Tag]: result = set() for link in self.collection_links: - result |= link.collection.all_tags + result |= link.collection.direct_tags return result - @property - def all_tags(self) -> Iterable[Tag]: - return set(self.tag_list) | self.inherited_tags - def merge_to(self, other: MediaElement): if self.watched: other.watched = True @@ -229,7 +424,7 @@ class MediaUriMapping(db.Entity): ## Media Collections -class MediaCollection(db.Entity): +class MediaCollection(db.Entity, Tagable): id: int = orm.PrimaryKey(int, auto=True) uri: str = orm.Required(str, unique=True) @@ -271,8 +466,8 @@ class MediaCollection(db.Entity): return self.next_episode is None @property - def all_tags(self) -> Iterable[Tag]: - return self.tag_list + def assigned_tags(self) -> Set[Tag]: + return set(self.tag_list) @property def stats(self) -> CollectionStats: