-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathredis-storage.go
191 lines (159 loc) · 5.35 KB
/
redis-storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
package quota
import (
"context"
"fmt"
"strconv"
"sync"
"time"
"github.com/go-redis/redis/v8"
)
const (
REDIS_KEY_PREFIX = "holiday" // The prefix for all keys.
REDIS_TIMESTAMPS_KEY = "%s:timestamp" // The key for the timestamps.
REDIS_KEY_FORMAT = "%s:%s:%s:%d" // The format of the redis key. The first parameter is the prefix, the second is the symbol, the third is the interval and the fourth is the timestamp.
)
// RedisStorage is a storage implementation that uses Redis as a backend.
type RedisStorage struct {
mutex *sync.RWMutex
ctx context.Context
client *redis.Client
symbol string
interval time.Duration
}
// NewRedisStorage creates a new RedisStorage instance.
func NewRedisStorage(symbol string, interval time.Duration, config map[string]interface{}, ctx context.Context) *RedisStorage {
// Prepare the options.
options := &redis.Options{
Addr: fmt.Sprintf("%s:%d", config["redisHost"], config["redisPort"]),
Password: config["redisPassword"].(string),
DB: config["redisDB"].(int),
}
// Initialize the driver.
return &RedisStorage{
mutex: &sync.RWMutex{},
ctx: ctx,
client: redis.NewClient(options),
symbol: symbol,
interval: interval,
}
}
// Implemenet the Storage interface for RedisStorage.
// All returns all the values.
func (s *RedisStorage) All() (*Quota, error) {
s.mutex.RLock()
defer s.mutex.RUnlock()
// Get all the keys.
timestamps, err := s.client.LRange(s.ctx, fmt.Sprintf(REDIS_TIMESTAMPS_KEY, REDIS_KEY_PREFIX), 0, -1).Result()
if err != nil {
return nil, fmt.Errorf("failed to get the timestamps from redis: %s", err)
}
var quota Quota
// Retrieve the values.
for _, ts := range timestamps {
timestamp, err := strconv.ParseInt(ts, 10, 64)
if err != nil {
return nil, fmt.Errorf("failed to parse the timestamp: %s", err)
}
key := fmt.Sprintf(REDIS_KEY_FORMAT, REDIS_KEY_PREFIX, s.symbol, s.interval, timestamp)
value, err := s.client.Get(s.ctx, key).Result()
if err != nil {
return nil, fmt.Errorf("failed to get the candle for %s: %v", key, err)
}
// Transform the result into a map.
candle, err := DeserializeCandle(value)
if err != nil {
return nil, fmt.Errorf("failed to deserialize the candle for %s: %v", key, err)
}
quota = append(quota, candle)
}
return "a, nil
}
// Get retrieves the value for the given key.
func (s *RedisStorage) Get(openTime time.Time) (*Candle, error) {
s.mutex.RLock()
defer s.mutex.RUnlock()
// Retrieve the value.
timestamp := openTime.Unix()
key := fmt.Sprintf(REDIS_KEY_FORMAT, REDIS_KEY_PREFIX, s.symbol, s.interval, timestamp)
value, err := s.client.Get(s.ctx, key).Result()
if err != nil {
return nil, err
}
// Transform the result into a map.
candle, err := DeserializeCandle(value)
if err != nil {
return nil, fmt.Errorf("failed to deserialize the candle for %s: %v", key, err)
}
return candle, nil
}
// GetByIndex retrieves candle from the storage by index.
func (s *RedisStorage) GetByIndex(index int) (*Candle, error) {
s.mutex.RLock()
defer s.mutex.RUnlock()
tmp := fmt.Sprint(index)
index64, _ := strconv.ParseInt(tmp, 10, 64)
// Retrieve the value.
timestamp, err := s.client.LIndex(s.ctx, fmt.Sprintf(REDIS_TIMESTAMPS_KEY, REDIS_KEY_PREFIX), index64).Result()
if err != nil {
return nil, fmt.Errorf("failed to get the timestamp: %s", err)
}
timestamp64, _ := strconv.ParseInt(timestamp, 10, 64)
openTime := time.Unix(timestamp64, 0)
return s.Get(openTime)
}
// Put stores the value for the given key.
func (s *RedisStorage) Put(c ...*Candle) error {
s.mutex.Lock()
defer s.mutex.Unlock()
// Prepare the data.
for _, candle := range c {
timestamp := candle.OpenTime.Unix()
key := fmt.Sprintf(REDIS_KEY_FORMAT, REDIS_KEY_PREFIX, s.symbol, s.interval, timestamp)
value, err := SerializeCandle(candle)
if err != nil {
return fmt.Errorf("failed to serialize the candle: %s", err)
}
// Store the value.
err = s.client.Set(s.ctx, key, value, 0).Err()
if err != nil {
return fmt.Errorf("failed to store the value in redis: %s", err)
}
// Then store the key.
err = s.client.LPush(s.ctx, fmt.Sprintf(REDIS_TIMESTAMPS_KEY, REDIS_KEY_PREFIX), timestamp).Err()
if err != nil {
return fmt.Errorf("failed to store the key in redis: %s", err)
}
}
return nil
}
// Update updates the value for the given key.
func (s *RedisStorage) Update(candle ...*Candle) error {
return s.Put(candle...)
}
// Delete removes the value for the given key.
func (s *RedisStorage) Delete(c *Candle) error {
s.mutex.Lock()
defer s.mutex.Unlock()
// Delete the value.
timestamp := c.OpenTime.Unix()
key := fmt.Sprintf(REDIS_KEY_FORMAT, REDIS_KEY_PREFIX, s.symbol, s.interval, timestamp)
err := s.client.Del(s.ctx, key).Err()
if err != nil {
return fmt.Errorf("failed to delete the value from redis: %s", err)
}
// Delete the key.
err = s.client.LRem(s.ctx, fmt.Sprintf(REDIS_TIMESTAMPS_KEY, REDIS_KEY_PREFIX), 0, timestamp).Err()
if err != nil {
return fmt.Errorf("failed to delete the key from redis: %s", err)
}
return nil
}
// Close closes the RedisStorage instance.
func (s *RedisStorage) Close() error {
return s.client.Close()
}
// PersistOlds will store the old candles into a persistance storage and remove them from the quota.
func (s *RedisStorage) PersistOlds(persist Storage, size int) error {
// redis is a persistent storage, so we don't need to persist olds.
return nil
}