-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstream.go
68 lines (55 loc) · 1.62 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package stream
import (
"context"
)
// Stream used to build a Consumer object
type Stream interface {
GetConsumer(ctx context.Context, group string) Consumer
GetProducer(ctx context.Context, group string) Producer
}
// Message wraps context and data from stream
type Message struct {
Context context.Context
Data interface{}
}
// Consumer provides read access to a message stream
type Consumer interface {
Messages() <-chan Message
Ack(context.Context) error
Nack(context.Context) error
Close() error
Errors() <-chan error
Done() <-chan struct{}
}
// Producer provides publish access to a message stream
type Producer interface {
Publish(context.Context, interface{}) error
Close() error
Errors() <-chan error
Done() <-chan struct{}
}
// Config common configuration for streams
type Config struct {
Endpoints []string
Topic string
MaxInflightMessages int
Custom interface{}
}
// TrackedMessagesContextKey context key to get tracked messages from context. Tracked messages are stored in []interface{}
const TrackedMessagesContextKey = streamContextKey("TRACKED_MESSAGES")
// SetTrackers adds message trackers to context
func SetTrackers(ctx context.Context, tracker ...interface{}) context.Context {
tracks := GetTrackers(ctx)
for _, t := range tracker {
tracks = append(tracks, t)
}
return context.WithValue(ctx, TrackedMessagesContextKey, tracks)
}
// GetTrackers returns an array of trackers
func GetTrackers(ctx context.Context) []interface{} {
if a, ok := ctx.Value(TrackedMessagesContextKey).([]interface{}); ok {
return a
}
return []interface{}{}
}
type streamContextKey string