-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
287 lines (249 loc) · 7.57 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
package main
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
"github.com/spf13/viper"
"log"
"strings"
"time"
)
// Key layout before the migration:
//
//	index   data type    key
//	sitePv  string       bsz:site_pv:md5(host)
//	siteUv  set          bsz:site_uv:md5(host)
//	pagePv  zset         bsz:page_pv:md5(host) / md5(path)
//	pageUv  set          bsz:page_uv:md5(host):md5(host&path)
//
// Key layout after the migration:
//
//	index   data type    key
//	sitePv  string       bsz:site_pv:md5(host)
//	siteUv  HyperLogLog  bsz:site_uv:md5(host)
//	pagePv  zset         bsz:page_pv:md5(host) / md5(path)
//	pageUv  HyperLogLog  bsz:page_uv:md5(host):md5(host&path)
// RDB is the package-level Redis client. initRedis assigns it, and main issues
// every Redis command through it.
var RDB *redis.Client
// main copies every counter stored under the "redis.prefix" key prefix to the
// "redis.ToPrefix" key prefix, converting the UV sets to HyperLogLogs on the
// way. Source keys are only read, never deleted; removing the old data is left
// to the operator.
//
// The program exits with a non-zero status when the user has not confirmed a
// backup, when the target prefix is unusable, when SCAN keeps failing, or when
// at least one key could not be copied.
func main() {
	// Ask whether the data has been backed up before touching anything.
	var backup string
	fmt.Print("您是否已经备份了数据 (dump.rdb), 继续操作可能造成未知错误或影响 (yes/no): ")
	if _, err := fmt.Scan(&backup); err != nil {
		log.Fatalf("[ERROR] input failed: %v", err)
	}
	if backup != "yes" {
		log.Fatalf("[ERROR] 请先备份您的数据!")
	}
	initConfig()
	initRedis()

	ctx := context.Background()
	from := viper.GetString("redis.prefix")
	to := viper.GetString("redis.ToPrefix")
	// Writing a HyperLogLog onto the key that still holds the source set fails
	// with WRONGTYPE, so the target prefix must be a different, non-empty one.
	if to == "" || to == from {
		log.Fatalf("[ERROR] redis.ToPrefix must be set and must differ from redis.prefix (%q)", from)
	}

	steps := []migration{
		{family: "site_uv", wantType: "set", copyKey: copySetToHLL}, // site UV
		{family: "page_uv", wantType: "set", copyKey: copySetToHLL}, // page UV
		{family: "site_pv", wantType: "string", copyKey: copyString}, // site PV
		{family: "page_pv", wantType: "zset", copyKey: copyZSet},     // page PV
	}
	failed := 0
	for _, step := range steps {
		failed += migrateKeys(ctx, from, to, step)
	}
	if failed > 0 {
		// Do not announce success when some keys were left behind.
		log.Fatalf("[ERROR] %d keys could not be migrated; fix the errors above and run the migration again before deleting old data", failed)
	}

	log.Printf("[INFO] 数据已完成迁移, 新的 redis 前缀为: %s , 在验证迁移成功后, 您可以稍后手动删除旧数据\r\n", viper.GetString("redis.ToPrefix"))
	log.Println("[INFO] 请参考 readme 进行后续操作")
}

const (
	// scanBatchSize is the COUNT hint passed to every SCAN call.
	scanBatchSize = 100
	// maxScanRetries bounds how many consecutive SCAN failures are tolerated
	// before the migration is aborted instead of retrying forever.
	maxScanRetries = 5
	// writeBatchSize caps how many members are sent in one PFADD / ZADD.
	writeBatchSize = 1000
)

// migration describes one key family to copy from the old prefix to the new one.
type migration struct {
	family   string // middle key segment, e.g. "site_uv"
	wantType string // value of Redis TYPE a source key must report to be copied
	copyKey  func(ctx context.Context, src, dst string) error
}

// migrateKeys walks every key matching "<from>:<family>:*" with SCAN and copies
// each one to the corresponding key under the to prefix. It returns the number
// of keys that could not be copied.
func migrateKeys(ctx context.Context, from, to string, m migration) int {
	pattern := fmt.Sprintf("%s:%s:*", from, m.family)
	var (
		cursor   uint64
		failed   int
		scanErrs int
	)
	for {
		keys, next, err := RDB.Scan(ctx, cursor, pattern, scanBatchSize).Result()
		if err != nil {
			scanErrs++
			if scanErrs >= maxScanRetries {
				log.Fatalf("[ERROR] Redis scan failed %d times in a row, giving up: %v", scanErrs, err)
			}
			log.Printf("[ERROR] Redis scan failed: %v", err)
			// Back off before retrying the same cursor instead of spinning.
			time.Sleep(time.Second)
			continue
		}
		scanErrs = 0
		cursor = next

		for _, src := range keys {
			if !migrateKey(ctx, src, from, to, m) {
				failed++
			}
		}
		// SCAN reports a zero cursor once the iteration is complete.
		if cursor == 0 {
			return failed
		}
	}
}

// migrateKey copies a single source key and reports whether it was handled
// without error. A key of an unexpected type is skipped and counts as handled.
func migrateKey(ctx context.Context, src, from, to string, m migration) bool {
	typ, err := RDB.Type(ctx, src).Result()
	if err != nil {
		log.Printf("[ERROR] Redis Type failed: %v", err)
		return false
	}
	if typ != m.wantType {
		log.Printf("[INFO] Redis key %v is not %s \r\n", src, m.wantType)
		return true
	}
	// Swap only the leading prefix; the rest of the key (hashes included) must
	// stay byte-identical even if it happens to contain the prefix text.
	if !strings.HasPrefix(src, from) {
		log.Printf("[ERROR] key %v does not start with prefix %q, skipped", src, from)
		return false
	}
	dst := to + src[len(from):]
	if err := m.copyKey(ctx, src, dst); err != nil {
		log.Printf("[ERROR] migrate %v -> %v failed: %v", src, dst, err)
		return false
	}
	return true
}

// copySetToHLL reads all members of the set at src and adds them to the
// HyperLogLog at dst, sending at most writeBatchSize members per PFADD.
func copySetToHLL(ctx context.Context, src, dst string) error {
	members, err := RDB.SMembers(ctx, src).Result()
	if err != nil {
		return fmt.Errorf("SMembers: %w", err)
	}
	log.Printf("[INFO] transfer %v to HyperLogLog with %d members\r\n", src, len(members))
	for start := 0; start < len(members); start += writeBatchSize {
		end := start + writeBatchSize
		if end > len(members) {
			end = len(members)
		}
		batch := make([]interface{}, 0, end-start)
		for _, member := range members[start:end] {
			batch = append(batch, member)
		}
		if err := RDB.PFAdd(ctx, dst, batch...).Err(); err != nil {
			return fmt.Errorf("PFAdd: %w", err)
		}
	}
	return nil
}

// copyString copies the string value at src to dst. The value is written
// without an expiration.
func copyString(ctx context.Context, src, dst string) error {
	val, err := RDB.Get(ctx, src).Result()
	if err != nil {
		return fmt.Errorf("Get: %w", err)
	}
	log.Printf("[INFO] transfer %v %s\r\n", src, val)
	if err := RDB.Set(ctx, dst, val, 0).Err(); err != nil {
		return fmt.Errorf("Set: %w", err)
	}
	return nil
}

// copyZSet copies every member and score of the sorted set at src to dst,
// sending at most writeBatchSize members per ZADD.
func copyZSet(ctx context.Context, src, dst string) error {
	entries, err := RDB.ZRangeWithScores(ctx, src, 0, -1).Result()
	if err != nil {
		return fmt.Errorf("ZRangeWithScores: %w", err)
	}
	log.Printf("[INFO] transfer %v with %v\r\n", src, entries)
	for start := 0; start < len(entries); start += writeBatchSize {
		end := start + writeBatchSize
		if end > len(entries) {
			end = len(entries)
		}
		// Members are forwarded as returned, without a type assertion.
		if err := RDB.ZAdd(ctx, dst, entries[start:end]...).Err(); err != nil {
			return fmt.Errorf("ZAdd: %w", err)
		}
	}
	return nil
}
// initConfig loads config.yaml from the current working directory into viper.
// The process exits if the file cannot be read or parsed.
func initConfig() {
	viper.SetConfigName("config")
	viper.SetConfigType("yaml")
	viper.AddConfigPath(".")
	if err := viper.ReadInConfig(); err != nil {
		log.Fatalf("[ERROR] read config failed: %v", err)
	}
	// Log only which file was loaded: dumping all settings would write
	// redis.password to the log in plain text.
	log.Printf("[INFO] Config init success, loaded %s", viper.ConfigFileUsed())
}
// initRedis builds the Redis client from the "redis.*" settings, stores it in
// RDB and verifies the connection with a PING. The process exits if the PING
// fails.
func initRedis() {
	addr := viper.GetString("redis.address")
	db := viper.GetInt("redis.database")
	log.Printf("[INFO] Redis trying connect to tcp://%s/%d", addr, db)

	RDB = redis.NewClient(&redis.Options{
		Addr:            addr,
		Password:        viper.GetString("redis.password"),
		DB:              db,
		MinIdleConns:    5,
		MaxIdleConns:    20,
		MaxRetries:      5,
		MaxActiveConns:  20,
		ConnMaxLifetime: 5 * time.Minute,
	})

	// Round-trip once so a bad address or password is reported immediately.
	reply, pingErr := RDB.Ping(context.Background()).Result()
	if pingErr != nil {
		log.Fatalf("[ERROR] Redis ping failed: %v", pingErr)
	}
	log.Printf("[INFO] Redis init success, pong: %s ", reply)
}