-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathamqp.go
90 lines (77 loc) · 1.83 KB
/
amqp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package amqp_logger
import (
"bytes"
"os"
"io"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/streadway/amqp"
)
// AMQPLogWritter describes a value that can be closed; Close is its only
// method.
//
// NOTE(review): the name is spelled "Writter" (sic). It is exported, so
// renaming it would break callers. No implementation is visible in this part
// of the file — presumably the AMQP-backed logger is meant to satisfy it;
// confirm.
type AMQPLogWritter interface {
// Close presumably releases the underlying AMQP resources and returns any
// error encountered while doing so — TODO confirm against the implementation.
Close() error
}
// WithAMQP returns a log.Logger that forwards records to logger and also
// publishes them over AMQP.
//
// The logger starts from the defaults set below (local guest broker,
// "amq.topic" exchange, "log" routing key, JSON body encoding, level appended
// to the routing key). Each option is then applied in the order given, and the
// transport is created last from the resulting URL and base logger.
//
// NOTE(review): the default contentEncoding is the MIME type
// "application/json"; AMQP content-encoding is normally a charset or
// compression name — confirm whether consumers rely on this value.
func WithAMQP(logger log.Logger, options ...Option) log.Logger {
	al := new(amqpLogger)

	// Defaults; any of these may be overridden by the options below.
	al.baseLogger = log.With(logger)
	al.appendLevel = true
	al.amqpURL = "amqp://guest:guest@localhost:5672//"
	al.exchange = "amq.topic"
	al.routingKey = "log"
	al.newLogger = log.NewJSONLogger
	al.contentType = "application/json"
	al.contentEncoding = "application/json"

	for i := range options {
		options[i](al)
	}

	// Built after the options ran so it uses the final URL and base logger.
	al.transport = newTransport(al.amqpURL, al.baseLogger)

	return al
}
// NewAMQPLogger is a convenience constructor: it builds a JSON logger that
// writes to os.Stdout and passes it, together with options, to WithAMQP.
func NewAMQPLogger(options ...Option) log.Logger {
	stdout := log.NewJSONLogger(os.Stdout)
	return WithAMQP(stdout, options...)
}
// amqpLogger is the log.Logger returned by WithAMQP. Its Log method writes
// each record to baseLogger and then publishes it through transport.
type amqpLogger struct {
// baseLogger receives every record before it is published, and is also
// where publish failures are reported.
baseLogger log.Logger
// appendLevel, when true, makes Log append ".<level>" to the routing key
// for each level value found in the record.
appendLevel bool
// amqpURL is the broker URL handed to newTransport by WithAMQP.
amqpURL string
// exchange is the exchange name passed to transport.Publish.
exchange string
// routingKey is the base routing key used for every published message.
routingKey string
// newLogger builds the logger used to encode a record into the message
// body; Log calls it with an in-memory buffer.
newLogger func(io.Writer) log.Logger
// contentType is copied into the ContentType property of each message.
contentType string
// contentEncoding is copied into the ContentEncoding property of each
// message.
contentEncoding string
// transport publishes messages; it is created by WithAMQP after all
// options have been applied.
transport *rabbitMQTransport
}
// Log writes keyvals to the base logger and then publishes the same record to
// the configured AMQP exchange.
//
// The record is first handed to baseLogger; if that fails, its error is
// returned and nothing is published. The message body is produced by encoding
// keyvals with a logger built by newLogger over an in-memory buffer. When
// appendLevel is set, every level.Value found in keyvals is appended to the
// routing key as ".<level>". A string value stored under the "correlation_id"
// key is copied into the message's CorrelationId property, regardless of
// appendLevel; if the key occurs more than once the last string value wins.
//
// Log returns the first error from the base logger, the body encoder, or the
// transport. Encoder and publish failures are additionally reported on the
// base logger.
func (l *amqpLogger) Log(keyvals ...interface{}) error {
	if err := l.baseLogger.Log(keyvals...); err != nil {
		return err
	}

	key := l.routingKey
	if l.appendLevel {
		for _, kv := range keyvals {
			if v, ok := kv.(level.Value); ok {
				key += "." + v.String()
			}
		}
	}

	// Keys sit at even indices and their values directly after them. Walking
	// in pairs keeps a value that happens to equal "correlation_id" from being
	// mistaken for the key, and guarantees keyvals[i+1] exists.
	correlationID := ""
	for i := 0; i+1 < len(keyvals); i += 2 {
		if k, ok := keyvals[i].(string); !ok || k != "correlation_id" {
			continue
		}
		// Non-string correlation IDs are ignored rather than panicking.
		if id, ok := keyvals[i+1].(string); ok {
			correlationID = id
		}
	}

	var buf bytes.Buffer
	if err := l.newLogger(&buf).Log(keyvals...); err != nil {
		// Do not publish an empty or partial body. Reporting on the base
		// logger is best-effort, so its own error is intentionally dropped.
		l.baseLogger.Log(level.Key(), level.ErrorValue(), "err", err)
		return err
	}

	msg := amqp.Publishing{
		ContentType:     l.contentType,
		ContentEncoding: l.contentEncoding,
		CorrelationId:   correlationID,
		Timestamp:       time.Now(),
		Body:            buf.Bytes(),
	}

	if err := l.transport.Publish(l.exchange, key, msg); err != nil {
		// Callers of go-kit loggers commonly ignore the returned error, so the
		// failure is also surfaced on the base logger (best-effort).
		l.baseLogger.Log(level.Key(), level.ErrorValue(), "err", err)
		return err
	}
	return nil
}