-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathpubsub.go
108 lines (98 loc) · 2.69 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// Package pubsub is a simple publish-subscribe implementation using generics.
package pubsub
import "context"
// A Topic is a publish-subscribe hub for messages of type T. It holds only the
// send-only ends of the channels on which the goroutine started by NewTopic
// receives its requests.
type Topic[T any] struct {
	// publishCh carries values to be fanned out to the subscribers.
	publishCh chan<- T
	// subscribeCh and unsubscribeCh carry the subscriber channels to be added
	// to, and removed from, the subscriber set.
	subscribeCh, unsubscribeCh chan<- chan<- T
}
// NewTopic returns a new Topic and starts the goroutine that serves it. The
// goroutine terminates when ctx is done or when Close is called, and on exit
// it closes every channel that is still subscribed.
//
// Delivery is synchronous: each published value is sent to every subscriber in
// turn, so a subscriber that is not receiving stalls the topic until it
// receives or ctx is done.
func NewTopic[T any](ctx context.Context) *Topic[T] {
	publishCh := make(chan T)
	subscribeCh := make(chan chan<- T)
	unsubscribeCh := make(chan chan<- T)
	t := &Topic[T]{
		publishCh:     publishCh,
		subscribeCh:   subscribeCh,
		unsubscribeCh: unsubscribeCh,
	}
	go func() {
		subscribers := make(map[chan<- T]struct{})
		// Whatever the reason for terminating, close the remaining
		// subscriber channels so their receivers observe the shutdown.
		defer func() {
			for subscriber := range subscribers {
				close(subscriber)
			}
		}()
		for {
			select {
			case <-ctx.Done():
				return
			case value, ok := <-publishCh:
				if !ok {
					// Close was called.
					return
				}
				for subscriber := range subscribers {
					// Keep watching ctx while delivering so that a stalled
					// subscriber cannot keep this goroutine alive after
					// cancellation.
					select {
					case <-ctx.Done():
						return
					case subscriber <- value:
					}
				}
			case subscriber := <-subscribeCh:
				subscribers[subscriber] = struct{}{}
			case subscriber := <-unsubscribeCh:
				// Close only channels that are still subscribed: closing a
				// channel a second time would panic and take down the
				// whole process from this goroutine.
				if _, subscribed := subscribers[subscriber]; subscribed {
					delete(subscribers, subscriber)
					close(subscriber)
				}
			}
		}
	}()
	return t
}
// Close shuts t down by closing its publish channel. The goroutine started by
// NewTopic treats the closed channel as its signal to return, and closes every
// subscribed channel as it exits.
//
// Close must be called at most once, and no Publish or PublishContext call may
// run concurrently with or after it: closing a closed channel and sending on a
// closed channel both panic.
func (t *Topic[T]) Close() {
close(t.publishCh)
}
// Publish hands value to t for delivery to all subscribers. The publish
// channel is unbuffered, so Publish blocks until the topic's goroutine
// receives value; use PublishContext to give up when a context is done.
func (t *Topic[T]) Publish(value T) {
t.publishCh <- value
}
// PublishContext hands value to t for delivery to all subscribers and returns
// nil once t has accepted it. If ctx is done before t accepts value, nothing
// is published and the cause of ctx's cancellation is returned.
func (t *Topic[T]) PublishContext(ctx context.Context, value T) error {
	select {
	case t.publishCh <- value:
		// Accepted by the topic; fall through to the success return.
	case <-ctx.Done():
		return context.Cause(ctx)
	}
	return nil
}
// Subscribe adds ch as a subscriber. t takes ownership of ch and will close it
// when t terminates or when ch is unsubscribed. The subscribe channel is
// unbuffered, so Subscribe blocks until the topic's goroutine receives the
// request; use SubscribeContext to give up when a context is done.
func (t *Topic[T]) Subscribe(ch chan<- T) {
t.subscribeCh <- ch
}
// SubscribeContext adds ch as a subscriber and returns nil once t has accepted
// the request. From then on t owns ch and closes it when t terminates. If ctx
// is done before t accepts the request, ch is not subscribed, ownership stays
// with the caller, and the cause of ctx's cancellation is returned.
func (t *Topic[T]) SubscribeContext(ctx context.Context, ch chan<- T) error {
	select {
	case t.subscribeCh <- ch:
		// Accepted by the topic; fall through to the success return.
	case <-ctx.Done():
		return context.Cause(ctx)
	}
	return nil
}
// Unsubscribe removes ch as a subscriber. t will close ch when the
// unsubscription is complete. ch should be a channel that is currently
// subscribed to t. The unsubscribe channel is unbuffered, so Unsubscribe
// blocks until the topic's goroutine receives the request; use
// UnsubscribeContext to give up when a context is done.
func (t *Topic[T]) Unsubscribe(ch chan<- T) {
t.unsubscribeCh <- ch
}
// UnsubscribeContext asks t to remove ch as a subscriber and returns nil once
// t has accepted the request; t closes ch when the unsubscription is
// complete. If ctx is done before t accepts the request, ch stays subscribed
// and the cause of ctx's cancellation is returned.
func (t *Topic[T]) UnsubscribeContext(ctx context.Context, ch chan<- T) error {
	select {
	case t.unsubscribeCh <- ch:
		// Accepted by the topic; fall through to the success return.
	case <-ctx.Done():
		return context.Cause(ctx)
	}
	return nil
}