From af181b40486bd596f7458049441825ce6c106552 Mon Sep 17 00:00:00 2001 From: wxing1292 Date: Tue, 26 Oct 2021 18:37:52 -0700 Subject: [PATCH] Add per namespace burst limit control (#2067) * Add per namespace burst limit control * Default per namespace burst limit to be 4800, 2x of rate limit --- common/dynamicconfig/constants.go | 3 + common/log/throttle_logger.go | 2 +- common/persistence/client/factory.go | 2 +- .../visibility_manager_rate_limited.go | 4 +- common/quotas/bench_test.go | 6 +- common/quotas/dynamic.go | 106 +++++++++---- common/quotas/dynamic_rate_limiter_impl.go | 27 ++-- ...namespace_multi_stage_rate_limiter_impl.go | 150 ------------------ service/frontend/configs/quotas.go | 53 +++++-- service/frontend/fx.go | 24 ++- service/frontend/service.go | 2 + service/history/configs/quotas.go | 2 +- service/history/queueProcessor.go | 2 +- service/history/replicationTaskFetcher.go | 2 +- service/history/replicationTaskProcessor.go | 2 +- .../history/replicationTaskProcessor_test.go | 2 +- service/history/taskPriorityAssigner.go | 2 +- service/history/timerQueueProcessorBase.go | 2 +- service/matching/configs/quotas.go | 2 +- service/matching/forwarder.go | 2 +- service/matching/matcher.go | 40 +++-- service/matching/matchingEngine_test.go | 46 +++--- service/worker/archiver/client.go | 2 +- .../worker/scanner/executions/scavenger.go | 4 +- service/worker/scanner/history/scavenger.go | 2 +- tools/cli/adminDBScanCommand.go | 2 +- tools/cli/adminElasticSearchCommands.go | 2 +- 27 files changed, 214 insertions(+), 281 deletions(-) delete mode 100644 common/quotas/namespace_multi_stage_rate_limiter_impl.go diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index e3a67833518..8d6f5513a6e 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -109,6 +109,7 @@ var Keys = map[Key]string{ FrontendHistoryMaxPageSize: "frontend.historyMaxPageSize", FrontendRPS: "frontend.rps", FrontendMaxNamespaceRPSPerInstance: "frontend.namespaceRPS", + FrontendMaxNamespaceBurstPerInstance: "frontend.namespaceBurst", FrontendMaxNamespaceCountPerInstance: "frontend.namespaceCount", FrontendGlobalNamespaceRPS: "frontend.globalNamespacerps", FrontendShutdownDrainDuration: "frontend.shutdownDrainDuration", @@ -439,6 +440,8 @@ const ( FrontendRPS // FrontendMaxNamespaceRPSPerInstance is workflow namespace rate limit per second FrontendMaxNamespaceRPSPerInstance + // FrontendMaxNamespaceBurstPerInstance is workflow namespace burst limit + FrontendMaxNamespaceBurstPerInstance // FrontendMaxNamespaceCountPerInstance is workflow namespace count limit per second FrontendMaxNamespaceCountPerInstance // FrontendGlobalNamespaceRPS is workflow namespace rate limit per second for the whole cluster diff --git a/common/log/throttle_logger.go b/common/log/throttle_logger.go index 0d170a769fd..012457a11dd 100644 --- a/common/log/throttle_logger.go +++ b/common/log/throttle_logger.go @@ -48,7 +48,7 @@ func NewThrottledLogger(logger Logger, rps quotas.RateFn) *throttledLogger { logger = sl.Skip(extraSkipForThrottleLogger) } - limiter := quotas.NewDefaultOutgoingDynamicRateLimiter(rps) + limiter := quotas.NewDefaultOutgoingRateLimiter(rps) tl := &throttledLogger{ limiter: limiter, logger: logger, diff --git a/common/persistence/client/factory.go b/common/persistence/client/factory.go index 5354b022030..19f326e67ab 100644 --- a/common/persistence/client/factory.go +++ b/common/persistence/client/factory.go @@ -332,7 +332,7 @@ func buildRateLimiters( result := make(map[string]quotas.RateLimiter, len(cfg.DataStores)) for dsName := range cfg.DataStores { if maxQPS != nil && maxQPS() > 0 { - result[dsName] = quotas.NewDefaultOutgoingDynamicRateLimiter( + result[dsName] = quotas.NewDefaultOutgoingRateLimiter( func() float64 { return float64(maxQPS()) }, ) } diff --git a/common/persistence/visibility/visibility_manager_rate_limited.go b/common/persistence/visibility/visibility_manager_rate_limited.go index 288fdd2ab15..8b06f768c94 100644 --- a/common/persistence/visibility/visibility_manager_rate_limited.go +++ b/common/persistence/visibility/visibility_manager_rate_limited.go @@ -44,10 +44,10 @@ func NewVisibilityManagerRateLimited( readMaxQPS dynamicconfig.IntPropertyFn, writeMaxQPS dynamicconfig.IntPropertyFn, ) *visibilityManagerRateLimited { - readRateLimiter := quotas.NewDefaultOutgoingDynamicRateLimiter( + readRateLimiter := quotas.NewDefaultOutgoingRateLimiter( func() float64 { return float64(readMaxQPS()) }, ) - writeRateLimiter := quotas.NewDefaultOutgoingDynamicRateLimiter( + writeRateLimiter := quotas.NewDefaultOutgoingRateLimiter( func() float64 { return float64(writeMaxQPS()) }, ) return &visibilityManagerRateLimited{ diff --git a/common/quotas/bench_test.go b/common/quotas/bench_test.go index a3930bea2c4..f6faee9b1d4 100644 --- a/common/quotas/bench_test.go +++ b/common/quotas/bench_test.go @@ -48,8 +48,10 @@ func BenchmarkRateLimiter(b *testing.B) { func BenchmarkDynamicRateLimiter(b *testing.B) { limiter := NewDynamicRateLimiter( - func() float64 { return testRate }, - func() int { return testBurst }, + NewRateBurst( + func() float64 { return testRate }, + func() int { return testBurst }, + ), time.Minute, ) for n := 0; n < b.N; n++ { diff --git a/common/quotas/dynamic.go b/common/quotas/dynamic.go index 39b91875acb..dba75d69f99 100644 --- a/common/quotas/dynamic.go +++ b/common/quotas/dynamic.go @@ -32,66 +32,106 @@ type ( // RateFn returns a float64 as the RPS RateFn func() float64 - // BurstFn returns a int as the RPS + // BurstFn returns an int as the burst / bucket size BurstFn func() int - // DynamicRateImpl stores the dynamic rate per second for rate limiter - DynamicRateImpl struct { - rate *atomic.Float64 + // RateBurst returns rate & burst for rate limiter + RateBurst interface { + Rate() float64 + Burst() int } - // DynamicBurstImpl stores the dynamic burst for rate limiter - DynamicBurstImpl struct { - burst *atomic.Int64 + RateBurstImpl struct { + rateFn RateFn + burstFn BurstFn } - DynamicRate interface { - Load() float64 - Store(rate float64) - RateFn() RateFn + // MutableRateBurstImpl stores the dynamic rate & burst for rate limiter + MutableRateBurstImpl struct { + rate *atomic.Float64 + burst *atomic.Int64 } - DynamicBurst interface { - Load() int - Store(burst int) - BurstFn() BurstFn + MutableRateBurst interface { + SetRate(rate float64) + SetBurst(burst int) + RateBurst } ) -func NewDynamicRate(rate float64) *DynamicRateImpl { - - return &DynamicRateImpl{ - rate: atomic.NewFloat64(rate), +func NewRateBurst( + rateFn RateFn, + burstFn BurstFn, +) *RateBurstImpl { + return &RateBurstImpl{ + rateFn: rateFn, + burstFn: burstFn, } } -func NewDynamicBurst(burst int) *DynamicBurstImpl { +func NewDefaultIncomingRateBurst( + rateFn RateFn, +) *RateBurstImpl { + return newDefaultRateBurst(rateFn, defaultIncomingRateBurstRatio) +} - return &DynamicBurstImpl{ - burst: atomic.NewInt64(int64(burst)), +func NewDefaultOutgoingRateBurst( + rateFn RateFn, +) *RateBurstImpl { + return newDefaultRateBurst(rateFn, defaultOutgoingRateBurstRatio) +} + +func newDefaultRateBurst( + rateFn RateFn, + rateToBurstRatio float64, +) *RateBurstImpl { + burstFn := func() int { + rate := rateFn() + if rate < 0 { + rate = 0 + } + if rateToBurstRatio < 0 { + rateToBurstRatio = 0 + } + burst := int(rate * rateToBurstRatio) + if burst == 0 && rate > 0 && rateToBurstRatio > 0 { + burst = 1 + } + return burst } + return NewRateBurst(rateFn, burstFn) } -func (d *DynamicRateImpl) Load() float64 { - return d.rate.Load() +func (d *RateBurstImpl) Rate() float64 { + return d.rateFn() } -func (d *DynamicRateImpl) Store(rate float64) { - d.rate.Store(rate) +func (d *RateBurstImpl) Burst() int { + return d.burstFn() } -func (d *DynamicRateImpl) RateFn() RateFn { - return d.Load +func NewMutableRateBurst( + rate float64, + burst int, +) *MutableRateBurstImpl { + return &MutableRateBurstImpl{ + rate: atomic.NewFloat64(rate), + burst: atomic.NewInt64(int64(burst)), + } } -func (d *DynamicBurstImpl) Load() int { - return int(d.burst.Load()) +func (d *MutableRateBurstImpl) SetRate(rate float64) { + d.rate.Store(rate) } -func (d *DynamicBurstImpl) Store(burst int) { +func (d *MutableRateBurstImpl) SetBurst(burst int) { d.burst.Store(int64(burst)) } -func (d *DynamicBurstImpl) BurstFn() BurstFn { - return d.Load +func (d *MutableRateBurstImpl) Rate() float64 { + return d.rate.Load() +} + +func (d *MutableRateBurstImpl) Burst() int { + return int(d.burst.Load()) } diff --git a/common/quotas/dynamic_rate_limiter_impl.go b/common/quotas/dynamic_rate_limiter_impl.go index 2b03329b9cb..0a3c908b714 100644 --- a/common/quotas/dynamic_rate_limiter_impl.go +++ b/common/quotas/dynamic_rate_limiter_impl.go @@ -38,8 +38,7 @@ const ( type ( // DynamicRateLimiterImpl implements a dynamic config wrapper around the rate limiter DynamicRateLimiterImpl struct { - rateFn RateFn - burstFn BurstFn + rateBurstFn RateBurst refreshInterval time.Duration refreshTimer *time.Timer @@ -51,41 +50,37 @@ var _ RateLimiter = (*DynamicRateLimiterImpl)(nil) // NewDynamicRateLimiter returns a rate limiter which handles dynamic config func NewDynamicRateLimiter( - rateFn RateFn, - burstFn BurstFn, + rateBurstFn RateBurst, refreshInterval time.Duration, ) *DynamicRateLimiterImpl { rateLimiter := &DynamicRateLimiterImpl{ - rateFn: rateFn, - burstFn: burstFn, + rateBurstFn: rateBurstFn, refreshInterval: refreshInterval, refreshTimer: time.NewTimer(refreshInterval), - rateLimiter: NewRateLimiter(rateFn(), burstFn()), + rateLimiter: NewRateLimiter(rateBurstFn.Rate(), rateBurstFn.Burst()), } return rateLimiter } -// NewDefaultIncomingDynamicRateLimiter returns a default rate limiter +// NewDefaultIncomingRateLimiter returns a default rate limiter // for incoming traffic -func NewDefaultIncomingDynamicRateLimiter( +func NewDefaultIncomingRateLimiter( rateFn RateFn, ) *DynamicRateLimiterImpl { return NewDynamicRateLimiter( - rateFn, - func() int { return int(defaultIncomingRateBurstRatio * rateFn()) }, + NewDefaultIncomingRateBurst(rateFn), defaultRefreshInterval, ) } -// NewDefaultOutgoingDynamicRateLimiter returns a default rate limiter +// NewDefaultOutgoingRateLimiter returns a default rate limiter // for outgoing traffic -func NewDefaultOutgoingDynamicRateLimiter( +func NewDefaultOutgoingRateLimiter( rateFn RateFn, ) *DynamicRateLimiterImpl { return NewDynamicRateLimiter( - rateFn, - func() int { return int(defaultOutgoingRateBurstRatio * rateFn()) }, + NewDefaultOutgoingRateBurst(rateFn), defaultRefreshInterval, ) } @@ -142,7 +137,7 @@ func (d *DynamicRateLimiterImpl) maybeRefresh() { select { case <-d.refreshTimer.C: d.refreshTimer.Reset(d.refreshInterval) - d.rateLimiter.SetRateBurst(d.rateFn(), d.burstFn()) + d.rateLimiter.SetRateBurst(d.rateBurstFn.Rate(), d.rateBurstFn.Burst()) default: // noop diff --git a/common/quotas/namespace_multi_stage_rate_limiter_impl.go b/common/quotas/namespace_multi_stage_rate_limiter_impl.go deleted file mode 100644 index 44cbbb143ba..00000000000 --- a/common/quotas/namespace_multi_stage_rate_limiter_impl.go +++ /dev/null @@ -1,150 +0,0 @@ -// The MIT License -// -// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. -// -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package quotas - -import ( - "context" - "sync" - "time" -) - -type ( - // NamespaceMultiStageRateLimiterImpl is a multi stage rate limiter - // special built for multi-tenancy - NamespaceMultiStageRateLimiterImpl struct { - namespaceRateLimiterFn func(namespaceID string) RateLimiter - sharedRateLimiters []RateLimiter - - sync.RWMutex - namespaceRateLimiters map[string]RateLimiter - } -) - -func NewNamespaceMultiStageRateLimiter( - namespaceRateLimiterFn func(namespaceID string) RateLimiter, - sharedRateLimiters []RateLimiter, -) *NamespaceMultiStageRateLimiterImpl { - return &NamespaceMultiStageRateLimiterImpl{ - namespaceRateLimiterFn: namespaceRateLimiterFn, - sharedRateLimiters: sharedRateLimiters, - - namespaceRateLimiters: make(map[string]RateLimiter), - } -} - -// Allow attempts to allow a request to go through. The method returns -// immediately with a true or false indicating if the request can make -// progress -func (r *NamespaceMultiStageRateLimiterImpl) Allow( - namespaceID string, -) bool { - - return r.AllowN(namespaceID, time.Now(), 1) -} - -// AllowN attempts to allow a request to go through. The method returns -// immediately with a true or false indicating if the request can make -// progress -func (r *NamespaceMultiStageRateLimiterImpl) AllowN( - namespaceID string, - now time.Time, - numToken int, -) bool { - - rateLimiter := r.getOrInitRateLimiter(namespaceID) - return rateLimiter.AllowN(now, numToken) -} - -// Reserve returns a Reservation that indicates how long the caller -// must wait before event happen. -func (r *NamespaceMultiStageRateLimiterImpl) Reserve( - namespaceID string, -) Reservation { - - return r.ReserveN(namespaceID, time.Now(), 1) -} - -// ReserveN returns a Reservation that indicates how long the caller -// must wait before event happen. -func (r *NamespaceMultiStageRateLimiterImpl) ReserveN( - namespaceID string, - now time.Time, - numToken int, -) Reservation { - - rateLimiter := r.getOrInitRateLimiter(namespaceID) - return rateLimiter.ReserveN(now, numToken) -} - -// Wait waits till the deadline for a rate limit token to allow the request -// to go through. -func (r *NamespaceMultiStageRateLimiterImpl) Wait( - ctx context.Context, - namespaceID string, -) error { - - return r.WaitN(ctx, namespaceID, 1) -} - -// WaitN waits till the deadline for a rate limit token to allow the request -// to go through. -func (r *NamespaceMultiStageRateLimiterImpl) WaitN( - ctx context.Context, - namespaceID string, - numToken int, -) error { - - rateLimiter := r.getOrInitRateLimiter(namespaceID) - return rateLimiter.WaitN(ctx, numToken) -} - -func (r *NamespaceMultiStageRateLimiterImpl) getOrInitRateLimiter( - namespaceID string, -) RateLimiter { - r.RLock() - rateLimiter, ok := r.namespaceRateLimiters[namespaceID] - r.RUnlock() - if ok { - return rateLimiter - } - - length := len(r.sharedRateLimiters) - rateLimiters := make([]RateLimiter, length+1) - rateLimiters[0] = r.namespaceRateLimiterFn(namespaceID) - for i := 0; i < length; i++ { - rateLimiters[i+1] = r.sharedRateLimiters[i] - } - namespaceRateLimiter := NewMultiRateLimiter(rateLimiters) - - r.Lock() - defer r.Unlock() - - rateLimiter, ok = r.namespaceRateLimiters[namespaceID] - if ok { - return rateLimiter - } - r.namespaceRateLimiters[namespaceID] = namespaceRateLimiter - return namespaceRateLimiter -} diff --git a/service/frontend/configs/quotas.go b/service/frontend/configs/quotas.go index e8c6fb56ba9..56205ebbfa8 100644 --- a/service/frontend/configs/quotas.go +++ b/service/frontend/configs/quotas.go @@ -25,6 +25,9 @@ package configs import ( + "time" + + "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/quotas" ) @@ -105,14 +108,44 @@ var ( } ) +type ( + NamesapceRateBurstImpl struct { + namespaceName string + rateFn dynamicconfig.FloatPropertyFnWithNamespaceFilter + burstFn dynamicconfig.IntPropertyFnWithNamespaceFilter + } +) + +var _ quotas.RateBurst = (*NamesapceRateBurstImpl)(nil) + +func NewNamespaceRateBurst( + namespaceName string, + rateFn dynamicconfig.FloatPropertyFnWithNamespaceFilter, + burstFn dynamicconfig.IntPropertyFnWithNamespaceFilter, +) *NamesapceRateBurstImpl { + return &NamesapceRateBurstImpl{ + namespaceName: namespaceName, + rateFn: rateFn, + burstFn: burstFn, + } +} + +func (c *NamesapceRateBurstImpl) Rate() float64 { + return c.rateFn(c.namespaceName) +} + +func (c *NamesapceRateBurstImpl) Burst() int { + return c.burstFn(c.namespaceName) +} + func NewRequestToRateLimiter( - rateFn quotas.RateFn, + rateBurstFn quotas.RateBurst, ) quotas.RequestRateLimiter { mapping := make(map[string]quotas.RequestRateLimiter) - executionRateLimiter := NewExecutionPriorityRateLimiter(rateFn) - visibilityRateLimiter := NewVisibilityPriorityRateLimiter(rateFn) - otherRateLimiter := NewOtherAPIPriorityRateLimiter(rateFn) + executionRateLimiter := NewExecutionPriorityRateLimiter(rateBurstFn) + visibilityRateLimiter := NewVisibilityPriorityRateLimiter(rateBurstFn) + otherRateLimiter := NewOtherAPIPriorityRateLimiter(rateBurstFn) for api := range ExecutionAPIToPriority { mapping[api] = executionRateLimiter @@ -128,31 +161,31 @@ func NewRequestToRateLimiter( } func NewExecutionPriorityRateLimiter( - rateFn quotas.RateFn, + rateBurstFn quotas.RateBurst, ) quotas.RequestRateLimiter { rateLimiters := make(map[int]quotas.RateLimiter) for priority := range ExecutionAPIPriorities { - rateLimiters[priority] = quotas.NewDefaultIncomingDynamicRateLimiter(rateFn) + rateLimiters[priority] = quotas.NewDynamicRateLimiter(rateBurstFn, time.Minute) } return quotas.NewPriorityRateLimiter(ExecutionAPIToPriority, rateLimiters) } func NewVisibilityPriorityRateLimiter( - rateFn quotas.RateFn, + rateBurstFn quotas.RateBurst, ) quotas.RequestRateLimiter { rateLimiters := make(map[int]quotas.RateLimiter) for priority := range VisibilityAPIPriorities { - rateLimiters[priority] = quotas.NewDefaultIncomingDynamicRateLimiter(rateFn) + rateLimiters[priority] = quotas.NewDynamicRateLimiter(rateBurstFn, time.Minute) } return quotas.NewPriorityRateLimiter(VisibilityAPIToPriority, rateLimiters) } func NewOtherAPIPriorityRateLimiter( - rateFn quotas.RateFn, + rateBurstFn quotas.RateBurst, ) quotas.RequestRateLimiter { rateLimiters := make(map[int]quotas.RateLimiter) for priority := range OtherAPIPriorities { - rateLimiters[priority] = quotas.NewDefaultIncomingDynamicRateLimiter(rateFn) + rateLimiters[priority] = quotas.NewDynamicRateLimiter(rateBurstFn, time.Minute) } return quotas.NewPriorityRateLimiter(OtherAPIToPriority, rateLimiters) } diff --git a/service/frontend/fx.go b/service/frontend/fx.go index 8c4c163743b..f5d869d53f6 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -171,7 +171,11 @@ func RateLimitInterceptorProvider( serviceConfig *Config, ) *interceptor.RateLimitInterceptor { return interceptor.NewRateLimitInterceptor( - configs.NewRequestToRateLimiter(func() float64 { return float64(serviceConfig.RPS()) }), + configs.NewRequestToRateLimiter( + quotas.NewDefaultIncomingRateLimiter( + func() float64 { return float64(serviceConfig.RPS()) }, + ), + ), map[string]int{}, ) } @@ -184,13 +188,17 @@ func NamespaceRateLimitInterceptorProvider( serviceResource.GetNamespaceRegistry(), quotas.NewNamespaceRateLimiter( func(req quotas.Request) quotas.RequestRateLimiter { - return configs.NewRequestToRateLimiter(func() float64 { - return namespaceRPS( - serviceConfig, - serviceResource.GetFrontendServiceResolver(), - req.Caller, - ) - }) + return configs.NewRequestToRateLimiter(configs.NewNamespaceRateBurst( + req.Caller, + func(namespace string) float64 { + return namespaceRPS( + serviceConfig, + serviceResource.GetFrontendServiceResolver(), + namespace, + ) + }, + serviceConfig.MaxNamespaceBurstPerInstance, + )) }, ), map[string]int{}, diff --git a/service/frontend/service.go b/service/frontend/service.go index 55583e6b021..94d7058908e 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -64,6 +64,7 @@ type Config struct { HistoryMaxPageSize dynamicconfig.IntPropertyFnWithNamespaceFilter RPS dynamicconfig.IntPropertyFn MaxNamespaceRPSPerInstance dynamicconfig.IntPropertyFnWithNamespaceFilter + MaxNamespaceBurstPerInstance dynamicconfig.IntPropertyFnWithNamespaceFilter MaxNamespaceCountPerInstance dynamicconfig.IntPropertyFnWithNamespaceFilter GlobalNamespaceRPS dynamicconfig.IntPropertyFnWithNamespaceFilter MaxIDLengthLimit dynamicconfig.IntPropertyFn @@ -143,6 +144,7 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int32, esIndexName HistoryMaxPageSize: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendHistoryMaxPageSize, common.GetHistoryMaxPageSize), RPS: dc.GetIntProperty(dynamicconfig.FrontendRPS, 2400), MaxNamespaceRPSPerInstance: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxNamespaceRPSPerInstance, 2400), + MaxNamespaceBurstPerInstance: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxNamespaceBurstPerInstance, 4800), MaxNamespaceCountPerInstance: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxNamespaceCountPerInstance, 1200), GlobalNamespaceRPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendGlobalNamespaceRPS, 0), MaxIDLengthLimit: dc.GetIntProperty(dynamicconfig.MaxIDLengthLimit, 1000), diff --git a/service/history/configs/quotas.go b/service/history/configs/quotas.go index beba4013e1e..38d769a3afa 100644 --- a/service/history/configs/quotas.go +++ b/service/history/configs/quotas.go @@ -79,7 +79,7 @@ func NewPriorityRateLimiter( ) quotas.RequestRateLimiter { rateLimiters := make(map[int]quotas.RateLimiter) for priority := range APIPriorities { - rateLimiters[priority] = quotas.NewDefaultIncomingDynamicRateLimiter(rateFn) + rateLimiters[priority] = quotas.NewDefaultIncomingRateLimiter(rateFn) } return quotas.NewPriorityRateLimiter(APIToPriority, rateLimiters) } diff --git a/service/history/queueProcessor.go b/service/history/queueProcessor.go index d51e014d62c..1248831363b 100644 --- a/service/history/queueProcessor.go +++ b/service/history/queueProcessor.go @@ -123,7 +123,7 @@ func newQueueProcessorBase( timeSource: shard.GetTimeSource(), options: options, processor: processor, - rateLimiter: quotas.NewDefaultOutgoingDynamicRateLimiter( + rateLimiter: quotas.NewDefaultOutgoingRateLimiter( func() float64 { return float64(options.MaxPollRPS()) }, ), status: common.DaemonStatusInitialized, diff --git a/service/history/replicationTaskFetcher.go b/service/history/replicationTaskFetcher.go index d209f81490f..ce09947dcb3 100644 --- a/service/history/replicationTaskFetcher.go +++ b/service/history/replicationTaskFetcher.go @@ -187,7 +187,7 @@ func newReplicationTaskFetcher( numWorker := config.ReplicationTaskFetcherParallelism() requestChan := make(chan *replicationTaskRequest, requestChanBufferSize) shutdownChan := make(chan struct{}) - rateLimiter := quotas.NewDefaultOutgoingDynamicRateLimiter( + rateLimiter := quotas.NewDefaultOutgoingRateLimiter( func() float64 { return config.ReplicationTaskProcessorHostQPS() }, ) diff --git a/service/history/replicationTaskProcessor.go b/service/history/replicationTaskProcessor.go index 0732c351da1..3c5f94b1d81 100644 --- a/service/history/replicationTaskProcessor.go +++ b/service/history/replicationTaskProcessor.go @@ -141,7 +141,7 @@ func NewReplicationTaskProcessor( logger: shard.GetLogger(), replicationTaskExecutor: replicationTaskExecutor, rateLimiter: quotas.NewMultiRateLimiter([]quotas.RateLimiter{ - quotas.NewDefaultOutgoingDynamicRateLimiter( + quotas.NewDefaultOutgoingRateLimiter( func() float64 { return config.ReplicationTaskProcessorShardQPS() }, ), replicationTaskFetcher.GetRateLimiter(), diff --git a/service/history/replicationTaskProcessor_test.go b/service/history/replicationTaskProcessor_test.go index b37a0351496..6b5e00e00c6 100644 --- a/service/history/replicationTaskProcessor_test.go +++ b/service/history/replicationTaskProcessor_test.go @@ -131,7 +131,7 @@ func (s *replicationTaskProcessorSuite) SetupTest() { s.mockReplicationTaskExecutor = NewMockreplicationTaskExecutor(s.controller) s.mockHistoryClient = historyservicemock.NewMockHistoryServiceClient(s.controller) s.mockReplicationTaskFetcher = NewMockReplicationTaskFetcher(s.controller) - rateLimiter := quotas.NewDefaultOutgoingDynamicRateLimiter( + rateLimiter := quotas.NewDefaultOutgoingRateLimiter( func() float64 { return 100 }, ) s.mockReplicationTaskFetcher.EXPECT().GetSourceCluster().Return(cluster.TestAlternativeClusterName).AnyTimes() diff --git a/service/history/taskPriorityAssigner.go b/service/history/taskPriorityAssigner.go index 6e56270c5d6..60c11a1dcdc 100644 --- a/service/history/taskPriorityAssigner.go +++ b/service/history/taskPriorityAssigner.go @@ -144,7 +144,7 @@ func (a *taskPriorityAssignerImpl) getRateLimiter( } a.RUnlock() - limiter := quotas.NewDefaultOutgoingDynamicRateLimiter( + limiter := quotas.NewDefaultOutgoingRateLimiter( func() float64 { return float64(a.config.TaskProcessRPS(namespace)) }, ) diff --git a/service/history/timerQueueProcessorBase.go b/service/history/timerQueueProcessorBase.go index 6e2d2012a62..5d92e547404 100644 --- a/service/history/timerQueueProcessorBase.go +++ b/service/history/timerQueueProcessorBase.go @@ -142,7 +142,7 @@ func newTimerQueueProcessorBase( queueTaskProcessor: queueTaskProcessor, redispatchQueue: redispatchQueue, queueTaskInitializer: queueTaskInitializer, - rateLimiter: quotas.NewDefaultOutgoingDynamicRateLimiter( + rateLimiter: quotas.NewDefaultOutgoingRateLimiter( func() float64 { return float64(maxPollRPS()) }, ), retryPolicy: common.CreatePersistanceRetryPolicy(), diff --git a/service/matching/configs/quotas.go b/service/matching/configs/quotas.go index 2fe17a13445..4f5f5f23aac 100644 --- a/service/matching/configs/quotas.go +++ b/service/matching/configs/quotas.go @@ -51,7 +51,7 @@ func NewPriorityRateLimiter( ) quotas.RequestRateLimiter { rateLimiters := make(map[int]quotas.RateLimiter) for priority := range APIPriorities { - rateLimiters[priority] = quotas.NewDefaultIncomingDynamicRateLimiter(rateFn) + rateLimiters[priority] = quotas.NewDefaultIncomingRateLimiter(rateFn) } return quotas.NewPriorityRateLimiter(APIToPriority, rateLimiters) } diff --git a/service/matching/forwarder.go b/service/matching/forwarder.go index 4261591bd0d..93c29f16d6d 100644 --- a/service/matching/forwarder.go +++ b/service/matching/forwarder.go @@ -105,7 +105,7 @@ func newForwarder( taskQueueKind: kind, outstandingTasksLimit: int32(cfg.ForwarderMaxOutstandingTasks()), outstandingPollsLimit: int32(cfg.ForwarderMaxOutstandingPolls()), - limiter: quotas.NewDefaultOutgoingDynamicRateLimiter( + limiter: quotas.NewDefaultOutgoingRateLimiter( func() float64 { return float64(cfg.ForwarderMaxRatePerSecond()) }, ), } diff --git a/service/matching/matcher.go b/service/matching/matcher.go index db482f8965a..4ed0138cd2f 100644 --- a/service/matching/matcher.go +++ b/service/matching/matcher.go @@ -49,10 +49,8 @@ type TaskMatcher struct { // not active in a cluster queryTaskC chan *internalTask - // dynamicRate is the dynamic rate for rate limiter - dynamicRate quotas.DynamicRate - // dynamicBurst is the dynamic burst for rate limiter - dynamicBurst quotas.DynamicBurst + // dynamicRate is the dynamic rate & burst for rate limiter + dynamicRateBurst quotas.MutableRateBurst // rateLimiter that limits the rate at which tasks can be dispatched to consumers rateLimiter quotas.RateLimiter @@ -70,31 +68,31 @@ const ( // used by task producers and consumers to find a match. Both sync matches and non-sync // matches should use this implementation func newTaskMatcher(config *taskQueueConfig, fwdr *Forwarder, scopeFunc func() metrics.Scope) *TaskMatcher { - dynamicRate := quotas.NewDynamicRate(defaultTaskDispatchRPS) - dynamicBurst := quotas.NewDynamicBurst(int(defaultTaskDispatchRPS)) + dynamicRateBurst := quotas.NewMutableRateBurst( + defaultTaskDispatchRPS, + int(defaultTaskDispatchRPS), + ) limiter := quotas.NewMultiRateLimiter([]quotas.RateLimiter{ quotas.NewDynamicRateLimiter( - dynamicRate.RateFn(), - dynamicBurst.BurstFn(), + dynamicRateBurst, defaultTaskDispatchRPSTTL, ), - quotas.NewDefaultOutgoingDynamicRateLimiter( + quotas.NewDefaultOutgoingRateLimiter( config.AdminNamespaceTaskQueueToPartitionDispatchRate, ), - quotas.NewDefaultOutgoingDynamicRateLimiter( + quotas.NewDefaultOutgoingRateLimiter( config.AdminNamespaceToPartitionDispatchRate, ), }) return &TaskMatcher{ - config: config, - dynamicRate: dynamicRate, - dynamicBurst: dynamicBurst, - rateLimiter: limiter, - scope: scopeFunc, - fwdr: fwdr, - taskC: make(chan *internalTask), - queryTaskC: make(chan *internalTask), - numPartitions: config.NumReadPartitions, + config: config, + dynamicRateBurst: dynamicRateBurst, + rateLimiter: limiter, + scope: scopeFunc, + fwdr: fwdr, + taskC: make(chan *internalTask), + queryTaskC: make(chan *internalTask), + numPartitions: config.NumReadPartitions, } } @@ -312,8 +310,8 @@ func (tm *TaskMatcher) UpdateRatelimit(rps *float64) { burst = minTaskThrottlingBurstSize } - tm.dynamicRate.Store(rate) - tm.dynamicBurst.Store(burst) + tm.dynamicRateBurst.SetRate(rate) + tm.dynamicRateBurst.SetBurst(burst) } // Rate returns the current rate at which tasks are dispatched diff --git a/service/matching/matchingEngine_test.go b/service/matching/matchingEngine_test.go index e67db7a4605..78fa9c99c37 100644 --- a/service/matching/matchingEngine_test.go +++ b/service/matching/matchingEngine_test.go @@ -694,12 +694,11 @@ func (s *matchingEngineSuite) TestSyncMatchActivities() { defaultTaskDispatchRPS, defaultTaskDispatchRPS, ) - mgrImpl.matcher.dynamicRate = &dynamicRateWrapper{ - DynamicRate: quotas.NewDynamicRate(defaultTaskDispatchRPS), - RateLimiterImpl: mgrImpl.matcher.rateLimiter.(*quotas.RateLimiterImpl), - } - mgrImpl.matcher.dynamicBurst = &dynamicBurstWrapper{ - DynamicBurst: quotas.NewDynamicBurst(defaultTaskDispatchRPS), + mgrImpl.matcher.dynamicRateBurst = &dynamicRateBurstWrapper{ + MutableRateBurst: quotas.NewMutableRateBurst( + defaultTaskDispatchRPS, + defaultTaskDispatchRPS, + ), RateLimiterImpl: mgrImpl.matcher.rateLimiter.(*quotas.RateLimiterImpl), } s.matchingEngine.updateTaskQueue(tlID, mgr) @@ -906,12 +905,11 @@ func (s *matchingEngineSuite) concurrentPublishConsumeActivities( defaultTaskDispatchRPS, defaultTaskDispatchRPS, ) - mgrImpl.matcher.dynamicRate = &dynamicRateWrapper{ - DynamicRate: quotas.NewDynamicRate(defaultTaskDispatchRPS), - RateLimiterImpl: mgrImpl.matcher.rateLimiter.(*quotas.RateLimiterImpl), - } - mgrImpl.matcher.dynamicBurst = &dynamicBurstWrapper{ - DynamicBurst: quotas.NewDynamicBurst(defaultTaskDispatchRPS), + mgrImpl.matcher.dynamicRateBurst = &dynamicRateBurstWrapper{ + MutableRateBurst: quotas.NewMutableRateBurst( + defaultTaskDispatchRPS, + defaultTaskDispatchRPS, + ), RateLimiterImpl: mgrImpl.matcher.rateLimiter.(*quotas.RateLimiterImpl), } s.matchingEngine.updateTaskQueue(tlID, mgr) @@ -2077,22 +2075,26 @@ func defaultTestConfig() *Config { } type ( - dynamicRateWrapper struct { - quotas.DynamicRate - *quotas.RateLimiterImpl - } - dynamicBurstWrapper struct { - quotas.DynamicBurst + dynamicRateBurstWrapper struct { + quotas.MutableRateBurst *quotas.RateLimiterImpl } ) -func (d *dynamicRateWrapper) Store(rate float64) { - d.DynamicRate.Store(rate) +func (d *dynamicRateBurstWrapper) SetRate(rate float64) { + d.MutableRateBurst.SetRate(rate) d.RateLimiterImpl.SetRate(rate) } -func (d *dynamicBurstWrapper) Store(burst int) { - d.DynamicBurst.Store(burst) +func (d *dynamicRateBurstWrapper) SetBurst(burst int) { + d.MutableRateBurst.SetBurst(burst) d.RateLimiterImpl.SetBurst(burst) } + +func (d *dynamicRateBurstWrapper) Rate() float64 { + return d.RateLimiterImpl.Rate() +} + +func (d *dynamicRateBurstWrapper) Burst() int { + return d.RateLimiterImpl.Burst() +} diff --git a/service/worker/archiver/client.go b/service/worker/archiver/client.go index 2ca9284e5f2..1e6e8d08b53 100644 --- a/service/worker/archiver/client.go +++ b/service/worker/archiver/client.go @@ -136,7 +136,7 @@ func NewClient( logger: logger, temporalClient: publicClient, numWorkflows: numWorkflows, - rateLimiter: quotas.NewDefaultOutgoingDynamicRateLimiter( + rateLimiter: quotas.NewDefaultOutgoingRateLimiter( func() float64 { return float64(requestRPS()) }, ), archiverProvider: archiverProvider, diff --git a/service/worker/scanner/executions/scavenger.go b/service/worker/scanner/executions/scavenger.go index 06973b57861..a7193cab089 100644 --- a/service/worker/scanner/executions/scavenger.go +++ b/service/worker/scanner/executions/scavenger.go @@ -87,7 +87,7 @@ func NewScavenger( metricsClient, metrics.ExecutionsScavengerScope, ), - rateLimiter: quotas.NewDefaultOutgoingDynamicRateLimiter( + rateLimiter: quotas.NewDefaultOutgoingRateLimiter( func() float64 { return float64(rateOverall) }, ), metrics: metricsClient, @@ -151,7 +151,7 @@ func (s *Scavenger) run() { s.logger, s, quotas.NewMultiRateLimiter([]quotas.RateLimiter{ - quotas.NewDefaultOutgoingDynamicRateLimiter( + quotas.NewDefaultOutgoingRateLimiter( func() float64 { return float64(ratePerShard) }, ), s.rateLimiter, diff --git a/service/worker/scanner/history/scavenger.go b/service/worker/scanner/history/scavenger.go index eca042bdf53..972164da6f6 100644 --- a/service/worker/scanner/history/scavenger.go +++ b/service/worker/scanner/history/scavenger.go @@ -113,7 +113,7 @@ func NewScavenger( numShards: numShards, db: db, client: client, - rateLimiter: quotas.NewDefaultOutgoingDynamicRateLimiter( + rateLimiter: quotas.NewDefaultOutgoingRateLimiter( func() float64 { return float64(rps) }, ), metrics: metricsClient, diff --git a/tools/cli/adminDBScanCommand.go b/tools/cli/adminDBScanCommand.go index 32839497d0e..7bb1aaac4c6 100644 --- a/tools/cli/adminDBScanCommand.go +++ b/tools/cli/adminDBScanCommand.go @@ -899,7 +899,7 @@ func getRateLimiter(startRPS int, targetRPS int) quotas.RateLimiter { if startRPS >= targetRPS { ErrorAndExit("startRPS is greater than target RPS", nil) } - return quotas.NewDefaultOutgoingDynamicRateLimiter( + return quotas.NewDefaultOutgoingRateLimiter( func() float64 { return float64(targetRPS) }, ) } diff --git a/tools/cli/adminElasticSearchCommands.go b/tools/cli/adminElasticSearchCommands.go index df7f511003e..0c8d4f6f4a5 100644 --- a/tools/cli/adminElasticSearchCommands.go +++ b/tools/cli/adminElasticSearchCommands.go @@ -167,7 +167,7 @@ func AdminDelete(c *cli.Context) { inputFileName := getRequiredOption(c, FlagInputFile) batchSize := c.Int(FlagBatchSize) rps := c.Int(FlagRPS) - ratelimiter := quotas.NewDefaultOutgoingDynamicRateLimiter( + ratelimiter := quotas.NewDefaultOutgoingRateLimiter( func() float64 { return float64(rps) }, )