-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprioritychannel.go
154 lines (136 loc) · 3.6 KB
/
prioritychannel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package heap
import "context"
// PriorityChannel reads values from inCh as eagerly as it can and returns a
// channel that delivers those values ordered by lessFunc, lesser values first.
//
// Values received while the reader of the returned channel is not ready are
// held in a heap. Once inCh is closed, every value still held is delivered in
// priority order and the returned channel is then closed.
//
// If ctx is canceled, the goroutine stops when it observes the cancellation
// and the returned channel is closed; values still held are not delivered.
func PriorityChannel[T any](ctx context.Context, inCh <-chan T, lessFunc func(T, T) bool) <-chan T {
	outCh := make(chan T)
	go func() {
		defer close(outCh)

		pending := NewHeap(lessFunc)

		// flush delivers everything left in pending in priority order,
		// giving up as soon as ctx is done.
		flush := func() {
			for v := range pending.PopAll() {
				select {
				case <-ctx.Done():
					return
				case outCh <- v:
				}
			}
		}

		var (
			next    T    // the value currently being offered on outCh
			hasNext bool // whether next holds a value that is yet to be delivered
		)
		for {
			if !hasNext {
				// Prefer the best held value; with nothing held, block
				// until inCh produces one.
				candidate, ok := pending.Pop()
				if !ok {
					select {
					case <-ctx.Done():
						return
					case candidate, ok = <-inCh:
						if !ok {
							// inCh is closed and nothing is held: all done.
							return
						}
					}
				}
				next, hasNext = candidate, true
			}

			// Offer next to the reader while still accepting new input,
			// whichever becomes possible first.
			select {
			case <-ctx.Done():
				return
			case outCh <- next:
				hasNext = false
			case received, open := <-inCh:
				// next was not delivered, so it goes back with the rest.
				pending.Push(next)
				if !open {
					flush()
					return
				}
				// Add the new arrival and take whichever value is now
				// least; hasNext stays true.
				next = pending.PushPop(received)
			}
		}
	}()
	return outCh
}
// BufferedPriorityChannel reads values from inCh and returns a channel that
// delivers those values ordered by lessFunc, lesser values first, using a
// buffer of at most size values.
//
// It first reads from inCh until size values are buffered. It then
// alternates: deliver the least buffered value, read one more value from
// inCh, and select the least value again. When inCh is closed, the values
// still buffered are delivered in priority order and the returned channel is
// then closed.
//
// If ctx is canceled, the goroutine stops when it observes the cancellation
// and the returned channel is closed; values still buffered are not delivered.
//
// BufferedPriorityChannel panics if size is not positive.
func BufferedPriorityChannel[T any](ctx context.Context, inCh <-chan T, size int, lessFunc func(T, T) bool) <-chan T {
	if size <= 0 {
		panic("size out of range")
	}
	outCh := make(chan T)
	go func() {
		defer close(outCh)

		buffer := NewHeap(lessFunc)

		// flush delivers everything left in buffer in priority order,
		// giving up as soon as ctx is done.
		flush := func() {
			for v := range buffer.PopAll() {
				select {
				case <-ctx.Done():
					return
				case outCh <- v:
				}
			}
		}

		// Fill phase: collect size values before delivering anything.
		for range size {
			select {
			case <-ctx.Done():
				return
			case v, open := <-inCh:
				if !open {
					// Input ended early; deliver what was collected.
					flush()
					return
				}
				buffer.Push(v)
			}
		}

		// The fill phase pushed size (> 0) values, so the buffer is
		// non-empty here and the ok result can be ignored.
		smallest, _ := buffer.Pop()

		// Steady state: one value out, then one value in.
		for {
			select {
			case <-ctx.Done():
				return
			case outCh <- smallest:
			}

			select {
			case <-ctx.Done():
				return
			case v, open := <-inCh:
				if !open {
					// smallest has already been delivered; only the
					// buffered values remain.
					flush()
					return
				}
				smallest = buffer.PushPop(v)
			}
		}
	}()
	return outCh
}