From c6aabdb409751986ae7bee46ecb5ed09bc685dfd Mon Sep 17 00:00:00 2001 From: Sergey Motornyuk Date: Fri, 24 Nov 2023 19:29:37 +0200 Subject: [PATCH] chore: simplify query popularity --- .../query_popularity/logic/action.py | 16 ++ .../query_popularity/logic/schema.py | 10 ++ .../search_tweaks/query_popularity/plugin.py | 2 +- .../search_tweaks/query_popularity/score.py | 159 +++--------------- setup.cfg | 3 +- 5 files changed, 49 insertions(+), 141 deletions(-) create mode 100644 ckanext/search_tweaks/query_popularity/logic/schema.py diff --git a/ckanext/search_tweaks/query_popularity/logic/action.py b/ckanext/search_tweaks/query_popularity/logic/action.py index a40a75c..026f076 100644 --- a/ckanext/search_tweaks/query_popularity/logic/action.py +++ b/ckanext/search_tweaks/query_popularity/logic/action.py @@ -1,9 +1,11 @@ from __future__ import annotations from typing import Any from ckan import types +from ckan.logic import validate import ckan.plugins.toolkit as tk from ckanext.search_tweaks.query_popularity.score import Score +from . import schema @tk.side_effect_free @@ -30,6 +32,20 @@ def search_tweaks_query_popularity_export( return {"results": results, "count": len(results)} +@validate(schema.query_popularity_import) +def search_tweaks_query_popularity_import( + context: types.Context, data_dict: dict[str, Any] +) -> dict[str, Any]: + tk.check_access("sysadmin", context, data_dict) + score = Score() + + if tk.asbool(data_dict.get("reset")): + score.reset() + score.restore(data_dict["snapshot"]) + score.refresh() + return {"success": True} + + def search_tweaks_query_popularity_ignore( context: types.Context, data_dict: dict[str, Any] ): diff --git a/ckanext/search_tweaks/query_popularity/logic/schema.py b/ckanext/search_tweaks/query_popularity/logic/schema.py new file mode 100644 index 0000000..376ef44 --- /dev/null +++ b/ckanext/search_tweaks/query_popularity/logic/schema.py @@ -0,0 +1,10 @@ +from __future__ import annotations + +from ckan.logic.schema import validator_args + +@validator_args +def query_popularity_import(not_empty, boolean_validator, convert_to_json_if_string): + return { + "snapshot": [not_empty, convert_to_json_if_string], + "reset": [boolean_validator], + } diff --git a/ckanext/search_tweaks/query_popularity/plugin.py b/ckanext/search_tweaks/query_popularity/plugin.py index d7e308e..fe51af0 100644 --- a/ckanext/search_tweaks/query_popularity/plugin.py +++ b/ckanext/search_tweaks/query_popularity/plugin.py @@ -21,7 +21,7 @@ def after_dataset_search(self, results: dict[str, Any], params: dict[str, Any]): plugin.skip_query_popularity(params) for plugin in p.PluginImplementations(IQueryPopularity) ): - self.score.save(params["q"]) + self.score.hit(params["q"].strip()) return results diff --git a/ckanext/search_tweaks/query_popularity/score.py b/ckanext/search_tweaks/query_popularity/score.py index 113e96f..e64e3c5 100644 --- a/ckanext/search_tweaks/query_popularity/score.py +++ b/ckanext/search_tweaks/query_popularity/score.py @@ -5,9 +5,9 @@ from hashlib import md5 from typing import Any, Iterable, cast from operator import itemgetter -from ckan.lib.redis import connect_to_redis import ckan.plugins.toolkit as tk from redis import Redis +from ckanext.toolbelt.utils.tracking import DateTracker from . import config log = logging.getLogger(__name__) @@ -15,154 +15,35 @@ class Score: - redis: Redis[bytes] - date_format = "%Y-%m-%d %H-%M" - def __init__(self): - self.redis = connect_to_redis() - - site = tk.config["ckan.site_id"] - self.prefix = f"{site}:search_tweaks:qp" + self.track = DateTracker( + "search_tweaks:qp", + throttling_time=config.throttle(), + max_age=config.max_age(), + obsoletion_period=config.obsoletion_period(), + personalized=True, + ) def export(self): - data: dict[bytes, dict[str, Any]] = { - hash: {"query": query.decode(), "records": []} - for hash, query in self.redis.hgetall(self.trans_key()).items() - } - for k, v in self.redis.hscan_iter(self.distribution_key()): - date_str, q_hash = k.split(b"/", 1) - try: - date = datetime.strptime(date_str.decode(), self.date_format) - except ValueError: - continue - - data[q_hash]["records"].append({"date": date.isoformat(), "count": int(v)}) - - return list(data.values()) + return self.track.snapshot() - def save(self, q: str): - q = q.strip() - q_hash = self.hash(q) + def restore(self, snapshot: Any): + return self.track.restore(snapshot) - if self.is_ignored(q_hash): - return + def hit(self, q: str): + self.track.hit(q) - if self.is_throttling(q_hash): - return - - self.redis.hset(self.trans_key(), q_hash, q) - - date_stem = self.format_date_stem(self.now()) + def refresh(self): + self.track.refresh() - self.redis.hincrby(self.distribution_key(), f"{date_stem}/{q_hash}", 1) + def stats(self, num: int) -> Iterable[dict[str, str | float]]: + return self.track.most_common(num) def drop(self, q: str): - q_hash = self.hash(q) - dk = self.distribution_key() - - series = self.redis.hscan_iter(dk, f"*/{q_hash}") - keys = list(map(itemgetter(0), series)) - if keys: - self.redis.hdel(dk, *keys) - - self.redis.hdel(self.trans_key(), q_hash) - self.redis.zrem(self.score_key(), q_hash) - - def is_throttling(self, q_hash: str): - user = tk.current_user.name - - throttle_key = f"{self.prefix}:throttle:{user}:{q_hash}" - if self.redis.exists(throttle_key): - return True - - self.redis.set(throttle_key, 1, ex=config.throttle()) - return False + self.track.drop(q) def reset(self): - keys = self.redis.keys(f"{self.prefix}:*") - if keys: - self.redis.delete(*keys) - - def refresh(self): - max_age = timedelta(seconds=config.max_age()) - dk = self.distribution_key() - sk = self.score_key() - - expired_dist: set[bytes] = set() - distribution = cast( - "Iterable[tuple[bytes, bytes]]", - self.redis.hscan_iter(dk), - ) - - scores: dict[bytes, float] = defaultdict(float) - - for k, v in distribution: - date_str, q_hash = k.split(b"/", 1) - try: - date = datetime.strptime(date_str.decode(), self.date_format) - except ValueError: - log.error("Remove invalid key %s", k) - expired_dist.add(k) - continue - - age = self.now() - date - - if age > max_age: - expired_dist.add(k) - continue - - scores[q_hash] += int(v) / (age.seconds // config.obsoletion_period() + 1) - - if expired_dist: - self.redis.hdel(dk, *expired_dist) - - expired_scores: set[bytes] = set() - for k, v in self.redis.zscan_iter(sk): - if k not in scores: - expired_scores.add(k) - continue - if scores: - self.redis.zadd(sk, cast(Any, scores)) - - if expired_scores: - self.redis.zrem(sk, *expired_scores) - self.redis.hdel(self.trans_key(), *expired_scores) - - def hash(self, q: str): - return md5(q.encode()).hexdigest() - - def is_ignored(self, q_hash: str): - return self.redis.sismember(self.ignore_key(), q_hash) + self.track.reset() def ignore(self, q: str): - return self.redis.sadd(self.ignore_key(), self.hash(q)) - - def now(self): - return datetime.utcnow() - - def score_key(self): - return f"{self.prefix}:score" - - def trans_key(self): - return f"{self.prefix}:trans" - - def ignore_key(self): - return f"{self.prefix}:ignore" - - def distribution_key(self): - return f"{self.prefix}:distribution" - - def format_date_stem(self, date: datetime): - return date.strftime(self.date_format) - - def stats(self, num: int) -> Iterable[dict[str, str | float]]: - scores: list[tuple[bytes, float]] = self.redis.zrange( - self.score_key(), 0, num - 1, desc=True, withscores=True - ) - trans_key = self.trans_key() - - for k, v in scores: - query = self.redis.hget(trans_key, k) - if not query: - continue - yield {"query": query.decode(), "score": v} + self.track.ignore(self.track.hash(q)) diff --git a/setup.cfg b/setup.cfg index 27052c6..c53fa39 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = ckanext-search-tweaks -version = 0.6.1a0 +version = 0.6.1a1 description = long_description = file: README.md long_description_content_type = text/markdown @@ -22,6 +22,7 @@ namespace_packages = ckanext install_requires = freezegun typing_extensions>=4.0.0 + ckanext-toolbelt>=0.4.11 include_package_data = True [options.entry_points]