diff --git a/common/quotas/dynamic_rate_limiter_impl.go b/common/quotas/dynamic_rate_limiter_impl.go index e58921d13dd..b4707b5b6e3 100644 --- a/common/quotas/dynamic_rate_limiter_impl.go +++ b/common/quotas/dynamic_rate_limiter_impl.go @@ -26,6 +26,7 @@ package quotas import ( "context" + "fmt" "time" ) @@ -43,22 +44,43 @@ type ( refreshTimer *time.Timer rateLimiter *RateLimiterImpl + name string } + + params struct { + name string + } + + DynamicRateLimiterOption func(p *params) ) var _ RateLimiter = (*DynamicRateLimiterImpl)(nil) +func WithName(name string) DynamicRateLimiterOption { + return func(p *params) { + p.name = name + } +} + // NewDynamicRateLimiter returns a rate limiter which handles dynamic config func NewDynamicRateLimiter( rateBurstFn RateBurst, refreshInterval time.Duration, + opts ...DynamicRateLimiterOption, ) *DynamicRateLimiterImpl { + p := params{ + name: "", + } + for _, opt := range opts { + opt(&p) + } rateLimiter := &DynamicRateLimiterImpl{ rateBurstFn: rateBurstFn, refreshInterval: refreshInterval, refreshTimer: time.NewTimer(refreshInterval), rateLimiter: NewRateLimiter(rateBurstFn.Rate(), rateBurstFn.Burst()), + name: p.name, } return rateLimiter } @@ -67,10 +89,12 @@ func NewDynamicRateLimiter( // for incoming traffic func NewDefaultIncomingRateLimiter( rateFn RateFn, + opts ...DynamicRateLimiterOption, ) *DynamicRateLimiterImpl { return NewDynamicRateLimiter( NewDefaultIncomingRateBurst(rateFn), defaultRefreshInterval, + opts..., ) } @@ -138,6 +162,10 @@ func (d *DynamicRateLimiterImpl) Refresh() { } func (d *DynamicRateLimiterImpl) maybeRefresh() { + if d.name != "" { + panic(fmt.Sprintf("maybeRefresh was called on a DynamicRateLimiterImpl with a name: %q", d.name)) + } + select { case <-d.refreshTimer.C: d.refreshTimer.Reset(d.refreshInterval) diff --git a/service/frontend/fx.go b/service/frontend/fx.go index d85fafb7a83..fe87acdd0e9 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -311,10 +311,22 @@ func RateLimitInterceptorProvider( return interceptor.NewRateLimitInterceptor( configs.NewRequestToRateLimiter( - quotas.NewDefaultIncomingRateLimiter(rateFn), - quotas.NewDefaultIncomingRateLimiter(rateFn), - quotas.NewDefaultIncomingRateLimiter(namespaceReplicationInducingRateFn), - quotas.NewDefaultIncomingRateLimiter(rateFn), + quotas.NewDefaultIncomingRateLimiter( + rateFn, + quotas.WithName("executionRateBurstFn"), + ), + quotas.NewDefaultIncomingRateLimiter( + rateFn, + quotas.WithName("visibilityRateBurstFn"), + ), + quotas.NewDefaultIncomingRateLimiter( + namespaceReplicationInducingRateFn, + quotas.WithName("namespaceReplicationInducingRateBurstFn"), + ), + quotas.NewDefaultIncomingRateLimiter( + rateFn, + quotas.WithName("otherRateBurstFn"), + ), serviceConfig.OperatorRPSRatio, ), map[string]int{},