-
Notifications
You must be signed in to change notification settings - Fork 5
/
group.go
113 lines (93 loc) · 2.75 KB
/
group.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package consumer
import (
"context"
"sync"
"github.com/Shopify/sarama"
"github.com/jeroenrinzema/commander/internal/circuit"
"github.com/sirupsen/logrus"
)
// NewGroupHandle initializes a new GroupHandle
func NewGroupHandle(client *Client) *GroupHandle {
handle := &GroupHandle{
client: client,
ready: circuit.Ready{},
}
return handle
}
// GroupHandle represents a Sarama consumer group consumer handle
type GroupHandle struct {
client *Client
consumer sarama.ConsumerGroup
group string
consumptions sync.WaitGroup
closing bool
ready circuit.Ready
}
// Connect initializes a new Sarama consumer group and awaits till the consumer
// group is set up and ready to consume messages.
func (handle *GroupHandle) Connect(conn sarama.Client, topics []string, group string) error {
consumer, err := sarama.NewConsumerGroupFromClient(group, conn)
if err != nil {
return err
}
go func() {
for {
if handle.closing {
break
}
ctx := context.Background()
err := consumer.Consume(ctx, topics, handle)
if err != nil {
logrus.Error(err)
}
}
}()
select {
case err := <-consumer.Errors():
return err
case <-handle.ready.On():
}
handle.consumer = consumer
handle.group = group
return nil
}
// Setup is run at the beginning of a new session, before ConsumeClaim.
// This method is a implementation of the sarama consumer interface.
func (handle *GroupHandle) Setup(sarama.ConsumerGroupSession) error {
handle.ready.Mark()
return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (handle *GroupHandle) Cleanup(session sarama.ConsumerGroupSession) error {
return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// When a Kafka message is claimed is it passed to the client Claim method.
// If an error occurred during processing of the claimed message is the message marked to be retried.
func (handle *GroupHandle) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
handle.consumptions.Add(1)
go func(message *sarama.ConsumerMessage) {
err := handle.client.Claim(message)
defer handle.consumptions.Done()
if err == ErrRetry {
// Mark the message to be consumed again
session.ResetOffset(message.Topic, message.Partition, message.Offset, "")
return
}
session.MarkMessage(message, "")
}(message)
}
return nil
}
// Close closes the group consume handle and awaits till all claimed messages are processed.
// The consumer group get's marked for closing
func (handle *GroupHandle) Close() error {
handle.closing = true
err := handle.consumer.Close()
if err != nil {
return err
}
handle.consumptions.Wait()
return nil
}