Skip to content

Commit

Permalink
Add per namespace burst limit control (#2067)
Browse files Browse the repository at this point in the history
* Add per namespace burst limit control
* Default per namespace burst limit to be 4800, 2x of rate limit
  • Loading branch information
wxing1292 authored Oct 27, 2021
1 parent ede54d9 commit af181b4
Show file tree
Hide file tree
Showing 27 changed files with 214 additions and 281 deletions.
3 changes: 3 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ var Keys = map[Key]string{
FrontendHistoryMaxPageSize: "frontend.historyMaxPageSize",
FrontendRPS: "frontend.rps",
FrontendMaxNamespaceRPSPerInstance: "frontend.namespaceRPS",
FrontendMaxNamespaceBurstPerInstance: "frontend.namespaceBurst",
FrontendMaxNamespaceCountPerInstance: "frontend.namespaceCount",
FrontendGlobalNamespaceRPS: "frontend.globalNamespacerps",
FrontendShutdownDrainDuration: "frontend.shutdownDrainDuration",
Expand Down Expand Up @@ -439,6 +440,8 @@ const (
FrontendRPS
// FrontendMaxNamespaceRPSPerInstance is workflow namespace rate limit per second
FrontendMaxNamespaceRPSPerInstance
// FrontendMaxNamespaceBurstPerInstance is workflow namespace burst limit
FrontendMaxNamespaceBurstPerInstance
// FrontendMaxNamespaceCountPerInstance is workflow namespace count limit per second
FrontendMaxNamespaceCountPerInstance
// FrontendGlobalNamespaceRPS is workflow namespace rate limit per second for the whole cluster
Expand Down
2 changes: 1 addition & 1 deletion common/log/throttle_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func NewThrottledLogger(logger Logger, rps quotas.RateFn) *throttledLogger {
logger = sl.Skip(extraSkipForThrottleLogger)
}

limiter := quotas.NewDefaultOutgoingDynamicRateLimiter(rps)
limiter := quotas.NewDefaultOutgoingRateLimiter(rps)
tl := &throttledLogger{
limiter: limiter,
logger: logger,
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func buildRateLimiters(
result := make(map[string]quotas.RateLimiter, len(cfg.DataStores))
for dsName := range cfg.DataStores {
if maxQPS != nil && maxQPS() > 0 {
result[dsName] = quotas.NewDefaultOutgoingDynamicRateLimiter(
result[dsName] = quotas.NewDefaultOutgoingRateLimiter(
func() float64 { return float64(maxQPS()) },
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ func NewVisibilityManagerRateLimited(
readMaxQPS dynamicconfig.IntPropertyFn,
writeMaxQPS dynamicconfig.IntPropertyFn,
) *visibilityManagerRateLimited {
readRateLimiter := quotas.NewDefaultOutgoingDynamicRateLimiter(
readRateLimiter := quotas.NewDefaultOutgoingRateLimiter(
func() float64 { return float64(readMaxQPS()) },
)
writeRateLimiter := quotas.NewDefaultOutgoingDynamicRateLimiter(
writeRateLimiter := quotas.NewDefaultOutgoingRateLimiter(
func() float64 { return float64(writeMaxQPS()) },
)
return &visibilityManagerRateLimited{
Expand Down
6 changes: 4 additions & 2 deletions common/quotas/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ func BenchmarkRateLimiter(b *testing.B) {

func BenchmarkDynamicRateLimiter(b *testing.B) {
limiter := NewDynamicRateLimiter(
func() float64 { return testRate },
func() int { return testBurst },
NewRateBurst(
func() float64 { return testRate },
func() int { return testBurst },
),
time.Minute,
)
for n := 0; n < b.N; n++ {
Expand Down
106 changes: 73 additions & 33 deletions common/quotas/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,66 +32,106 @@ type (
// RateFn returns a float64 as the RPS
RateFn func() float64

// BurstFn returns a int as the RPS
// BurstFn returns an int as the burst / bucket size
BurstFn func() int

// DynamicRateImpl stores the dynamic rate per second for rate limiter
DynamicRateImpl struct {
rate *atomic.Float64
// RateBurst returns rate & burst for rate limiter
RateBurst interface {
Rate() float64
Burst() int
}

// DynamicBurstImpl stores the dynamic burst for rate limiter
DynamicBurstImpl struct {
burst *atomic.Int64
RateBurstImpl struct {
rateFn RateFn
burstFn BurstFn
}

DynamicRate interface {
Load() float64
Store(rate float64)
RateFn() RateFn
// MutableRateBurstImpl stores the dynamic rate & burst for rate limiter
MutableRateBurstImpl struct {
rate *atomic.Float64
burst *atomic.Int64
}

DynamicBurst interface {
Load() int
Store(burst int)
BurstFn() BurstFn
MutableRateBurst interface {
SetRate(rate float64)
SetBurst(burst int)
RateBurst
}
)

func NewDynamicRate(rate float64) *DynamicRateImpl {

return &DynamicRateImpl{
rate: atomic.NewFloat64(rate),
func NewRateBurst(
rateFn RateFn,
burstFn BurstFn,
) *RateBurstImpl {
return &RateBurstImpl{
rateFn: rateFn,
burstFn: burstFn,
}
}

func NewDynamicBurst(burst int) *DynamicBurstImpl {
func NewDefaultIncomingRateBurst(
rateFn RateFn,
) *RateBurstImpl {
return newDefaultRateBurst(rateFn, defaultIncomingRateBurstRatio)
}

return &DynamicBurstImpl{
burst: atomic.NewInt64(int64(burst)),
func NewDefaultOutgoingRateBurst(
rateFn RateFn,
) *RateBurstImpl {
return newDefaultRateBurst(rateFn, defaultOutgoingRateBurstRatio)
}

func newDefaultRateBurst(
rateFn RateFn,
rateToBurstRatio float64,
) *RateBurstImpl {
burstFn := func() int {
rate := rateFn()
if rate < 0 {
rate = 0
}
if rateToBurstRatio < 0 {
rateToBurstRatio = 0
}
burst := int(rate * rateToBurstRatio)
if burst == 0 && rate > 0 && rateToBurstRatio > 0 {
burst = 1
}
return burst
}
return NewRateBurst(rateFn, burstFn)
}

func (d *DynamicRateImpl) Load() float64 {
return d.rate.Load()
func (d *RateBurstImpl) Rate() float64 {
return d.rateFn()
}

func (d *DynamicRateImpl) Store(rate float64) {
d.rate.Store(rate)
func (d *RateBurstImpl) Burst() int {
return d.burstFn()
}

func (d *DynamicRateImpl) RateFn() RateFn {
return d.Load
func NewMutableRateBurst(
rate float64,
burst int,
) *MutableRateBurstImpl {
return &MutableRateBurstImpl{
rate: atomic.NewFloat64(rate),
burst: atomic.NewInt64(int64(burst)),
}
}

func (d *DynamicBurstImpl) Load() int {
return int(d.burst.Load())
func (d *MutableRateBurstImpl) SetRate(rate float64) {
d.rate.Store(rate)
}

func (d *DynamicBurstImpl) Store(burst int) {
func (d *MutableRateBurstImpl) SetBurst(burst int) {
d.burst.Store(int64(burst))
}

func (d *DynamicBurstImpl) BurstFn() BurstFn {
return d.Load
func (d *MutableRateBurstImpl) Rate() float64 {
return d.rate.Load()
}

func (d *MutableRateBurstImpl) Burst() int {
return int(d.burst.Load())
}
27 changes: 11 additions & 16 deletions common/quotas/dynamic_rate_limiter_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ const (
type (
// DynamicRateLimiterImpl implements a dynamic config wrapper around the rate limiter
DynamicRateLimiterImpl struct {
rateFn RateFn
burstFn BurstFn
rateBurstFn RateBurst
refreshInterval time.Duration

refreshTimer *time.Timer
Expand All @@ -51,41 +50,37 @@ var _ RateLimiter = (*DynamicRateLimiterImpl)(nil)

// NewDynamicRateLimiter returns a rate limiter which handles dynamic config
func NewDynamicRateLimiter(
rateFn RateFn,
burstFn BurstFn,
rateBurstFn RateBurst,
refreshInterval time.Duration,
) *DynamicRateLimiterImpl {
rateLimiter := &DynamicRateLimiterImpl{
rateFn: rateFn,
burstFn: burstFn,
rateBurstFn: rateBurstFn,
refreshInterval: refreshInterval,

refreshTimer: time.NewTimer(refreshInterval),
rateLimiter: NewRateLimiter(rateFn(), burstFn()),
rateLimiter: NewRateLimiter(rateBurstFn.Rate(), rateBurstFn.Burst()),
}
return rateLimiter
}

// NewDefaultIncomingDynamicRateLimiter returns a default rate limiter
// NewDefaultIncomingRateLimiter returns a default rate limiter
// for incoming traffic
func NewDefaultIncomingDynamicRateLimiter(
func NewDefaultIncomingRateLimiter(
rateFn RateFn,
) *DynamicRateLimiterImpl {
return NewDynamicRateLimiter(
rateFn,
func() int { return int(defaultIncomingRateBurstRatio * rateFn()) },
NewDefaultIncomingRateBurst(rateFn),
defaultRefreshInterval,
)
}

// NewDefaultOutgoingDynamicRateLimiter returns a default rate limiter
// NewDefaultOutgoingRateLimiter returns a default rate limiter
// for outgoing traffic
func NewDefaultOutgoingDynamicRateLimiter(
func NewDefaultOutgoingRateLimiter(
rateFn RateFn,
) *DynamicRateLimiterImpl {
return NewDynamicRateLimiter(
rateFn,
func() int { return int(defaultOutgoingRateBurstRatio * rateFn()) },
NewDefaultOutgoingRateBurst(rateFn),
defaultRefreshInterval,
)
}
Expand Down Expand Up @@ -142,7 +137,7 @@ func (d *DynamicRateLimiterImpl) maybeRefresh() {
select {
case <-d.refreshTimer.C:
d.refreshTimer.Reset(d.refreshInterval)
d.rateLimiter.SetRateBurst(d.rateFn(), d.burstFn())
d.rateLimiter.SetRateBurst(d.rateBurstFn.Rate(), d.rateBurstFn.Burst())

default:
// noop
Expand Down
Loading

0 comments on commit af181b4

Please # to comment.