Skip to content

Commit

Permalink
aliyun cms sdk QPS限流
Browse files Browse the repository at this point in the history
  • Loading branch information
shuoshadow committed Sep 1, 2021
1 parent 0738e2c commit b2e3ed1
Showing 1 changed file with 73 additions and 10 deletions.
83 changes: 73 additions & 10 deletions pkg/collector/collector.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package collector

import (
"context"
"fmt"
"net/http"
"sync"
Expand All @@ -24,6 +25,12 @@ type cloudMonitor struct {
scrapeDurationDesc *prometheus.Desc

lock sync.Mutex

reqCount int
maxCount int
interval time.Duration
reqLock sync.Mutex
reqChan chan bool
}

// NewCloudMonitorCollector create a new collector for cloud monitor
Expand All @@ -46,6 +53,10 @@ func NewCloudMonitorCollector(appName string, cfg *config.Config, rt http.RoundT
[]string{"namespace", "collector"},
nil,
),
maxCount: 50,
interval: 1*time.Second,
reqChan: make(chan bool, 1),
reqCount: 0,
}, nil
}

Expand All @@ -57,22 +68,74 @@ func (m *cloudMonitor) Collect(ch chan<- prometheus.Metric) {
m.lock.Lock()
defer m.lock.Unlock()

subCtx, cancel := context.WithCancel(context.Background())
// 启动定时器,每秒重置reqCount
go m.resetReqCount(subCtx)

wg := &sync.WaitGroup{}
// do collect
for sub, metrics := range m.cfg.Metrics {
for i := range metrics {
wg.Add(1)
go func(namespace string, metric *config.Metric) {
defer wg.Done()
start := time.Now()
m.client.Collect(m.namespace, namespace, metric, ch)
ch <- prometheus.MustNewConstMetric(
m.scrapeDurationDesc,
prometheus.GaugeValue,
time.Now().Sub(start).Seconds(),
namespace, metric.String())
}(sub, metrics[i])
m.reqIsAvailable()
select {
case <- m.reqChan:
// reqCount + 1
m.reqIncrease()

go func(namespace string, metric *config.Metric) {
defer wg.Done()
start := time.Now()
m.client.Collect(m.namespace, namespace, metric, ch)
ch <- prometheus.MustNewConstMetric(
m.scrapeDurationDesc,
prometheus.GaugeValue,
time.Now().Sub(start).Seconds(),
namespace, metric.String())
}(sub, metrics[i])
}
}
}
// 每一次请求完成后都要退出计时器
cancel()
wg.Wait()
}

// 请求计数增加
func (m *cloudMonitor) reqIncrease() {
m.reqLock.Lock()
defer m.reqLock.Unlock()

m.reqCount += 1
}

// 是否可以继续请求
func (m *cloudMonitor) reqIsAvailable() {
m.reqLock.Lock()
defer m.reqLock.Unlock()

if m.reqCount < m.maxCount {
// 因为计时器也会往channel中写入数据,所以需要判断下, 以免阻塞在此
if len(m.reqChan) == 0 {
m.reqChan <- true
}
}
}

func (m *cloudMonitor) resetReqCount(ctx context.Context) {

ticker := time.NewTicker(m.interval)
for {
select {
case <- ticker.C:
m.reqLock.Lock()
m.reqCount = 0
m.reqLock.Unlock()

// 定时器写入是因为reqCount >= maxCount后,reqIsAvailable不再写入true到channel,避免select阻塞
m.reqChan <- true
case <- ctx.Done():
return
}
}
}

0 comments on commit b2e3ed1

Please # to comment.