-
Notifications
You must be signed in to change notification settings - Fork 135
/
loghub_sample.go
101 lines (98 loc) · 3.41 KB
/
loghub_sample.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package main
import (
"fmt"
"time"
"strconv"
"math/rand"
"github.com/galaxydi/go-loghub/example/util"
"github.com/gogo/protobuf/proto"
sls "github.com/galaxydi/go-loghub"
)
func main() {
fmt.Println("loghub sample begin")
begin_time := uint32(time.Now().Unix())
rand.Seed(int64(begin_time))
logstore_name := "test"
logstore, err := util.Project.GetLogStore(logstore_name)
if logstore == nil {
fmt.Printf("GetLogStore fail, err:%v\n", err)
err = util.Project.CreateLogStore(logstore_name, 1, 2)
if err != nil {
fmt.Printf("CreateLogStore fail, err: ", err)
return
}
fmt.Println("CreateLogStore success")
} else {
fmt.Printf("GetLogStore success, name: %s, ttl: %d, shardCount: %d, createTime: %d, lastModifyTime: %d\n", logstore.Name, logstore.TTL, logstore.ShardCount, logstore.CreateTime, logstore.LastModifyTime)
}
// put logs to logstore
for loggroupIdx := 0; loggroupIdx < 2; loggroupIdx++ {
logs := []*sls.Log {}
for logIdx := 0; logIdx < 100; logIdx++ {
content := []*sls.LogContent {}
for colIdx := 0; colIdx < 10; colIdx++ {
content = append(content, &sls.LogContent {
Key: proto.String(fmt.Sprintf("col_%d", colIdx)),
Value: proto.String(fmt.Sprintf("loggroup idx: %d, log idx: %d, col idx: %d, value: %d", loggroupIdx, logIdx, colIdx, rand.Intn(10000000))),
})
}
log := &sls.Log{
Time: proto.Uint32(uint32(time.Now().Unix())),
Contents: content,
}
logs = append(logs, log)
}
loggroup := &sls.LogGroup {
Topic: proto.String(""),
Source: proto.String("10.230.201.117"),
Logs: logs,
}
// PostLogStoreLogs API Ref: https://intl.aliyun.com/help/doc-detail/29026.htm
err = logstore.PutLogs(loggroup)
if err == nil {
fmt.Println("PutLogs success")
} else {
fmt.Printf("PutLogs fail, err: %s\n", err)
}
time.Sleep(1000 * time.Millisecond)
}
// pull logs from logstore
shards, err := logstore.ListShards()
for _, sh := range shards {
if sh == 0 {
// GetCursor API Ref: https://intl.aliyun.com/help/doc-detail/29024.htm
begin_cursor, _ := logstore.GetCursor(sh, "begin")
end_cursor, _ := logstore.GetCursor(sh, "end")
// PullLogs API Ref: https://intl.aliyun.com/help/doc-detail/29025.htm
loggrouplist, next_cursor, _ := logstore.PullLogs(sh, begin_cursor, end_cursor, 100)
fmt.Printf("shard: %d, begin_cursor: %s, end_cursor: %s, next_cursor: %s\n", sh, begin_cursor, end_cursor, next_cursor)
for _, loggroup := range loggrouplist.LogGroups {
for _, log := range loggroup.Logs {
for _, content := range log.Contents {
fmt.Printf("key:%s, value:%s\n", content.GetKey(), content.GetValue())
}
}
}
} else {
begin_cursor, _ := logstore.GetCursor(sh, strconv.Itoa(int(begin_time) + 2))
for {
loggrouplist, next_cursor, _ := logstore.PullLogs(sh, begin_cursor, "", 2)
fmt.Printf("shard: %d, begin_cursor: %s, next_cursor: %s, len(loggrouplist.LogGroups): %d\n", sh, begin_cursor, next_cursor, len(loggrouplist.LogGroups))
if len(loggrouplist.LogGroups) == 0 {
// means no more data in this shard, you can break out or sleep to wait new data
break
} else {
for _, loggroup := range loggrouplist.LogGroups {
for _, log := range loggroup.Logs {
for _, content := range log.Contents {
fmt.Printf("key:%s, value:%s\n", content.GetKey(), content.GetValue())
}
}
}
begin_cursor = next_cursor
}
}
}
}
fmt.Println("loghub sample end")
}