kafka.go
package main

import (
	"encoding/json"
	"log"
	"net/http"

	//"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
	"github.com/confluentinc/confluent-kafka-go/kafka"
)

const (
	// host assumes the Kafka broker is reachable on the Docker host machine
	// (host.docker.internal resolves to the host from inside a container).
	host  = "host.docker.internal"
	topic = "test"
	group = "myGroup"
)

// Message is the JSON envelope expected in request bodies; the raw contents of
// its "data" field are forwarded to (and read back from) Kafka unchanged.
type Message struct {
	Data json.RawMessage `json:"data"`
}
// SendToKafka decodes a JSON payload from the HTTP request and produces it to
// the Kafka topic.
func SendToKafka(rw http.ResponseWriter, r *http.Request) {
	log.Println("SendToKafka plugin starting")

	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": host})
	if err != nil {
		log.Println("Failed to create producer:", err)
		http.Error(rw, err.Error(), http.StatusInternalServerError)
		return
	}
	defer p.Close()

	// Delivery report handler for produced messages.
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					log.Println("Delivery failed:", ev.TopicPartition)
				} else {
					log.Println("Delivered message to", ev.TopicPartition)
				}
			}
		}
	}()

	// Try to decode the request body into the struct. If there is an error,
	// respond to the client with the error message and a 400 status code.
	var m Message
	if err := json.NewDecoder(r.Body).Decode(&m); err != nil {
		http.Error(rw, err.Error(), http.StatusBadRequest)
		return
	}

	// Produce the message to the topic (asynchronously).
	top := topic
	err = p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &top, Partition: kafka.PartitionAny},
		Value:          []byte(m.Data),
	}, nil)
	if err != nil {
		http.Error(rw, err.Error(), http.StatusInternalServerError)
		return
	}

	// Wait up to 15 seconds for outstanding deliveries before the producer is closed.
	p.Flush(15 * 1000)
}
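
// Example payload for SendToKafka, shown here purely as an illustration of what
// the Message struct implies (the wrapping route is not defined in this file):
// any valid JSON under the "data" key is produced verbatim as the Kafka message
// value.
//
//	POST body:            {"data": {"hello": "world"}}
//	Kafka value produced: {"hello": "world"}
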
// ConsumeFromKafka reads a single message from the Kafka topic and returns its
// payload in the HTTP response.
func ConsumeFromKafka(rw http.ResponseWriter, r *http.Request) {
	log.Println("ConsumeFromKafka plugin starting")

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": host,
		"group.id":          group,
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		log.Println("Failed to create consumer:", err)
		http.Error(rw, err.Error(), http.StatusInternalServerError)
		return
	}
	defer c.Close()

	if err := c.SubscribeTopics([]string{topic, "^aRegex.*[Tt]opic"}, nil); err != nil {
		http.Error(rw, err.Error(), http.StatusInternalServerError)
		return
	}

	// Block until a single message arrives (a negative timeout waits indefinitely).
	msg, err := c.ReadMessage(-1)
	if err != nil {
		log.Println("Consumer error:", err)
		http.Error(rw, err.Error(), http.StatusInternalServerError)
		return
	}
	log.Println("Message on", msg.TopicPartition, string(msg.Value))
	// Decode the Kafka payload into a Message and return its "data" field as the
	// response body.
	var m Message
	if err := json.Unmarshal(msg.Value, &m); err != nil {
		http.Error(rw, err.Error(), http.StatusInternalServerError)
		return
	}

	// Send the HTTP response from the Golang plugin.
	rw.Header().Set("Content-Type", "application/json")
	rw.WriteHeader(http.StatusOK)
	rw.Write(m.Data)
}
func main() {}
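
// main is intentionally empty: the "plugin starting" log lines and the empty main
// suggest this file is compiled as a Go plugin (e.g. `go build -buildmode=plugin`)
// whose handlers are invoked by a host process rather than run directly. A minimal
// sketch for serving the handlers standalone instead, with assumed routes and port
// (not part of the original code):
//
//	func main() {
//		http.HandleFunc("/produce", SendToKafka)
//		http.HandleFunc("/consume", ConsumeFromKafka)
//		log.Fatal(http.ListenAndServe(":8080", nil))
//	}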