base.py

from django.db.models import F

from sentry.signals import buffer_incr_complete
from sentry.tasks.process_buffer import process_incr
from sentry.utils.services import Service


class Buffer(Service):
    """
    Buffers act as temporary stores for counters. The default implementation is just a
    pass-through and does not actually buffer anything.

    A useful example might be a Redis buffer. Each time an event gets updated, we send several
    add events which just store a key and increment its value. Additionally they fire off a task
    to the queue. That task eventually runs and gets the current update value. If the value is
    empty, it does nothing, otherwise it updates the row in the database. (An illustrative
    in-memory sketch of this pattern appears at the bottom of this module.)

    This is useful in situations where a single event might be happening so fast that the queue
    can't keep up with the updates.
    """

    __all__ = ("get", "incr", "process", "process_pending", "validate")

    def get(self, model, columns, filters):
        """
        We can't fetch values from Celery, so just assume buffer values are all 0 here.
        """
        return {col: 0 for col in columns}

    def incr(self, model, columns, filters, extra=None, signal_only=None):
        """
        >>> incr(Group, columns={'times_seen': 1}, filters={'pk': group.pk})

        signal_only - added to indicate that `process` should only call the complete
        signal handler with the updated model and skip creates/updates in the database. This
        is useful in cases where we need to do additional processing before writing to the
        database and opt to do it in a `buffer_incr_complete` receiver.
        """
        process_incr.apply_async(
            kwargs={
                "model": model,
                "columns": columns,
                "filters": filters,
                "extra": extra,
                "signal_only": signal_only,
            }
        )

    def process_pending(self, partition=None):
        return []

    def process(self, model, columns, filters, extra=None, signal_only=None):
        from sentry.event_manager import ScoreClause
        from sentry.models.group import Group

        created = False

        if not signal_only:
            update_kwargs = {c: F(c) + v for c, v in columns.items()}

            if extra:
                update_kwargs.update(extra)

            # HACK(dcramer): this is gross, but we don't have a good hook to compute this property today
            # XXX(dcramer): remove once we can replace 'priority' with something reasonable via Snuba
            if model is Group:
                if "last_seen" in update_kwargs and "times_seen" in update_kwargs:
                    update_kwargs["score"] = ScoreClause(
                        group=None,
                        times_seen=update_kwargs["times_seen"],
                        last_seen=update_kwargs["last_seen"],
                    )
                # XXX: create_or_update doesn't fire `post_save` signals, and so this update never
                # ends up in the cache. This causes issues when handling issue alerts, and likely
                # elsewhere. Use `update` here since we're already special casing, and we know that
                # the group will already exist.
                try:
                    group = Group.objects.get(**filters)
                except Group.DoesNotExist:
                    # If the group was deleted by the time we flush buffers we don't care, just
                    # continue
                    pass
                else:
                    group.update(**update_kwargs)
                    created = False
            else:
                _, created = model.objects.create_or_update(values=update_kwargs, **filters)

        buffer_incr_complete.send_robust(
            model=model,
            columns=columns,
            filters=filters,
            extra=extra,
            created=created,
            sender=model,
        )
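

# ---------------------------------------------------------------------------
# Illustrative sketch only, not part of the Buffer API above: a minimal
# in-memory backend showing the buffering pattern described in the class
# docstring, plus a `buffer_incr_complete` receiver of the kind that `incr`'s
# `signal_only` flag is meant for. `InMemoryBuffer`, `flush`, and
# `on_incr_complete` are hypothetical names used for demonstration; a real
# backend (e.g. a Redis-based one) would also need atomic counters,
# serialization of model/filters, and scheduling of the flush via the
# `process_incr` task.
# ---------------------------------------------------------------------------


class InMemoryBuffer(Buffer):
    """
    Accumulates increments per (model, filters) key in process memory and applies
    each accumulated total with a single `process` call when flushed.
    """

    def __init__(self):
        # key -> {"model": ..., "filters": ..., "columns": {col: delta}, ...}
        self._pending = {}

    def _key(self, model, filters):
        return (model, tuple(sorted(filters.items())))

    def get(self, model, columns, filters):
        entry = self._pending.get(self._key(model, filters), {})
        counts = entry.get("columns", {})
        return {col: counts.get(col, 0) for col in columns}

    def incr(self, model, columns, filters, extra=None, signal_only=None):
        entry = self._pending.setdefault(
            self._key(model, filters),
            {
                "model": model,
                "filters": filters,
                "columns": {},
                "extra": None,
                "signal_only": signal_only,
            },
        )
        for col, delta in columns.items():
            entry["columns"][col] = entry["columns"].get(col, 0) + delta
        if extra:
            entry["extra"] = extra

    def flush(self):
        # Drain the buffer and apply one update per key. The base class `process`
        # performs the actual write and fires `buffer_incr_complete`.
        pending, self._pending = self._pending, {}
        for entry in pending.values():
            self.process(
                model=entry["model"],
                columns=entry["columns"],
                filters=entry["filters"],
                extra=entry["extra"],
                signal_only=entry["signal_only"],
            )


def on_incr_complete(model, columns, filters, extra, created, **kwargs):
    # When `incr` is called with `signal_only=True`, `process` skips the database
    # write entirely, so a receiver like this is responsible for any follow-up work.
    pass


# Receivers are normally connected where they are defined, e.g.:
# buffer_incr_complete.connect(on_incr_complete, weak=False)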