Skip to content

Commit

Permalink
chore: simplify query popularity
Browse files Browse the repository at this point in the history
  • Loading branch information
smotornyuk committed Nov 24, 2023
1 parent fff9f07 commit c6aabdb
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 141 deletions.
16 changes: 16 additions & 0 deletions ckanext/search_tweaks/query_popularity/logic/action.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from __future__ import annotations
from typing import Any
from ckan import types
from ckan.logic import validate
import ckan.plugins.toolkit as tk

from ckanext.search_tweaks.query_popularity.score import Score
from . import schema


@tk.side_effect_free
Expand All @@ -30,6 +32,20 @@ def search_tweaks_query_popularity_export(
return {"results": results, "count": len(results)}


@validate(schema.query_popularity_import)
def search_tweaks_query_popularity_import(
    context: types.Context, data_dict: dict[str, Any]
) -> dict[str, Any]:
    """Load a query-popularity snapshot into the score tracker.

    The call is guarded by the ``sysadmin`` access check and ``data_dict``
    is validated against ``schema.query_popularity_import``.

    Recognised keys of ``data_dict``:

    * ``snapshot`` -- the data handed over to ``Score.restore``;
    * ``reset`` -- optional flag; when it evaluates to true,
      ``Score.reset`` is called before the snapshot is restored.

    Once the snapshot is restored, ``Score.refresh`` is called.

    Returns ``{"success": True}``.
    """
    tk.check_access("sysadmin", context, data_dict)

    tracker = Score()
    wipe_first = tk.asbool(data_dict.get("reset"))
    if wipe_first:
        tracker.reset()

    tracker.restore(data_dict["snapshot"])
    tracker.refresh()

    return {"success": True}


def search_tweaks_query_popularity_ignore(
context: types.Context, data_dict: dict[str, Any]
):
Expand Down
10 changes: 10 additions & 0 deletions ckanext/search_tweaks/query_popularity/logic/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from __future__ import annotations

from ckan.logic.schema import validator_args

@validator_args
def query_popularity_import(
    not_empty, boolean_validator, convert_to_json_if_string
):
    """Build the validation schema for the query-popularity import action.

    ``snapshot`` goes through ``not_empty`` and then
    ``convert_to_json_if_string``; ``reset`` goes through
    ``boolean_validator``.
    """
    snapshot_validators = [not_empty, convert_to_json_if_string]
    reset_validators = [boolean_validator]
    return {"snapshot": snapshot_validators, "reset": reset_validators}
2 changes: 1 addition & 1 deletion ckanext/search_tweaks/query_popularity/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def after_dataset_search(self, results: dict[str, Any], params: dict[str, Any]):
plugin.skip_query_popularity(params)
for plugin in p.PluginImplementations(IQueryPopularity)
):
self.score.save(params["q"])
self.score.hit(params["q"].strip())

return results

Expand Down
159 changes: 20 additions & 139 deletions ckanext/search_tweaks/query_popularity/score.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,164 +5,45 @@
from hashlib import md5
from typing import Any, Iterable, cast
from operator import itemgetter
from ckan.lib.redis import connect_to_redis
import ckan.plugins.toolkit as tk
from redis import Redis
from ckanext.toolbelt.utils.tracking import DateTracker
from . import config

log = logging.getLogger(__name__)
connect_to_redis: Any


class Score:
redis: Redis[bytes]
date_format = "%Y-%m-%d %H-%M"

def __init__(self):
self.redis = connect_to_redis()

site = tk.config["ckan.site_id"]
self.prefix = f"{site}:search_tweaks:qp"
self.track = DateTracker(
"search_tweaks:qp",
throttling_time=config.throttle(),
max_age=config.max_age(),
obsoletion_period=config.obsoletion_period(),
personalized=True,
)

def export(self):
data: dict[bytes, dict[str, Any]] = {
hash: {"query": query.decode(), "records": []}
for hash, query in self.redis.hgetall(self.trans_key()).items()
}
for k, v in self.redis.hscan_iter(self.distribution_key()):
date_str, q_hash = k.split(b"/", 1)
try:
date = datetime.strptime(date_str.decode(), self.date_format)
except ValueError:
continue

data[q_hash]["records"].append({"date": date.isoformat(), "count": int(v)})

return list(data.values())
return self.track.snapshot()

def save(self, q: str):
q = q.strip()
q_hash = self.hash(q)
def restore(self, snapshot: Any):
return self.track.restore(snapshot)

if self.is_ignored(q_hash):
return
def hit(self, q: str):
self.track.hit(q)

if self.is_throttling(q_hash):
return

self.redis.hset(self.trans_key(), q_hash, q)

date_stem = self.format_date_stem(self.now())
def refresh(self):
self.track.refresh()

self.redis.hincrby(self.distribution_key(), f"{date_stem}/{q_hash}", 1)
def stats(self, num: int) -> Iterable[dict[str, str | float]]:
return self.track.most_common(num)

def drop(self, q: str):
q_hash = self.hash(q)
dk = self.distribution_key()

series = self.redis.hscan_iter(dk, f"*/{q_hash}")
keys = list(map(itemgetter(0), series))
if keys:
self.redis.hdel(dk, *keys)

self.redis.hdel(self.trans_key(), q_hash)
self.redis.zrem(self.score_key(), q_hash)

def is_throttling(self, q_hash: str):
user = tk.current_user.name

throttle_key = f"{self.prefix}:throttle:{user}:{q_hash}"
if self.redis.exists(throttle_key):
return True

self.redis.set(throttle_key, 1, ex=config.throttle())
return False
self.track.drop(q)

def reset(self):
keys = self.redis.keys(f"{self.prefix}:*")
if keys:
self.redis.delete(*keys)

def refresh(self):
max_age = timedelta(seconds=config.max_age())
dk = self.distribution_key()
sk = self.score_key()

expired_dist: set[bytes] = set()
distribution = cast(
"Iterable[tuple[bytes, bytes]]",
self.redis.hscan_iter(dk),
)

scores: dict[bytes, float] = defaultdict(float)

for k, v in distribution:
date_str, q_hash = k.split(b"/", 1)
try:
date = datetime.strptime(date_str.decode(), self.date_format)
except ValueError:
log.error("Remove invalid key %s", k)
expired_dist.add(k)
continue

age = self.now() - date

if age > max_age:
expired_dist.add(k)
continue

scores[q_hash] += int(v) / (age.seconds // config.obsoletion_period() + 1)

if expired_dist:
self.redis.hdel(dk, *expired_dist)

expired_scores: set[bytes] = set()
for k, v in self.redis.zscan_iter(sk):
if k not in scores:
expired_scores.add(k)
continue
if scores:
self.redis.zadd(sk, cast(Any, scores))

if expired_scores:
self.redis.zrem(sk, *expired_scores)
self.redis.hdel(self.trans_key(), *expired_scores)

def hash(self, q: str):
return md5(q.encode()).hexdigest()

def is_ignored(self, q_hash: str):
return self.redis.sismember(self.ignore_key(), q_hash)
self.track.reset()

def ignore(self, q: str):
return self.redis.sadd(self.ignore_key(), self.hash(q))

def now(self):
return datetime.utcnow()

def score_key(self):
return f"{self.prefix}:score"

def trans_key(self):
return f"{self.prefix}:trans"

def ignore_key(self):
return f"{self.prefix}:ignore"

def distribution_key(self):
return f"{self.prefix}:distribution"

def format_date_stem(self, date: datetime):
return date.strftime(self.date_format)

def stats(self, num: int) -> Iterable[dict[str, str | float]]:
scores: list[tuple[bytes, float]] = self.redis.zrange(
self.score_key(), 0, num - 1, desc=True, withscores=True
)
trans_key = self.trans_key()

for k, v in scores:
query = self.redis.hget(trans_key, k)
if not query:
continue
yield {"query": query.decode(), "score": v}
self.track.ignore(self.track.hash(q))
3 changes: 2 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = ckanext-search-tweaks
version = 0.6.1a0
version = 0.6.1a1
description =
long_description = file: README.md
long_description_content_type = text/markdown
Expand All @@ -22,6 +22,7 @@ namespace_packages = ckanext
install_requires =
freezegun
typing_extensions>=4.0.0
ckanext-toolbelt>=0.4.11
include_package_data = True

[options.entry_points]
Expand Down

0 comments on commit c6aabdb

Please sign in to comment.