Refactor models.py into 2 submodules

master
Felix Stupp 1 year ago
parent 9fabb39f86
commit 328f234711
Signed by: zocker
GPG Key ID: 93E1BD26F6B02FB7

@ -45,7 +45,6 @@ from pony import orm
from entertainment_decider import common
from entertainment_decider.models import (
PreferenceScore,
Query,
Tag,
are_multiple_considered,
@ -53,11 +52,11 @@ from entertainment_decider.models import (
MediaCollection,
MediaCollectionLink,
MediaElement,
generate_preference_list,
get_all_considered,
setup_custom_tables,
update_element_lookup_cache,
)
from entertainment_decider.preferences import PreferenceScore, generate_preference_list
from entertainment_decider.extractors.collection import (
collection_extract_uri,
collection_update,

@ -0,0 +1,26 @@
from .custom_types import (
Query,
SafeStr,
)
from .entities import (
CollectionStats,
CollectionUriMapping,
MediaCollection,
MediaCollectionLink,
MediaElement,
MediaThumbnail,
MediaUriMapping,
Tag,
Tagable,
are_multiple_considered,
db,
get_all_considered,
get_all_elements_tags_recursive,
setup_custom_tables,
update_element_lookup_cache,
)
from .thumbnails import (
thumbnail_sort_key,
)

@ -0,0 +1,29 @@
from __future__ import annotations
from typing import List, NewType, TypeVar
from pony.orm.core import Query as PonyQuery
SafeStr = NewType("SafeStr", str)
"""
Use this type for strings which are expected to be safe to insert into SQL statements.
They may be included into a SQL statement by quoting them manually: f"SELECT * FROM '{safe_str}'"
DO NOT CAST STRINGS WHICH MAY BE SET BY USERS TO PREVENT SQL INJECTION ATTACKS.
"""

# Generic element type for the Query alias below.
T = TypeVar("T")
class Query(
    List[T],
    PonyQuery,
):
    """
    Static-typing helper mirroring PonyORM query results.

    Pony's Query objects behave "kind of" like lists, so this class combines
    List[T] and PonyQuery for annotation purposes only.
    Only use it for type hints; never instantiate it.
    """

    pass

@ -1,195 +1,43 @@
from __future__ import annotations
from abc import abstractmethod, abstractproperty
import base64
import dataclasses
from abc import abstractproperty
from dataclasses import dataclass
from datetime import datetime, timedelta
from functools import cache
import gzip
from itertools import chain
import itertools
import json
import math
import logging
import re
from typing import (
Callable,
Dict,
Iterable,
List,
Mapping,
NewType,
Optional,
Sequence,
Set,
Tuple,
TypeAlias,
TypeVar,
Union,
)
import magic
import requests
from pony import orm
from pony.orm.core import Query as PonyQuery
from .common import trim
from .extras import Chain
from .custom_types import Query, SafeStr
from .thumbnails import THUMBNAIL_ALLOWED_TYPES, THUMBNAIL_HEADERS
from .extras import UriHolder
from ..common import trim
db = orm.Database()
SafeStr = NewType("SafeStr", str)
"""
Use this type for strings which are expected to be safe to insert into SQL statements.
They may be included into a SQL statement by quoting them manually: f"SELECT * FROM '{safe_str}'"
DO NOT CAST STRINGS WHICH MAY BE SET BY USERS TO PREVENT SQL INJECTION ATTACKS.
"""
T = TypeVar("T")
class Query(
List[T],
PonyQuery,
):
"""
This class may be used to reflect PonyQuerys with all their "kind of" list behavior.
Only use it for type hintings.
"""
pass
THUMBNAIL_ALLOWED_TYPES = [
"image/avif",
"image/jpeg",
"image/png",
"image/webp",
]
THUMBNAIL_HEADERS = {
"Accept": ",".join(THUMBNAIL_ALLOWED_TYPES) + ",*/*;q=0.9",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; rv:91.0) Gecko/20100101 Firefox/91.0",
}
THUMBNAIL_TARGET = 16 / 9
def thumbnail_sort_key(width: int, height: int) -> Tuple:
return (
abs((width / height) - THUMBNAIL_TARGET),
width * height,
)
####
## Model Extensions
####
class UriHolder:
### abstracted
@abstractproperty
def _primary_uri(self) -> str:
"""Returns the primary uri of this object in a naive way."""
@abstractmethod
def _set_primary_uri(self, uri: str) -> None:
"""Sets the primary uri of this object in a naive way."""
@abstractproperty
def _get_uri_set(self) -> Set[str]:
"""Returns the uri set of this object in a naive way."""
@abstractmethod
def _set_uri_set(self, uri_set: Set[str]) -> None:
"""Sets the uri set of this object in a naive way."""
@abstractmethod
def _add_uri_to_set(self, uri: str) -> bool:
"""Adds a uri to the uri set of this object in a naive way.
Returns True if the uri was not in the uri set before.
"""
@abstractmethod
def _remove_uri_from_set(self, uri: str) -> bool:
"""Removes a uri to the uri set of this object in a naive way.
Returns True if the uri was in the uri set before.
"""
### implemented
@property
def primary_uri(self) -> str:
"""Returns the current primary uri of this object."""
return self._primary_uri
def is_primary_uri(self, compare_uri: str) -> bool:
"""Returns True if the given uri is equal to the current primary uri."""
return self.primary_uri == compare_uri
def set_primary_uri(self, uri: str) -> bool:
"""Sets the current primary of this object.
It will also add the uri to the uri set.
Returns True if the uri was not in the uri set before.
"""
ret = self._add_uri_to_set(uri) # might fail, so try first
self._set_primary_uri(uri)
return ret
def set_as_only_uri(self, uri: str) -> None:
self._set_uri_set({uri}) # might fail, so try first
self._set_primary_uri(uri)
def add_single_uri(self, uri: str) -> bool:
return self._add_uri_to_set(uri)
def add_uris(self, uri_list: Iterable[Optional[str]]) -> bool:
return any([self.add_single_uri(uri) for uri in set(uri_list) if uri])
@dataclass
class TagRootElement:
    # the tagable object at the root of this tag hierarchy
    base: Tagable
    # direct children in the tag tree
    children: List[TagTreeElement] = dataclasses.field(default_factory=lambda: [])

    def share_score(self, points: float) -> PreferenceScoreAppender:
        """Splits `points` evenly among the children (empty appender if none)."""
        # influences PreferenceScore.max_score_increase
        if points == 0 or len(self.children) <= 0:
            return PreferenceScoreAppender()
        single_share = points / len(self.children)
        shares = (child.share_score(single_share) for child in self.children)
        return PreferenceScoreAppender(shares)
@dataclass
class TagTreeElement:
    # the tag represented by this tree node
    base: Tag
    # sub-tags below this node
    children: List[TagTreeElement] = dataclasses.field(default_factory=lambda: [])

    def share_score(self, points: float) -> PreferenceScoreAppender:
        """Distributes `points` between this tag and its preference-relevant children.

        The base tag's share weighs one part more than each child's share;
        a node without relevant children keeps all the points itself.
        """
        # influences PreferenceScore.max_score_increase
        children = [elem for elem in self.children if elem.base.use_for_preferences]
        if len(children) <= 0:
            return PreferenceScoreAppender(PreferenceScore({self.base: points}))
        children_fraction = len(children)
        base_fraction = children_fraction + 1
        single_share = points / (base_fraction + children_fraction)
        base_share = PreferenceScore({self.base: single_share * base_fraction})
        shares = (child.share_score(single_share) for child in children)
        return base_share & shares
TagElement = Union[TagRootElement, TagTreeElement]
class Tagable:
## abstracted
@ -238,24 +86,6 @@ class Tagable:
def direct_tags(self) -> Set[Tag]:
return set(self.orm_direct_tags)
@property
def tag_hierachy(self) -> TagRootElement:
root = TagRootElement(
base=self,
children=[TagTreeElement(tag) for tag in self.direct_tags],
)
stack: List[TagTreeElement] = root.children[:]
used: Set[Tag] = self.direct_tags
while len(stack) > 0:
cur = stack.pop(0)
for tag in cur.base.super_tags:
if tag not in used:
elem = TagTreeElement(tag)
cur.children.append(elem)
stack.append(elem)
used.add(tag)
return root
@property
def all_tags(self) -> Set[Tag]:
queue: List[Tag] = list(self.direct_tags)
@ -267,286 +97,6 @@ class Tagable:
used |= new_tags
return used
def share_score_flat(self, score: float) -> PreferenceScoreAppender:
# influences PreferenceScore.max_score_increase
direct_tags = [tag for tag in self.direct_tags if tag.use_for_preferences]
if len(direct_tags) <= 0:
return PreferenceScoreAppender()
return PreferenceScoreAppender(
PreferenceScore({tag: score / len(direct_tags) for tag in direct_tags})
)
T_tagged = TypeVar("T_tagged", bound=Tagable)
@dataclass
class PreferenceScore:
points: Dict[Tag, float] = dataclasses.field(default_factory=lambda: {})
def __add__(self, other: PreferenceScoreCompatible) -> PreferenceScore:
return (self & other).calculate()
def __and__(self, other: PreferenceScoreCompatible) -> PreferenceScoreAppender:
return PreferenceScoreAppender(self, other)
def __mul__(self, scalar: float) -> PreferenceScore:
return PreferenceScore(
{tag: score * scalar for tag, score in self.points.items()}
)
def __neg__(self) -> PreferenceScore:
return self * -1
@staticmethod
def max_score_increase(score: float, adapt_count: int) -> float:
# depends on Tag(Root|Tree)Element.share_score / Tagable.share_score_flat
return score * adapt_count
def adapt_score(
self,
tagable: Tagable,
score: float,
on_hierachy: bool = True,
) -> PreferenceScore:
addition = (
tagable.tag_hierachy.share_score(score)
if on_hierachy
else tagable.share_score_flat(score)
)
return (self & addition).calculate()
def calculate_score(self, object: Tagable) -> float:
return self.calculate_iter_score(object.all_tags)
def calculate_iter_score(self, tag_iter: Iterable[Tag]) -> float:
return math.fsum(self.points.get(tag, 0) for tag in tag_iter)
@classmethod
def from_json(cls, data: str) -> PreferenceScore:
dicts: Dict = json.loads(data)
return cls({Tag[id]: score for id, score in dicts.items()})
@classmethod
def from_base64(cls, in_data: str, encoding: str = "utf-8") -> PreferenceScore:
return (
Chain(in_data)
| (lambda d: d.encode(encoding=encoding))
| base64.decodebytes
| gzip.decompress
| (lambda d: d.decode(encoding=encoding))
| PreferenceScore.from_json
).get()
def to_json(self) -> str:
return json.dumps({tag.id: score for tag, score in self.points.items()})
def to_base64(self, encoding: str = "utf-8") -> str:
return (
Chain(self)
| PreferenceScore.to_json
| (lambda d: d.encode(encoding=encoding))
| (
lambda d: gzip.compress(
data=d,
compresslevel=9,
)
)
| base64.encodebytes
| (lambda d: d.decode(encoding=encoding))
).get()
class PreferenceScoreAppender:
points_list: List[PreferenceScore]
def __init__(self, *args: PreferenceScoreCompatible):
self.points_list = []
for preference in args:
self.__append(preference)
def __append(self, preference: PreferenceScoreCompatible) -> None:
if isinstance(preference, PreferenceScore):
self.points_list.append(preference)
elif isinstance(preference, PreferenceScoreAppender):
self.points_list.extend(preference.points_list)
else:
for sub_pref in preference:
self.__append(sub_pref)
def __and__(self, other: PreferenceScoreCompatible) -> PreferenceScoreAppender:
return PreferenceScoreAppender(self, other)
def calculate(self) -> PreferenceScore:
combined: Dict[Tag, List[float]] = {}
for preference in self.points_list:
for tag, score in preference.points.items():
if tag not in combined:
combined[tag] = []
combined[tag].append(score)
return PreferenceScore(
{tag: math.fsum(scores) for tag, scores in combined.items()}
)
PreferenceScoreCompatibleSimple: TypeAlias = Union[
PreferenceScore, PreferenceScoreAppender
]
PreferenceScoreCompatible: TypeAlias = Union[
PreferenceScoreCompatibleSimple, Iterable[PreferenceScoreCompatibleSimple]
]
ScoreCalc: TypeAlias = Callable[["MediaElement"], float]
def generate_preference_list(
object_gen: Callable[[], List[MediaElement]],
score_adapt: float,
base: Optional[PreferenceScore] = None,
limit: Optional[int] = None,
) -> List[MediaElement]:
element_list = set(object_gen())
preference = base if base is not None else PreferenceScore()
now = datetime.now() # reference time
def add_tags_for_collections():
collections: Iterable[MediaCollection] = MediaCollection.select()
for coll in collections:
coll.tag_list.add(
Tag(
title="Automatic",
use_for_preferences=True,
)
)
def add_tags_for_extractor_names():
@cache
def get_extractor_tag(extractor_name: str) -> Tag:
return Tag(
title=f"Automatic for extractor {extractor_name}",
use_for_preferences=True,
)
for element in element_list:
element.tag_list.add(get_extractor_tag(element.extractor_name))
add_tags_for_collections()
add_tags_for_extractor_names()
orm.flush() # flush after custom tags
# score calc
elem_tag_map = get_all_elements_tags_recursive()
def all_tags(element: MediaElement) -> List[Tag]:
return elem_tag_map.get(element.id, [])
# TODO prepare static score in parallel (or cache it in DB for longer)
@cache
def gen_statis_score(element: MediaElement) -> float:
pinned_collections = orm.count(
link for link in element.collection_links if link.collection.pinned
)
# reference_date = orm.max((elem_link.element.release_date for coll_link in element.collection_links for elem_link in coll_link.collection.media_links if coll_link.collection.watch_in_order and not elem_link.element.skip_over), default=element.release_date)
# reference_date = max((l.collection.last_release_date_to_watch for l in element.collection_links if l.collection.watch_in_order), default=element.release_date)
reference_date = element.release_date
age_nerf = (
(
max(-0.5, math.log((now - reference_date) / timedelta(days=14)) - 1)
if reference_date < now # possible on yet to release media
else -0.5
)
# nerf the nerf when pinned or started to prevent hiding
* 0.1
if (pinned_collections > 0) or element.started
else 1
)
# avg_rel = element.average_release_per_week or element.left_length
# avg_rel = element.left_length
all_nerfs = (
# by id to make sorting consistent
(10**-8) * math.log(element.id + 1000),
# for age of media (newer is better)
age_nerf,
# for average length in relevant collections / length of video itself
# max(0, (math.log(avg_rel + 1) - 5) / 2) if avg_rel else 0
)
all_buffs = (
# for already began to watch
2 if element.started else 0,
# for count of tags (more is better)
0.5 * math.log(len(all_tags(element)) + 1),
# for being in pinned collections
3 * math.log(pinned_collections + 1),
)
return math.fsum(chain(all_nerfs, (-val for val in all_buffs)))
def gen_score(element: MediaElement) -> float:
static_score = gen_statis_score(element)
pref_score = preference.calculate_iter_score(all_tags(element))
return static_score + pref_score
# pre filter list
# - elements which have a too low current score may never possible appear
# TODO add test that this does not change end result
def pre_filter_list_by_score(elem_list: Set[MediaElement]) -> Set[MediaElement]:
elem_count = len(elem_list)
if limit is None or elem_count <= limit:
return elem_list
# cache pref score for this
gen_pre_score = cache(gen_score)
# biggest possible score increase by adaption
max_score_inc = preference.max_score_increase(
score=score_adapt,
adapt_count=limit,
)
logging.debug(f"Max adaption possible: {max_score_inc}")
# differenciate adapted buffing and adapted nerfing
without_max_adapt: ScoreCalc = lambda elem: gen_pre_score(elem)
with_max_adapt: ScoreCalc = lambda elem: without_max_adapt(elem) + max_score_inc
is_nerfing = score_adapt >= 0
if is_nerfing:
best_case = without_max_adapt
worst_case = with_max_adapt
else: # is buffing
best_case = with_max_adapt
worst_case = without_max_adapt
# (limit)ths best's score in the worst adaption for it
limitths_best_worst = sorted(worst_case(elem) for elem in elem_list)[limit]
logging.debug(f"(limit)ths best's worst case score: {limitths_best_worst}")
# extract worst's element's score in best case as well
worsts_best = best_case(max(elem_list, key=gen_pre_score))
logging.debug(f"Worsts best case score is {worsts_best}")
# check if reducing element count is possible
if limitths_best_worst < worsts_best:
# throw away all elements which's best adaption is not better than the (limit)ths one
ret = {elem for elem in elem_list if best_case(elem) < limitths_best_worst}
logging.debug(
f"Prefilter reduced set from {elem_count} to {len(ret)} elements"
)
return ret
logging.debug(f"Prefilter couldn't reduce the element count ({elem_count})")
return elem_list
element_list = pre_filter_list_by_score(element_list)
# gen elements
res_ids = list[int]()
while 0 < len(element_list):
first_element = min(element_list, key=gen_score)
res_ids.append(first_element.id)
if limit is not None and limit <= len(res_ids):
break
element_list.remove(first_element)
preference = preference.adapt_score(first_element, score_adapt)
# revert any changes on DB
orm.rollback()
db.execute(f"ALTER TABLE {Tag._table_} AUTO_INCREMENT = 1;")
# return MediaElements
return [MediaElement[i] for i in res_ids]
@dataclass
class CollectionStats:

@ -0,0 +1,70 @@
from __future__ import annotations
from abc import abstractmethod, abstractproperty
from typing import Iterable, Optional, Set
class UriHolder:
    """Mixin contract for objects owning one primary uri plus a set of known uris."""

    ### abstracted

    @abstractproperty
    def _primary_uri(self) -> str:
        """Returns the primary uri of this object in a naive way."""

    @abstractmethod
    def _set_primary_uri(self, uri: str) -> None:
        """Sets the primary uri of this object in a naive way."""

    @abstractproperty
    def _get_uri_set(self) -> Set[str]:
        """Returns the uri set of this object in a naive way."""

    @abstractmethod
    def _set_uri_set(self, uri_set: Set[str]) -> None:
        """Sets the uri set of this object in a naive way."""

    @abstractmethod
    def _add_uri_to_set(self, uri: str) -> bool:
        """Adds a uri to the uri set of this object in a naive way.

        Returns True if the uri was not in the uri set before.
        """

    @abstractmethod
    def _remove_uri_from_set(self, uri: str) -> bool:
        """Removes a uri from the uri set of this object in a naive way.

        Returns True if the uri was in the uri set before.
        """

    ### implemented

    @property
    def primary_uri(self) -> str:
        """Returns the current primary uri of this object."""
        return self._primary_uri

    def is_primary_uri(self, compare_uri: str) -> bool:
        """Returns True if the given uri is equal to the current primary uri."""
        return self.primary_uri == compare_uri

    def set_primary_uri(self, uri: str) -> bool:
        """Sets the current primary uri of this object.

        It will also add the uri to the uri set.
        Returns True if the uri was not in the uri set before.
        """
        ret = self._add_uri_to_set(uri)  # might fail, so try first
        self._set_primary_uri(uri)
        return ret

    def set_as_only_uri(self, uri: str) -> None:
        """Replaces the whole uri set with the single given uri and makes it primary."""
        self._set_uri_set({uri})  # might fail, so try first
        self._set_primary_uri(uri)

    def add_single_uri(self, uri: str) -> bool:
        """Adds one uri to the uri set; returns True if it was new."""
        return self._add_uri_to_set(uri)

    def add_uris(self, uri_list: Iterable[Optional[str]]) -> bool:
        """Adds every non-empty uri from the iterable; returns True if any was new.

        The list comprehension is deliberate: a generator inside any() would
        short-circuit on the first True and skip adding the remaining uris.
        """
        return any([self.add_single_uri(uri) for uri in set(uri_list) if uri])

@ -0,0 +1,21 @@
from typing import Tuple
# MIME types accepted for thumbnails (used to build the Accept header below).
THUMBNAIL_ALLOWED_TYPES = [
    "image/avif",
    "image/jpeg",
    "image/png",
    "image/webp",
]
# HTTP headers for fetching thumbnails; Accept prefers the allowed types but
# still tolerates anything else at low priority (q=0.9).
THUMBNAIL_HEADERS = {
    "Accept": ",".join(THUMBNAIL_ALLOWED_TYPES) + ",*/*;q=0.9",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; rv:91.0) Gecko/20100101 Firefox/91.0",
}
# Preferred thumbnail aspect ratio (16:9).
THUMBNAIL_TARGET = 16 / 9


def thumbnail_sort_key(width: int, height: int) -> Tuple[float, int]:
    """Return a sort key for choosing among candidate thumbnails.

    First component: absolute distance of the aspect ratio from 16:9
    (smaller is closer to the target). Second component: pixel area,
    ascending, as a tie breaker among equally proportioned candidates.
    """
    return (
        abs((width / height) - THUMBNAIL_TARGET),
        width * height,
    )

@ -0,0 +1,10 @@
from .elem_scoring import (
generate_preference_list,
)
from .tag_scoring import (
PreferenceScore,
PreferenceScoreAppender,
PreferenceScoreCompatible,
PreferenceScoreSuper,
)

@ -0,0 +1,171 @@
from __future__ import annotations
from datetime import datetime, timedelta
from functools import cache
from itertools import chain
import logging
import math
from typing import Callable, Iterable, List, Optional, Sequence, Set, TypeAlias
from pony import orm
from .tag_scoring import PreferenceScore
from ..models import (
MediaCollection,
MediaElement,
Tag,
db,
get_all_elements_tags_recursive,
)
# Scoring function type: maps a media element to a float score
# (lower scores are better; see the min() selection below).
ScoreCalc: TypeAlias = Callable[["MediaElement"], float]


def generate_preference_list(
    object_gen: Callable[[], List[MediaElement]],
    score_adapt: float,
    base: Optional[PreferenceScore] = None,
    limit: Optional[int] = None,
) -> List[MediaElement]:
    """Orders media elements by preference, best first.

    object_gen: produces the candidate MediaElements (called once).
    score_adapt: score added to the tags of each picked element before
        picking the next; positive values push similar elements back
        (lower scores are better), negative values pull them forward.
    base: starting PreferenceScore; defaults to an empty score.
    limit: optional maximum number of elements to return.

    All tag/DB changes made while scoring are rolled back before returning.
    """
    element_list: Set[MediaElement] = set(object_gen())
    preference = base if base is not None else PreferenceScore()
    now = datetime.now()  # reference time

    def add_tags_for_collections() -> None:
        # temporary tag per collection so membership influences scoring
        collections: Iterable[MediaCollection] = MediaCollection.select()
        for coll in collections:
            coll.tag_list.add(
                Tag(
                    title="Automatic",
                    use_for_preferences=True,
                )
            )

    def add_tags_for_extractor_names() -> None:
        # temporary tag per extractor so source influences scoring
        @cache
        def get_extractor_tag(extractor_name: str) -> Tag:
            return Tag(
                title=f"Automatic for extractor {extractor_name}",
                use_for_preferences=True,
            )

        for element in element_list:
            element.tag_list.add(get_extractor_tag(element.extractor_name))

    add_tags_for_collections()
    add_tags_for_extractor_names()
    orm.flush()  # flush after custom tags

    # score calc
    elem_tag_map = get_all_elements_tags_recursive()

    def all_tags(element: MediaElement) -> Sequence[Tag]:
        return elem_tag_map.get(element.id, [])

    # TODO prepare static score in parallel (or cache it in DB for longer)
    @cache
    def gen_statis_score(element: MediaElement) -> float:
        # preference-independent part of the score; cached per element
        pinned_collections = orm.count(
            link for link in element.collection_links if link.collection.pinned
        )
        # reference_date = orm.max((elem_link.element.release_date for coll_link in element.collection_links for elem_link in coll_link.collection.media_links if coll_link.collection.watch_in_order and not elem_link.element.skip_over), default=element.release_date)
        # reference_date = max((l.collection.last_release_date_to_watch for l in element.collection_links if l.collection.watch_in_order), default=element.release_date)
        reference_date = element.release_date
        # NOTE(review): precedence makes this "(X * 0.1) if cond else 1", i.e.
        # the age term is replaced by the constant 1 when neither pinned nor
        # started — likely intended "X * (0.1 if cond else 1)"; confirm.
        age_nerf = (
            (
                max(-0.5, math.log((now - reference_date) / timedelta(days=14)) - 1)
                if reference_date < now  # possible on yet to release media
                else -0.5
            )
            # nerf the nerf when pinned or started to prevent hiding
            * 0.1
            if (pinned_collections > 0) or element.started
            else 1
        )
        # avg_rel = element.average_release_per_week or element.left_length
        # avg_rel = element.left_length
        all_nerfs = (
            # by id to make sorting consistent
            (10**-8) * math.log(element.id + 1000),
            # for age of media (newer is better)
            age_nerf,
            # for average length in relevant collections / length of video itself
            # max(0, (math.log(avg_rel + 1) - 5) / 2) if avg_rel else 0
        )
        all_buffs = (
            # for already began to watch
            2 if element.started else 0,
            # for count of tags (more is better)
            0.5 * math.log(len(all_tags(element)) + 1),
            # for being in pinned collections
            3 * math.log(pinned_collections + 1),
        )
        return math.fsum(chain(all_nerfs, (-val for val in all_buffs)))

    def gen_score(element: MediaElement) -> float:
        # full score = static part + current (adapting) preference part
        static_score = gen_statis_score(element)
        pref_score = preference.calculate_iter_score(all_tags(element))
        return static_score + pref_score

    # pre filter list
    # - elements which have a too low current score may never possible appear
    # TODO add test that this does not change end result
    def pre_filter_list_by_score(elem_list: Set[MediaElement]) -> Set[MediaElement]:
        elem_count = len(elem_list)
        if limit is None or elem_count <= limit:
            return elem_list
        # cache pref score for this
        gen_pre_score = cache(gen_score)
        # biggest possible score increase by adaption
        max_score_inc = preference.max_score_increase(
            score=score_adapt,
            adapt_count=limit,
        )
        logging.debug(f"Max adaption possible: {max_score_inc}")
        # differenciate adapted buffing and adapted nerfing
        without_max_adapt: ScoreCalc = lambda elem: gen_pre_score(elem)
        with_max_adapt: ScoreCalc = lambda elem: without_max_adapt(elem) + max_score_inc
        is_nerfing = score_adapt >= 0
        if is_nerfing:
            best_case = without_max_adapt
            worst_case = with_max_adapt
        else:  # is buffing
            best_case = with_max_adapt
            worst_case = without_max_adapt
        # (limit)ths best's score in the worst adaption for it
        limitths_best_worst = sorted(worst_case(elem) for elem in elem_list)[limit]
        logging.debug(f"(limit)ths best's worst case score: {limitths_best_worst}")
        # extract worst's element's score in best case as well
        worsts_best = best_case(max(elem_list, key=gen_pre_score))
        logging.debug(f"Worsts best case score is {worsts_best}")
        # check if reducing element count is possible
        if limitths_best_worst < worsts_best:
            # throw away all elements which's best adaption is not better than the (limit)ths one
            ret = {elem for elem in elem_list if best_case(elem) < limitths_best_worst}
            logging.debug(
                f"Prefilter reduced set from {elem_count} to {len(ret)} elements"
            )
            return ret
        logging.debug(f"Prefilter couldn't reduce the element count ({elem_count})")
        return elem_list

    element_list = pre_filter_list_by_score(element_list)
    # gen elements
    res_ids = list[int]()
    while 0 < len(element_list):
        first_element = min(element_list, key=gen_score)  # lowest score = best
        res_ids.append(first_element.id)
        if limit is not None and limit <= len(res_ids):
            break
        element_list.remove(first_element)
        # adapt the preference so the picked element's tags shift the rest
        preference = preference.adapt_score(first_element, score_adapt)
    # revert any changes on DB
    orm.rollback()
    db.execute(f"ALTER TABLE {Tag._table_} AUTO_INCREMENT = 1;")
    # return MediaElements
    return [MediaElement[i] for i in res_ids]

@ -0,0 +1,147 @@
from __future__ import annotations
import base64
import dataclasses
from dataclasses import dataclass
import gzip
import json
import math
from typing import Dict, Iterable, List, TypeAlias, Union
from ..extras import Chain
from ..models import Tag, Tagable
@dataclass
class PreferenceScore:
    """Mapping of Tag -> score used to rank tagged media; combining sums per tag."""

    # per-tag score contributions; tags missing from the dict count as 0
    points: Dict[Tag, float] = dataclasses.field(default_factory=lambda: {})

    def __add__(self, other: PreferenceScoreCompatible) -> PreferenceScore:
        """Combines with any compatible value, immediately summing per tag."""
        return (self & other).calculate()

    def __and__(self, other: PreferenceScoreCompatible) -> PreferenceScoreAppender:
        """Lazily chains with any compatible value (no summing yet)."""
        return PreferenceScoreAppender(self, other)

    def __mul__(self, scalar: float) -> PreferenceScore:
        """Scales every tag's score by the given scalar."""
        return PreferenceScore(
            {tag: score * scalar for tag, score in self.points.items()}
        )

    def __neg__(self) -> PreferenceScore:
        """Negates every tag's score."""
        return self * -1

    @staticmethod
    def max_score_increase(score: float, adapt_count: int) -> float:
        """Upper bound on the total score change after adapt_count adaptions."""
        # depends on PreferenceScoreAppender.share_score(_flat)
        return score * adapt_count

    def adapt_score(
        self,
        tagable: Tagable,
        score: float,
        on_hierachy: bool = True,
    ) -> PreferenceScore:
        """Returns a new score with `score` distributed over tagable's tags.

        With on_hierachy=True the share spreads across the tag hierarchy,
        otherwise only across the direct tags. self is left unmodified.
        """
        addition = (
            PreferenceScoreAppender.share_score
            if on_hierachy
            else PreferenceScoreAppender.share_score_flat
        )(tagable, score)
        return (self & addition).calculate()

    def calculate_score(self, object: Tagable) -> float:
        """Scores a tagable by summing over all of its (recursive) tags."""
        return self.calculate_iter_score(object.all_tags)

    def calculate_iter_score(self, tag_iter: Iterable[Tag]) -> float:
        """Sums the scores of the given tags (unknown tags contribute 0)."""
        return math.fsum(self.points.get(tag, 0) for tag in tag_iter)

    @classmethod
    def from_json(cls, data: str) -> PreferenceScore:
        """Inverse of to_json; resolves tag ids back to Tag entities."""
        dicts: Dict = json.loads(data)
        return cls({Tag[id]: score for id, score in dicts.items()})

    @classmethod
    def from_base64(cls, in_data: str, encoding: str = "utf-8") -> PreferenceScore:
        """Inverse of to_base64 (base64 -> gunzip -> json -> instance)."""
        return (
            Chain(in_data)
            | (lambda d: d.encode(encoding=encoding))
            | base64.decodebytes
            | gzip.decompress
            | (lambda d: d.decode(encoding=encoding))
            | PreferenceScore.from_json
        ).get()

    def to_json(self) -> str:
        """Serializes as a {tag_id: score} JSON object."""
        return json.dumps({tag.id: score for tag, score in self.points.items()})

    def to_base64(self, encoding: str = "utf-8") -> str:
        """Compact text serialization: json -> gzip (level 9) -> base64."""
        return (
            Chain(self)
            | PreferenceScore.to_json
            | (lambda d: d.encode(encoding=encoding))
            | (
                lambda d: gzip.compress(
                    data=d,
                    compresslevel=9,
                )
            )
            | base64.encodebytes
            | (lambda d: d.decode(encoding=encoding))
        ).get()
class PreferenceScoreAppender:
    """Lazy collector of PreferenceScore objects, summed only via calculate()."""

    # flat list of all collected scores (nested appenders are inlined)
    points_list: List[PreferenceScore]

    @staticmethod
    def share_score_flat(obj: Tagable, score: float) -> PreferenceScoreSuper:
        """Splits `score` evenly over obj's preference-relevant direct tags."""
        # influences PreferenceScore.max_score_increase
        direct_tags = [tag for tag in obj.direct_tags if tag.use_for_preferences]
        if len(direct_tags) <= 0:
            return PreferenceScoreAppender()
        return PreferenceScore({tag: score / len(direct_tags) for tag in direct_tags})

    @classmethod
    def share_score(cls, obj: Tagable, score: float) -> PreferenceScoreSuper:
        """Splits `score` between obj's direct tags and its super tags.

        The direct share weighs one part more than each super tag's share;
        super tags recurse via their own share_score.
        """
        # influences PreferenceScore.max_score_increase
        super_tags = [tag for tag in obj.super_tags if tag.use_for_preferences]
        super_fraction = len(super_tags)
        direct_fraction = super_fraction + 1
        full_fraction = super_fraction + direct_fraction
        single_share = score / full_fraction
        direct_share = cls.share_score_flat(obj, single_share * direct_fraction)
        # NOTE(review): relies on a share_score method on Tag entities — confirm it exists
        super_shares = (super_tag.share_score(single_share) for super_tag in super_tags)
        return direct_share & super_shares

    def __init__(self, *args: PreferenceScoreCompatible):
        self.points_list = []
        for preference in args:
            self.__append(preference)

    def __append(self, preference: PreferenceScoreCompatible) -> None:
        # flatten: scores are appended, appenders inlined, iterables recursed
        if isinstance(preference, PreferenceScore):
            self.points_list.append(preference)
        elif isinstance(preference, PreferenceScoreAppender):
            self.points_list.extend(preference.points_list)
        else:
            for sub_pref in preference:
                self.__append(sub_pref)

    def __and__(self, other: PreferenceScoreCompatible) -> PreferenceScoreAppender:
        """Lazily chains with any compatible value (no summing yet)."""
        return PreferenceScoreAppender(self, other)

    def calculate(self) -> PreferenceScore:
        """Sums all collected scores into a single per-tag PreferenceScore."""
        combined: Dict[Tag, List[float]] = {}
        for preference in self.points_list:
            for tag, score in preference.points.items():
                if tag not in combined:
                    combined[tag] = []
                combined[tag].append(score)
        return PreferenceScore(
            {tag: math.fsum(scores) for tag, scores in combined.items()}
        )
# A single score or a lazy appender of scores.
PreferenceScoreSuper: TypeAlias = Union[PreferenceScore, PreferenceScoreAppender]
# Anything __add__/__and__ accept: a score/appender or an iterable of them.
PreferenceScoreCompatible: TypeAlias = Union[
    PreferenceScoreSuper, Iterable[PreferenceScoreSuper]
]
Loading…
Cancel
Save