forked from xiaojiaoyu100/aliyun-acm
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathobserver.go
82 lines (70 loc) · 1.99 KB
/
observer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package aliacm
import (
"sync"
)
// AfterUpdateHook 配置更新完毕后的回调函数
type AfterUpdateHook func(Config)
// ObserverRegisterInfo 用来添加观察者需要关心的配置
type ObserverRegisterInfo struct {
AfterUpdateHook AfterUpdateHook
Info Info
}
// Observer observes the config change.
type Observer struct {
notifyMap map[Info]AfterUpdateHook
isFirstUpdateMap map[Info]bool
updateLockMap map[Info]sync.RWMutex
readLockMap map[Info]sync.WaitGroup
}
// GenerateObserver 用来创建新的Observer
func GenerateObserver(oriList ...ObserverRegisterInfo) *Observer {
var obs Observer
obs.isFirstUpdateMap = make(map[Info]bool)
obs.readLockMap = make(map[Info]sync.WaitGroup)
obs.notifyMap = make(map[Info]AfterUpdateHook)
obs.updateLockMap = make(map[Info]sync.RWMutex)
for _, ori := range oriList {
obs.isFirstUpdateMap[ori.Info] = true
obs.readLockMap[ori.Info] = sync.WaitGroup{}
obs.readLockMap[ori.Info].Add(1)
obs.notifyMap[ori.Info] = ori.AfterUpdateHook
obs.updateLockMap[ori.Info] = sync.RWMutex{}
}
return &obs
}
// Infos 获取Observer所有的Info
func (o *Observer) Infos() []Info {
var infos []Info
for info := range o.notifyMap {
infos = append(infos, info)
}
return infos
}
// OnUpdate ACM配置更新后的回调函数
func (o *Observer) OnUpdate(config Config) {
if _, ok := o.updateLockMap[config.Info]; !ok {
return
}
o.updateLockMap[config.Info].Lock()
defer o.updateLockMap[config.Info].Unlock()
if o.isFirstUpdateMap[config.Info] {
o.isFirstUpdateMap[config.Info] = false
} else {
o.readLockMap[config.Info].Add(1)
}
defer o.readLockMap[config.Info].Done()
o.notifyMap[config.Info](config)
}
// Wait 用于等待所有AfterUpdateHook调用完
func (o *Observer) Wait() {
for _, lock := range o.readLockMap {
lock.Wait()
}
}
// SingleWait 用于等待单个AfterUpdateHook调用完
func (o *Observer) SingleWait(info Info) {
if _, ok := o.readLockMap[info]; !ok {
return
}
o.readLockMap[info].Wait()
}