You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

169 lines
5.5 KiB
Python

from __future__ import annotations
import base64
import dataclasses
from dataclasses import dataclass
import gzip
import json
import math
from typing import Dict, Generic, Iterable, List, TypeAlias, Union
from ..extras import Chain
from .tag_protocol import T, TagableProto, TagGetter
@dataclass
class PreferenceScore(Generic[T]):
points: Dict[T, float] = dataclasses.field(default_factory=lambda: {})
def __add__(
self,
other: PreferenceScoreCompatible[T],
) -> PreferenceScore[T]:
return (self & other).calculate()
def __and__(
self,
other: PreferenceScoreCompatible[T],
) -> PreferenceScoreAppender[T]:
return PreferenceScoreAppender[T](self, other)
def __mul__(self, scalar: float) -> PreferenceScore[T]:
return PreferenceScore[T](
{tag: score * scalar for tag, score in self.points.items()}
)
def __neg__(self) -> PreferenceScore[T]:
return self * -1
@staticmethod
def max_score_increase(score: float, adapt_count: int) -> float:
# depends on PreferenceScoreAppender.share_score(_flat)
return score * adapt_count
def adapt_score(
self,
tagable: TagableProto[T],
score: float,
on_hierachy: bool = True,
) -> PreferenceScore[T]:
addition = (
PreferenceScoreAppender.share_score
if on_hierachy
else PreferenceScoreAppender.share_score_flat
)(tagable, score)
return (self & addition).calculate()
def calculate_score(self, object: TagableProto[T]) -> float:
return self.calculate_iter_score(object.all_tags)
def calculate_iter_score(self, tag_iter: Iterable[T]) -> float:
return math.fsum(self.points.get(tag, 0) for tag in tag_iter)
@classmethod
def from_json(cls, data: str, get_tag: TagGetter[T]) -> PreferenceScore[T]:
dicts: Dict[int, float] = json.loads(data)
return cls({get_tag(id): score for id, score in dicts.items()})
@classmethod
def from_base64(
cls,
in_data: str,
get_tag: TagGetter[T],
encoding: str = "utf-8",
) -> PreferenceScore[T]:
return (
Chain(in_data)
| (lambda d: d.encode(encoding=encoding))
| base64.decodebytes
| gzip.decompress
| (lambda d: d.decode(encoding=encoding))
| (lambda d: PreferenceScore.from_json(d, get_tag))
).get()
def to_json(self) -> str:
return json.dumps({tag.id: score for tag, score in self.points.items()})
def to_base64(self, encoding: str = "utf-8") -> str:
return (
Chain(self)
| PreferenceScore.to_json
| (lambda d: d.encode(encoding=encoding))
| (
lambda d: gzip.compress(
data=d,
compresslevel=9,
)
)
| base64.encodebytes
| (lambda d: d.decode(encoding=encoding))
).get()
class PreferenceScoreAppender(Generic[T]):
points_list: List[PreferenceScore[T]]
@staticmethod
def share_score_flat(obj: TagableProto[T], score: float) -> PreferenceScoreSuper[T]:
# influences PreferenceScore.max_score_increase
direct_tags = [tag for tag in obj.direct_tags if tag.use_for_preferences]
if len(direct_tags) <= 0:
return PreferenceScoreAppender()
return PreferenceScore({tag: score / len(direct_tags) for tag in direct_tags})
@classmethod
def share_score(cls, obj: TagableProto[T], score: float) -> PreferenceScoreSuper[T]:
# influences PreferenceScore.max_score_increase
super_tags = [tag for tag in obj.super_tags if tag.use_for_preferences]
super_fraction = len(super_tags)
direct_fraction = super_fraction + 1
full_fraction = super_fraction + direct_fraction
single_share = score / full_fraction
direct_share = cls.share_score_flat(obj, single_share * direct_fraction)
super_shares = (super_tag.share_score(single_share) for super_tag in super_tags)
return direct_share & super_shares
def __init__(self, *args: PreferenceScoreCompatible[T]):
self.points_list = []
for preference in args:
self.__append(preference)
def __append(
self,
preference: PreferenceScoreCompatible[T],
) -> None:
if isinstance(preference, PreferenceScore):
self.points_list.append(preference)
elif isinstance(preference, PreferenceScoreAppender):
self.points_list.extend(preference.points_list)
else:
for sub_pref in preference:
self.__append(sub_pref)
def __and__(
self,
other: PreferenceScoreCompatible[T],
) -> PreferenceScoreAppender[T]:
return PreferenceScoreAppender(self, other)
def calculate(self) -> PreferenceScore[T]:
combined: Dict[T, List[float]] = {}
for preference in self.points_list:
for tag, score in preference.points.items():
if tag not in combined:
combined[tag] = []
combined[tag].append(score)
return PreferenceScore(
{tag: math.fsum(scores) for tag, scores in combined.items()}
)
PreferenceScoreSuper: TypeAlias = Union[
PreferenceScore[T],
PreferenceScoreAppender[T],
]
PreferenceScoreCompatible: TypeAlias = Union[
PreferenceScoreSuper[T],
Iterable[PreferenceScoreSuper[T]],
]