-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathperiodically.go
156 lines (136 loc) · 2.88 KB
/
periodically.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package main
import (
"time"
)
// defaultCtlBuf is the control-channel buffer size that newPeriodically
// passes to newPeriodicallyInt.
const defaultCtlBuf = 16
// periodicRequest is a control message delivered to the service loop over
// the ctl channel. The loop interprets it by which fields are nil: both nil
// means stop, only f nil means remove key k, and a non-nil f means register
// f under key k.
type periodicRequest struct {
// k is the key channel that identifies the registration.
k <-chan bool
// f is the function to run for this registration on each round of work.
f func(time.Time) bool
}
// pFuncs maps a registration's key channel to the function invoked for it.
// doWork drops an entry when its key channel is readable or when the
// function returns false.
type pFuncs map[<-chan bool]func(time.Time) bool
// periodically runs a set of registered functions each time its tick
// source fires.
type periodically struct {
// funcs holds the currently registered functions, keyed by channel.
funcs pFuncs
// ctl carries register, unregister and stop requests to the service loop.
ctl chan periodicRequest
// ticker supplies the ticks that trigger a round of work.
ticker tickSrc
// sem is a buffered channel used as a semaphore: runTask sends on it
// before starting a task and receives from it when the task ends, so its
// capacity bounds the number of tasks running at once.
sem chan bool
// running is closed when the service loop returns.
running chan bool
}
// tickSrc abstracts the source of ticks so that something other than a
// real time.Ticker can be supplied to newPeriodicallyInt.
type tickSrc interface {
// C returns the channel on which ticks are delivered.
C() <-chan time.Time
// Stop is called by the service loop when it exits.
Stop() // hammertime
}
// realTicker wraps a *time.Ticker so it can be used where a tickSrc is
// expected.
type realTicker struct {
	t *time.Ticker
}

// C returns the wrapped ticker's delivery channel.
func (rt realTicker) C() <-chan time.Time {
	ticks := rt.t.C
	return ticks
}

// Stop stops the wrapped ticker.
func (rt realTicker) Stop() {
	rt.t.Stop()
}
// newPeriodically creates a periodically driven by a real time.Ticker
// firing every period, running at most workers tasks at once.
//
// It returns nil when period is not positive or workers is less than one.
// Both are checked before the ticker is created: time.NewTicker panics on
// a non-positive duration, and a ticker created for a rejected worker
// count would never be stopped.
func newPeriodically(period time.Duration, workers int) *periodically {
	if period <= 0 || workers < 1 {
		return nil
	}
	return newPeriodicallyInt(realTicker{time.NewTicker(period)},
		defaultCtlBuf, workers)
}
// newPeriodicallyInt is the constructor to use when you want to supply
// your own time source. ctlbuf is the buffer size of the control channel
// and workers is the capacity of the task semaphore.
//
// It returns nil when workers is less than one; otherwise it starts the
// service loop on a new goroutine before returning.
func newPeriodicallyInt(ticker tickSrc, ctlbuf, workers int) *periodically {
	if workers < 1 {
		return nil
	}

	p := new(periodically)
	p.funcs = make(pFuncs)
	p.ctl = make(chan periodicRequest, ctlbuf)
	p.ticker = ticker
	p.sem = make(chan bool, workers)
	p.running = make(chan bool)

	go p.service()
	return p
}
// service is the control loop behind a periodically. It collects register
// and unregister requests between ticks and hands them to doWork when a
// tick arrives. While a round of work is outstanding the tick channel is
// set to nil so further ticks are not received; it is restored when the
// round reports completion. A request with neither key nor function ends
// the loop.
//
// On exit the ticker is stopped and then running is closed.
func (p *periodically) service() {
	defer close(p.running)
	defer p.ticker.Stop()

	var (
		ticks    = p.ticker.C()
		roundEnd <-chan time.Time
		toAdd    = pFuncs{}
		toRemove = []<-chan bool{}
	)

	for {
		select {
		case now := <-ticks:
			roundEnd = p.doWork(now, toAdd, toRemove)
			// doWork has consumed the pending changes; start afresh.
			toAdd, toRemove = pFuncs{}, toRemove[:0]
			ticks = nil

		case <-roundEnd:
			roundEnd = nil
			ticks = p.ticker.C()

		case req := <-p.ctl:
			if req.f != nil {
				toAdd[req.k] = req.f
				continue
			}
			if req.k == nil {
				// Neither key nor function: stop request.
				return
			}
			toRemove = append(toRemove, req.k)
		}
	}
}
// runTask runs f(t) on its own goroutine and returns a channel that will
// receive f's result.
//
// A slot is taken from p.sem before the goroutine is started, so the call
// blocks while every slot is in use; the slot is handed back once the
// result has been delivered. ch is accepted but not used here.
func (p *periodically) runTask(t time.Time,
	ch <-chan bool, f func(time.Time) bool) chan bool {
	p.sem <- true

	// Capacity one lets the task finish even if nobody collects the result.
	result := make(chan bool, 1)
	go func(out chan<- bool) {
		defer func() { <-p.sem }()
		out <- f(t)
	}(result)
	return result
}
// doWork applies the pending registration changes and then runs one round
// of every registered function for tick time t.
//
// removals are applied first, to both additions and the registered set, so
// a key present in both is not registered. The round itself runs on a
// separate goroutine, starting each function through runTask. An entry is
// dropped from the registered set when its key channel is readable or when
// its function returns false.
//
// The returned channel receives t once the whole round has finished.
func (p *periodically) doWork(t time.Time,
	additions pFuncs, removals []<-chan bool) <-chan time.Time {
	// Buffered so the round's goroutine can always deliver its completion
	// and exit, even when the service loop has already returned and nothing
	// will ever receive from this channel.
	rv := make(chan time.Time, 1)

	// first, fix up the pfuncs
	for _, ch := range removals {
		delete(additions, ch)
		delete(p.funcs, ch)
	}
	for ch, f := range additions {
		p.funcs[ch] = f
	}

	go func() {
		results := map[<-chan bool]chan bool{}
		for ch, f := range p.funcs {
			select {
			case <-ch:
				// This one signaled it's gone now
				delete(p.funcs, ch)
			default:
				results[ch] = p.runTask(t, ch, f)
			}
		}
		// Harvest the results and verify they want to keep tickin'
		for kch, rvch := range results {
			if !<-rvch {
				delete(p.funcs, kch)
			}
		}
		rv <- t
	}()
	return rv
}
// Stop asks the service loop to exit by sending it an empty request. If
// the loop has already exited (running is closed), Stop can return without
// sending anything.
func (p *periodically) Stop() {
	var quit periodicRequest // no key and no function means "stop"

	select {
	case <-p.running:
		// already stopped
	case p.ctl <- quit:
	}
}
// Register asks the service loop to run f on each round of work under key
// k. The request is queued on the control channel and takes effect when
// the next tick is handled. The loop treats a nil f as a removal of k, and
// nil k with nil f as a stop request.
//
// If the service loop has already exited the request is dropped instead of
// blocking forever on a channel nobody reads, matching what Stop does.
func (p *periodically) Register(k <-chan bool, f func(time.Time) bool) {
	select {
	case p.ctl <- periodicRequest{k, f}:
	case <-p.running:
		// service loop is gone; nothing would ever consume the request
	}
}
// Unregister asks the service loop to drop the registration keyed by k.
// The request is queued on the control channel and takes effect when the
// next tick is handled. The loop treats a nil k as a stop request.
//
// If the service loop has already exited the request is dropped instead of
// blocking forever on a channel nobody reads, matching what Stop does.
func (p *periodically) Unregister(k <-chan bool) {
	select {
	case p.ctl <- periodicRequest{k, nil}:
	case <-p.running:
		// service loop is gone; nothing would ever consume the request
	}
}