Refractor/Extend tag system and add auto recommendation algorithm

master
Felix Stupp 3 years ago
parent 6330443876
commit 6312143abb
Signed by: zocker
GPG Key ID: 93E1BD26F6B02FB7

@ -1,9 +1,11 @@
from __future__ import annotations
import dataclasses
from dataclasses import dataclass
from datetime import datetime
import math
import logging
from typing import Dict, Iterable, List, Optional, Set
from typing import Callable, Dict, Iterable, List, Optional, Set, Tuple, TypeVar, Union
from pony import orm
@ -15,6 +17,192 @@ db = orm.Database()
####
@dataclass
class TagRootElement:
base: Tagable
children: List[TagTreeElement] = dataclasses.field(default_factory=lambda: [])
def share_score(self, points: int) -> PreferenceScoreAppender:
if len(self.children) <= 0:
return PreferenceScoreAppender()
single_share = points / len(self.children)
shares = (child.share_score(single_share) for child in self.children)
return PreferenceScoreAppender(shares)
@dataclass
class TagTreeElement:
base: Tag
children: List[TagTreeElement] = dataclasses.field(default_factory=lambda: [])
def share_score(self, points: int) -> PreferenceScoreAppender:
if len(self.children) <= 0:
return PreferenceScoreAppender(PreferenceScore({self.base: points}))
children_fraction = len(self.children)
base_fraction = children_fraction + 1
single_share = points / (base_fraction + children_fraction)
base_share = PreferenceScore({self.base: single_share * base_fraction})
shares = (child.share_score(single_share) for child in self.children)
return base_share & shares
TagElement = Union[TagRootElement, TagTreeElement]
class Tagable:
## abstracted
@property
def assigned_tags(self) -> Set[Tag]:
"""
Tags which are directly assigned to this object by the user or automatic actions.
"""
return NotImplementedError("")
@property
def inherited_tags(self) -> Set[Tag]:
"""
Tags, which are inherited by any other means than super/sub-tag relationships.
This relationship does not declare a distance between this tags and assigned tags.
"""
return set()
@property
def super_tags(self) -> Set[Tag]:
"""
Tags, which are inherited only by super/sub-tag relationships.
This relationship does declare a distance between this tags and assigned tags.
"""
return set()
## implemented
@property
def direct_tags(self) -> Set[Tag]:
return self.assigned_tags | self.inherited_tags
@property
def __tag_hierachy(self) -> Tuple[TagRootElement, Set[Tag]]:
root = TagRootElement(
base=self,
children=[TagTreeElement(tag) for tag in self.direct_tags],
)
stack: List[TagTreeElement] = root.children[:]
used: Set[Tag] = self.direct_tags
while len(stack) > 0:
cur = stack.pop(0)
for tag in cur.base.super_tags:
if tag not in used:
elem = TagTreeElement(tag)
cur.children.append(tag)
stack.append(elem)
used.add(tag)
return root, used
@property
def tag_hierachy(self) -> TagRootElement:
return self.__tag_hierachy[0]
@property
def all_tags(self) -> Set[Tag]:
return self.__tag_hierachy[1]
T = TypeVar("T", bound=Tagable)
@dataclass
class PreferenceScore:
points: Dict[Tag, float] = dataclasses.field(default_factory=lambda: {})
def __add__(self, other: PreferenceScoreCompatible) -> PreferenceScore:
return (self & other).calculate()
def __and__(self, other: PreferenceScoreCompatible) -> PreferenceScoreAppender:
return PreferenceScoreAppender(self, other)
def __mul__(self, scalar: float) -> PreferenceScore:
return PreferenceScore({tag: score * scalar for tag, score in self.points.items()})
def __neg__(self) -> PreferenceScore:
return self * -1
def adapt_score(self, tagable: Tagable, score: float) -> PreferenceScore:
return (self & tagable.tag_hierachy.share_score(score)).calculate()
def calculate_score(self, object: Tagable) -> float:
return math.fsum(self.points[tag] for tag in object.all_tags if tag in self.points)
def order_by_score(self, objects: Iterable[T]) -> List[T]:
return sorted(objects, key=lambda o: self.calculate_score(o))
class PreferenceScoreAppender:
points_list: List[PreferenceScore]
def __init__(self, *args: PreferenceScoreCompatible):
self.points_list = []
for preference in args:
self.__append(preference)
def __append(self, preference: PreferenceScoreCompatible):
if isinstance(preference, PreferenceScore):
self.points_list.append(preference)
elif isinstance(preference, PreferenceScoreAppender):
self.points_list.extend(preference.points_list)
else:
for sub_pref in preference:
self.__append(sub_pref)
def __and__(self, other: PreferenceScoreCompatible) -> PreferenceScoreAppender:
return PreferenceScoreAppender(self, other)
def calculate(self) -> PreferenceScore:
combined: Dict[Tag, List[float]] = {}
for preference in self.points_list:
for tag, score in preference.points.items():
if tag not in combined:
combined[tag] = []
combined[tag].append(score)
return PreferenceScore({tag: math.fsum(scores) for tag, scores in combined.items()})
PreferenceScoreCompatibleSimple = Union[PreferenceScore, PreferenceScoreAppender]
PreferenceScoreCompatible = Union[PreferenceScoreCompatibleSimple, Iterable[PreferenceScoreCompatibleSimple]]
def generate_preference_list(
base: PreferenceScore,
object_gen: Callable[[], List[MediaElement]],
score_adapt: float,
limit: Optional[int] = None
) -> List[MediaElement]:
res_ids = list[int]()
tag_map = dict[MediaCollection, Tag]()
element_list = object_gen()
for element in element_list:
for link in element.collection_links:
if link.collection not in tag_map:
tag = Tag(title="Automatic")
tag.use_for_preferences = True
link.collection.tag_list.add(tag)
tag_map[link.collection] = tag
orm.flush()
while True:
if len(element_list) <= 0:
break
first_element = base.order_by_score(element_list)[0]
res_ids.append(first_element.id)
if limit is not None and limit <= len(res_ids):
break
first_element.watched = True # simulative
base = base.adapt_score(first_element, score_adapt)
element_list = object_gen()
orm.rollback()
return [MediaElement[i] for i in res_ids]
@dataclass
class CollectionStats:
@ -73,7 +261,7 @@ class CollectionStats:
## Tag & Selection Score's
class Tag(db.Entity):
class Tag(db.Entity, Tagable):
id: int = orm.PrimaryKey(int, auto=True)
@ -82,9 +270,16 @@ class Tag(db.Entity):
use_for_preferences: bool = orm.Required(bool, default=True)
super_tag_list: Iterable[Tag] = orm.Set(lambda: Tag, reverse="sub_tag_list")
sub_tag_list: Iterable[Tag] = orm.Set(lambda: Tag, reverse="super_tag_list")
_collection_list: Iterable[MediaCollection] = orm.Set(lambda: MediaCollection)
_media_list: Iterable[MediaElement] = orm.Set(lambda: MediaElement)
@property
def assigned_tags(self) -> Set[Tag]:
return {self}
## Element <-> Collection Linking
@ -115,7 +310,7 @@ class MediaCollectionLink(db.Entity):
## Media Elements
class MediaElement(db.Entity):
class MediaElement(db.Entity, Tagable):
id: int = orm.PrimaryKey(int, auto=True)
uri: str = orm.Required(str, unique=True)
@ -168,17 +363,17 @@ class MediaElement(db.Entity):
return False
return True
@property
def assigned_tags(self) -> Set[Tag]:
return set(self.tag_list)
@property
def inherited_tags(self) -> Set[Tag]:
result = set()
for link in self.collection_links:
result |= link.collection.all_tags
result |= link.collection.direct_tags
return result
@property
def all_tags(self) -> Iterable[Tag]:
return set(self.tag_list) | self.inherited_tags
def merge_to(self, other: MediaElement):
if self.watched:
other.watched = True
@ -229,7 +424,7 @@ class MediaUriMapping(db.Entity):
## Media Collections
class MediaCollection(db.Entity):
class MediaCollection(db.Entity, Tagable):
id: int = orm.PrimaryKey(int, auto=True)
uri: str = orm.Required(str, unique=True)
@ -271,8 +466,8 @@ class MediaCollection(db.Entity):
return self.next_episode is None
@property
def all_tags(self) -> Iterable[Tag]:
return self.tag_list
def assigned_tags(self) -> Set[Tag]:
return set(self.tag_list)
@property
def stats(self) -> CollectionStats:

Loading…
Cancel
Save