-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathlog.go
284 lines (251 loc) · 6.12 KB
/
log.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
package openobserve
import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"sync"
"time"
)
type basicAuth struct {
username string
password string
}
type Option func(*OpenObLog)
type OpenObLog struct {
*sync.Mutex
ctx context.Context
httpc *http.Client
addr string
c chan any
list []any
basicAuth *basicAuth
authorization *string
fullSize int // 达到此数量时不再等waitTime到期,直接写出
waitTime time.Duration
requestTimeout time.Duration
compress bool // 请求报文进行gzip压缩传输
compressLevel int // 请求报文gzip压缩级别,-1~9,0:不压缩,9:最高压缩,-1:默认压缩
everyDayCut bool
indexName string
}
// WithBasicAuth 使用账号密码
func WithBasicAuth(username, password string) Option {
return func(log *OpenObLog) {
log.basicAuth = &basicAuth{
username: username, password: password,
}
}
}
// WithAuthorization 使用token
func WithAuthorization(token string) Option {
return func(log *OpenObLog) {
token = "Basic " + token
log.authorization = &token
}
}
// WithFullSize 个数达到此配置马上请求
func WithFullSize(fullSize int) Option {
return func(log *OpenObLog) {
log.fullSize = fullSize
if log.c != nil {
close(log.c)
}
log.c = make(chan any, fullSize+16)
}
}
// WithWaitTime 每过此配置的发起请求
func WithWaitTime(td time.Duration) Option {
return func(log *OpenObLog) {
log.waitTime = td
}
}
// WithRequestTimeout http 请求超时
func WithRequestTimeout(tm time.Duration) Option {
return func(log *OpenObLog) {
log.requestTimeout = tm
}
}
// WithIndexName name索引名,cut是否按天切割
func WithIndexName(name string, cut bool) Option {
return func(log *OpenObLog) {
log.everyDayCut = cut
log.indexName = name
}
}
func buildLastName(name string) string {
return fmt.Sprintf("%s_%s", name,
time.Now().Format("20060102"))
}
// WithCompress 请求报文gzip压缩
func WithCompress(compress bool, compressLevel ...int) Option {
// 如果没有传递 compressLevel 参数,则使用默认值 -1
var level int
if len(compressLevel) > 0 {
level = compressLevel[0]
// 验证 compressLevel 是否在有效范围 -1 到 9 之间
if level < -1 || level > 9 {
log.Println("Invalid compress level:", level, "Valid range is -1 to 9")
level = -1 // 设置为默认值
}
} else {
level = -1 // 默认压缩级别
}
return func(log *OpenObLog) {
log.compress = compress
log.compressLevel = level
}
}
func New(ctx context.Context, addr string, options ...Option) *OpenObLog {
l := &OpenObLog{
Mutex: &sync.Mutex{},
ctx: ctx,
addr: addr,
fullSize: 16,
waitTime: 0,
requestTimeout: time.Second * 10,
indexName: "open",
}
for _, option := range options {
option(l)
}
l.httpc = &http.Client{}
l.httpc.Timeout = l.requestTimeout
l.list = make([]any, 0, l.fullSize)
l.c = make(chan any, l.fullSize*2) // 管道长度是缓冲区2倍,增加冗余,防止刷缓冲区的时候,发送协程被阻塞
l.run()
return l
}
func (l *OpenObLog) run() {
go func() {
for {
select {
case val := <-l.c:
l.Lock()
l.list = append(l.list, val)
l.Unlock()
if len(l.list) >= l.fullSize {
l.sendList()
}
case <-l.ctx.Done():
l.sendList()
return
}
}
}()
if l.waitTime > 0 {
go func() {
ticker := time.NewTicker(l.waitTime)
for {
select {
case <-ticker.C:
l.sendList()
case <-l.ctx.Done():
l.sendList()
return
}
}
}()
}
}
func (l *OpenObLog) request(obj any) bool {
// 首先将日志对象序列化为 JSON 格式
data, err := json.Marshal(obj)
if err != nil {
log.Println("OpenObLog.request json.Marshal:", err)
return false
}
// 发送缓冲区
var buf bytes.Buffer
// 预设一个标志,判断请求报文是否进行压缩
bReqCompress := false
// 如果启用压缩且压缩级别有效,进行压缩
if l.compress {
gz, err := gzip.NewWriterLevel(&buf, l.compressLevel)
if err != nil {
log.Println("OpenObLog.request gzip.NewWriterLevel:", err)
} else {
// 压缩数据
if _, err = gz.Write(data); err != nil {
log.Println("OpenObLog.request gzip compression:", err)
buf.Reset()
}
gz.Close()
bReqCompress = true
}
}
if !bReqCompress {
// 如果没有启用压缩,直接发送原始数据
buf.Write(data)
}
// 根据每日报表切换索引名称
indexName := l.indexName
if l.everyDayCut {
indexName = buildLastName(l.indexName)
}
// 创建 HTTP 请求,使用 buf 作为请求体
req, err := http.NewRequest(http.MethodPost,
fmt.Sprintf("%s/api/default/%s/_json", l.addr, indexName),
&buf)
if err != nil {
log.Println("OpenObLog.request NewRequest:", err)
return false
}
req.Header.Set("Content-Type", "application/json")
if l.authorization != nil {
req.Header.Set("Authorization", *l.authorization)
}
if l.basicAuth != nil {
req.SetBasicAuth(l.basicAuth.username, l.basicAuth.password)
}
// 设置 Content-Encoding 为 gzip(如果启用了压缩)
if bReqCompress {
req.Header.Set("Content-Encoding", "gzip")
}
// 发送 HTTP 请求
response, err := l.httpc.Do(req)
if err != nil {
log.Println("OpenObLog.request DoRequest:", err)
return false
}
defer response.Body.Close()
// 检查响应状态
if response.StatusCode == http.StatusOK {
return true
}
body, _ := io.ReadAll(response.Body)
log.Println("OpenObLog.request request error:", string(body))
return false
}
func (l *OpenObLog) sendList() {
l.Lock()
defer l.Unlock()
if len(l.list) == 0 {
return
}
if l.request(l.list) || len(l.list) > l.fullSize*10 {
l.list = l.list[:0] // 如果多次发送失败,应及时清除缓存,防止积攒在内存造成业务异常 TODO:增加可选参数
}
}
// SendSync 同步发送
func (l *OpenObLog) SendSync(data any) bool {
if data == nil {
return
}
return l.request(data)
}
// Send 异步发送
func (l *OpenObLog) Send(data any) {
if data == nil {
return
}
select {
case l.c <- data:
default:
log.Println("OpenObserve send error by full size.")
}
}