Skip to content

Commit

Permalink
Refactor RateLimitEnforcement struct into a method
Browse files Browse the repository at this point in the history
  • Loading branch information
RyanSkonnord committed Apr 1, 2021
1 parent ae00b96 commit d4af165
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 69 deletions.
8 changes: 4 additions & 4 deletions relay-server/src/actors/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -690,17 +690,17 @@ impl Project {
Ok(self.rate_limits.check_with_quotas(quotas, item_scoping))
});

let enforcement = envelope_limiter.enforce(&mut envelope, scoping)?;
enforcement.emit_outcomes(scoping, &self.outcome_producer);

let rate_limits = envelope_limiter.enforce(&mut envelope, scoping, |outcome| {
self.outcome_producer.do_send(outcome)
})?;
let envelope = if envelope.is_empty() {
None
} else {
Some(envelope)
};
Ok(CheckedEnvelope {
envelope,
rate_limits: enforcement.applied_limits,
rate_limits,
})
}

Expand Down
119 changes: 54 additions & 65 deletions relay-server/src/utils/rate_limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ use std::{
time::Instant,
};

use actix::Addr;
use relay_general::protocol::EventId;
use relay_quotas::{
DataCategories, DataCategory, ItemScoping, QuotaScope, RateLimit, RateLimitScope, RateLimits,
ReasonCode, Scoping,
};

use crate::{
actors::outcome::{Outcome, OutcomeProducer, TrackOutcome},
actors::outcome::{Outcome, TrackOutcome},
envelope::{Envelope, Item, ItemType},
};

Expand Down Expand Up @@ -243,23 +242,24 @@ where
}

/// Process rate limits for the envelope, removing offending items and returning applied limits.
pub fn enforce(
pub fn enforce<P>(
mut self,
envelope: &mut Envelope,
scoping: &Scoping,
) -> Result<RateLimitEnforcement, E> {
outcome_producer: P,
) -> Result<RateLimits, E>
where
P: Fn(TrackOutcome),
{
let mut summary = EnvelopeSummary::compute(envelope);
if let Some(event_category) = self.event_category {
summary.event_category = Some(event_category);
}

let applied_limits = self.execute(&summary, scoping)?;
let limited_items = self.apply_retention(envelope);
Ok(RateLimitEnforcement {
summary,
applied_limits,
limited_items,
})
self.emit_outcomes(limited_items, &summary, scoping, outcome_producer);
Ok(applied_limits)
}

fn apply_retention(&mut self, envelope: &mut Envelope) -> Vec<RateLimitForItem> {
Expand Down Expand Up @@ -361,47 +361,47 @@ where
retain_in_envelope: true,
}
}
}

impl<F> fmt::Debug for EnvelopeLimiter<F> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("EnvelopeLimiter")
.field("event_category", &self.event_category)
.field("event_limit", &self.event_limit)
.field("attachment_limit", &self.attachment_limit)
.field("session_limit", &self.session_limit)
.finish()
}
}

pub struct RateLimitEnforcement {
pub applied_limits: RateLimits,
summary: EnvelopeSummary,
limited_items: Vec<RateLimitForItem>,
}

impl RateLimitEnforcement {
pub fn emit_outcomes(&self, scoping: &Scoping, outcome_producer: &Addr<OutcomeProducer>) {
fn emit_outcomes<P>(
&self,
limited_items: Vec<RateLimitForItem>,
summary: &EnvelopeSummary,
scoping: &Scoping,
outcome_producer: P,
) where
P: Fn(TrackOutcome),
{
let timestamp = Instant::now();
for limited_item in self.limited_items.iter() {
for limited_item in limited_items.iter() {
let reason_code = &limited_item.applied_limit.reason_code;
let category = limited_item.item_category;
outcome_producer.do_send(TrackOutcome {
outcome_producer(TrackOutcome {
timestamp,
scoping: *scoping,
outcome: Outcome::RateLimited(reason_code.clone()),
event_id: self.summary.event_id,
remote_addr: self.summary.remote_addr,
event_id: summary.event_id,
remote_addr: summary.remote_addr,
category,
quantity: match category {
DataCategory::Attachment => self.summary.attachment_quantity,
DataCategory::Attachment => summary.attachment_quantity,
_ => 1,
},
});
}
}
}

impl<F> fmt::Debug for EnvelopeLimiter<F> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("EnvelopeLimiter")
.field("event_category", &self.event_category)
.field("event_limit", &self.event_limit)
.field("attachment_limit", &self.attachment_limit)
.field("session_limit", &self.session_limit)
.finish()
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -591,10 +591,9 @@ mod tests {
let mut envelope = envelope![];

let mut mock = MockLimiter::default();
let enforcement = EnvelopeLimiter::new(|s, q| mock.check(s, q))
.enforce(&mut envelope, &scoping())
let limits = EnvelopeLimiter::new(|s, q| mock.check(s, q))
.enforce(&mut envelope, &scoping(), |_| {})
.unwrap();
let limits = enforcement.applied_limits;

assert!(!limits.is_limited());
assert!(envelope.is_empty());
Expand All @@ -608,10 +607,9 @@ mod tests {
let mut envelope = envelope![Event];

let mut mock = MockLimiter::default().deny(DataCategory::Error);
let enforcement = EnvelopeLimiter::new(|s, q| mock.check(s, q))
.enforce(&mut envelope, &scoping())
let limits = EnvelopeLimiter::new(|s, q| mock.check(s, q))
.enforce(&mut envelope, &scoping(), |_| {})
.unwrap();
let limits = enforcement.applied_limits;

assert!(limits.is_limited());
assert!(envelope.is_empty());
Expand All @@ -625,10 +623,9 @@ mod tests {
let mut envelope = envelope![Event, Attachment];

let mut mock = MockLimiter::default().deny(DataCategory::Error);
let enforcement = EnvelopeLimiter::new(|s, q| mock.check(s, q))
.enforce(&mut envelope, &scoping())
let limits = EnvelopeLimiter::new(|s, q| mock.check(s, q))
.enforce(&mut envelope, &scoping(), |_| {})
.unwrap();
let limits = enforcement.applied_limits;

assert!(limits.is_limited());
assert!(envelope.is_empty());
Expand All @@ -643,10 +640,9 @@ mod tests {
let mut envelope = envelope![Attachment::Minidump];

let mut mock = MockLimiter::default().deny(DataCategory::Error);
let enforcement = EnvelopeLimiter::new(|s, q| mock.check(s, q))
.enforce(&mut envelope, &scoping())
let limits = EnvelopeLimiter::new(|s, q| mock.check(s, q))
.enforce(&mut envelope, &scoping(), |_| {})
.unwrap();
let limits = enforcement.applied_limits;

assert!(limits.is_limited());
assert!(envelope.is_empty());
Expand All @@ -661,10 +657,9 @@ mod tests {
let mut envelope = envelope![Attachment::Minidump, Attachment];

let mut mock = MockLimiter::default().deny(DataCategory::Attachment);
let enforcement = EnvelopeLimiter::new(|s, q| mock.check(s, q))
.enforce(&mut envelope, &scoping())
let limits = EnvelopeLimiter::new(|s, q| mock.check(s, q))
.enforce(&mut envelope, &scoping(), |_| {})
.unwrap();
let limits = enforcement.applied_limits;

// Attachments would be limited, but crash reports create events and are thus allowed.
assert!(limits.is_limited());
Expand All @@ -679,10 +674,9 @@ mod tests {
let mut envelope = envelope![Attachment::Minidump];

let mut mock = MockLimiter::default().deny(DataCategory::Attachment);
let enforcement = EnvelopeLimiter::new(|s, q| mock.check(s, q))
.enforce(&mut envelope, &scoping())
let limits = EnvelopeLimiter::new(|s, q| mock.check(s, q))
.enforce(&mut envelope, &scoping(), |_| {})
.unwrap();
let limits = enforcement.applied_limits;

// If only crash report attachments are present, we don't emit a rate limit.
assert!(!limits.is_limited());
Expand All @@ -702,10 +696,9 @@ mod tests {
envelope.add_item(item);

let mut mock = MockLimiter::default().deny(DataCategory::Error);
let enforcement = EnvelopeLimiter::new(|s, q| mock.check(s, q))
.enforce(&mut envelope, &scoping())
let limits = EnvelopeLimiter::new(|s, q| mock.check(s, q))
.enforce(&mut envelope, &scoping(), |_| {})
.unwrap();
let limits = enforcement.applied_limits;

assert!(!limits.is_limited()); // No new rate limits applied.
assert_eq!(envelope.len(), 1); // The item was retained
Expand All @@ -719,10 +712,9 @@ mod tests {
let mut envelope = envelope![Session, Session, Session];

let mut mock = MockLimiter::default().deny(DataCategory::Error);
let enforcement = EnvelopeLimiter::new(|s, q| mock.check(s, q))
.enforce(&mut envelope, &scoping())
let limits = EnvelopeLimiter::new(|s, q| mock.check(s, q))
.enforce(&mut envelope, &scoping(), |_| {})
.unwrap();
let limits = enforcement.applied_limits;

// If only crash report attachments are present, we don't emit a rate limit.
assert!(!limits.is_limited());
Expand All @@ -737,10 +729,9 @@ mod tests {
let mut envelope = envelope![Session, Session, Event];

let mut mock = MockLimiter::default().deny(DataCategory::Session);
let enforcement = EnvelopeLimiter::new(|s, q| mock.check(s, q))
.enforce(&mut envelope, &scoping())
let limits = EnvelopeLimiter::new(|s, q| mock.check(s, q))
.enforce(&mut envelope, &scoping(), |_| {})
.unwrap();
let limits = enforcement.applied_limits;

// If only crash report attachments are present, we don't emit a rate limit.
assert!(limits.is_limited());
Expand All @@ -758,8 +749,7 @@ mod tests {
let mut mock = MockLimiter::default().deny(DataCategory::Transaction);
let mut limiter = EnvelopeLimiter::new(|s, q| mock.check(s, q));
limiter.assume_event(DataCategory::Transaction);
let enforcement = limiter.enforce(&mut envelope, &scoping()).unwrap();
let limits = enforcement.rate_limits;
let limits = limiter.enforce(&mut envelope, &scoping(), |_| {}).unwrap();

assert!(limits.is_limited());
assert!(envelope.is_empty()); // obviously
Expand All @@ -776,8 +766,7 @@ mod tests {
let mut mock = MockLimiter::default().deny(DataCategory::Error);
let mut limiter = EnvelopeLimiter::new(|s, q| mock.check(s, q));
limiter.assume_event(DataCategory::Error);
let enforcement = limiter.enforce(&mut envelope, &scoping()).unwrap();
let limits = enforcement.rate_limits;
let limits = limiter.enforce(&mut envelope, &scoping(), |_| {}).unwrap();

assert!(limits.is_limited());
assert!(envelope.is_empty());
Expand Down

0 comments on commit d4af165

Please # to comment.