-
Notifications
You must be signed in to change notification settings - Fork 2
/
gokv.go
166 lines (139 loc) · 4.33 KB
/
gokv.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
// Package gokv implements a set of drivers and a common interface for working with different key/value storage systems
package gokv
import (
	"errors"
	"sync"

	"golang.org/x/sync/errgroup"

	"github.com/bradberger/gokv/kv"
	"stathat.com/c/consistent"
)
// Replication methods understood by Client.Set. Both constants carry the
// ReplicationMethod type: the second entry deliberately omits its own
// "= iota" so that it repeats the typed expression of the first one instead
// of becoming an untyped integer constant.
const (
	// ReplicateAsync indicates that replication will be done asynchronously.
	// Set writes the first selected node synchronously and returns that
	// node's result; the remaining replicas are written in the background.
	ReplicateAsync ReplicationMethod = iota
	// ReplicateSync indicates that replication will be done synchronously.
	// Set commands will return without error only if all nodes return without error.
	ReplicateSync
)
// Compile-time assertion that *Client satisfies the kv.Store interface.
var _ kv.Store = (*Client)(nil)
// ReplicationMethod determines whether replication takes place asynchronously or synchronously.
// Use ReplicateAsync for asynchronous replication, ReplicateSync for synchronous replication.
type ReplicationMethod int
// Client is a cache client with built in replication to any number of different caches.
// This allows replication and synchronization across various caches using the set of drivers
// available as subpackages, including Memcached, Redis, in-memory caches, and more.
//
// Use New to obtain a Client: the zero value has a nil node map and a nil hash ring.
type Client struct {
// nodes holds the registered stores, keyed by node name.
nodes map[string]kv.Store
// ch is the consistent-hash ring of node names; Set, Get and Del ask it
// (via GetN) which nodes are responsible for a key.
ch *consistent.Consistent
// replicateNodeCt is the node count passed to ch.GetN. It is set by
// ReplicateToN and is zero until then.
replicateNodeCt int
// replicateMethod is consulted by Set to choose between the synchronous
// and asynchronous write paths. It is set by New and SetReplicateMethod.
replicateMethod ReplicationMethod
// The embedded mutex is held by RemoveNode while it mutates nodes and ch.
// Because it is embedded, Lock and Unlock are also part of Client's
// exported method set.
sync.Mutex
}
// New constructs a Client that has no nodes registered yet. The returned
// client starts with an empty node map, a fresh consistent-hash ring and
// ReplicateAsync as its replication method.
func New() *Client {
	client := &Client{
		replicateMethod: ReplicateAsync,
		ch:              consistent.New(),
		nodes:           map[string]kv.Store{},
	}
	return client
}
// AddNode registers node under name, but only if no node with that name is
// registered yet.
//
// It returns an error if name is already in use, or if node is nil. The
// existence check and the insertion run under the client mutex (the same lock
// RemoveNode takes), so two concurrent AddNode calls for one name cannot both
// succeed and the node map is not written concurrently with a removal.
func (c *Client) AddNode(name string, node kv.Store) error {
	c.Lock()
	defer c.Unlock()

	if _, exists := c.nodes[name]; exists {
		return errors.New("node already exists")
	}
	if node == nil {
		return errors.New("cache node is nil")
	}
	// Insert here instead of delegating to SetNode: the check and the write
	// must share one critical section, and the mutex is not reentrant.
	c.nodes[name] = node
	c.ch.Add(name)
	return nil
}
// SetNode registers node under name, regardless of whether a node with that
// name already exists. It returns an error only when node is nil.
//
// The node map and the hash ring are updated under the client mutex, the same
// lock RemoveNode holds while it mutates them.
func (c *Client) SetNode(name string, node kv.Store) error {
	if node == nil {
		return errors.New("cache node is nil")
	}

	c.Lock()
	defer c.Unlock()

	_, known := c.nodes[name]
	c.nodes[name] = node
	// A name that is already in the map was put on the ring when it was first
	// registered, so only genuinely new names are added to the ring.
	if !known {
		c.ch.Add(name)
	}
	return nil
}
// ReplaceNode swaps the store registered under name for node, but only if a
// node with that name already exists.
//
// It returns an error if name is not registered, or if node is nil. The
// existence check and the replacement run under the client mutex (the same
// lock RemoveNode takes), so the name cannot be removed between the two.
func (c *Client) ReplaceNode(name string, node kv.Store) error {
	c.Lock()
	defer c.Unlock()

	if _, exists := c.nodes[name]; !exists {
		return errors.New("node does not exist")
	}
	if node == nil {
		return errors.New("cache node is nil")
	}
	// The name is already on the hash ring (it was added when the node was
	// first registered), so only the map entry needs to change.
	c.nodes[name] = node
	return nil
}
// RemoveNode removes the node registered under name from both the node map
// and the hash ring, holding the client mutex while it does so.
//
// Removing a name that is not registered is a no-op: the ring is left
// untouched, because such a name was never added to it. The returned error
// is always nil.
func (c *Client) RemoveNode(name string) error {
	c.Lock()
	defer c.Unlock()

	if _, registered := c.nodes[name]; !registered {
		return nil
	}
	delete(c.nodes, name)
	c.ch.Remove(name)
	return nil
}
// SetReplicateMethod sets the replication method that later calls to Set
// consult: ReplicateSync selects the synchronous path, any other value the
// asynchronous one. The field is assigned without taking the client mutex.
func (c *Client) SetReplicateMethod(m ReplicationMethod) {
c.replicateMethod = m
}
// ReplicateToN sets how many nodes each key should be replicated to.
//
// numNodes must not be negative and must not exceed the number of members
// currently on the hash ring; otherwise an error is returned and the previous
// setting is kept. Zero is still accepted, as before.
func (c *Client) ReplicateToN(numNodes int) error {
	// A negative count can never be satisfied, so reject it here rather than
	// handing it to the ring on every later Set/Get/Del.
	if numNodes < 0 || numNodes > len(c.ch.Members()) {
		return errors.New("invalid number of nodes")
	}
	c.replicateNodeCt = numNodes
	return nil
}
// node returns the store registered under nodeName, or nil if there is none.
//
// The lookup holds the client mutex so that it does not read the node map
// while RemoveNode is deleting from it. Callers must not already hold the
// mutex, since it is not reentrant.
func (c *Client) node(nodeName string) kv.Store {
	c.Lock()
	defer c.Unlock()
	return c.nodes[nodeName]
}
// Set implements the "kv.Store".Set() interface.
//
// The hash ring picks the nodes responsible for key; a ring lookup error is
// returned as is. With ReplicateSync every selected node is written
// concurrently and the result of waiting on the whole group is returned. With
// any other method the first selected node is written synchronously and its
// error is returned, while the remaining nodes are written in background
// goroutines whose results are discarded.
func (c *Client) Set(key string, value interface{}) (err error) {
	targets, err := c.ch.GetN(key, c.replicateNodeCt)
	if err != nil {
		return err
	}

	if c.replicateMethod != ReplicateSync {
		primary, replicas := targets[0], targets[1:]
		err = c.node(primary).Set(key, value)
		for _, replica := range replicas {
			// Best-effort replication: nobody waits for these writes.
			go c.node(replica).Set(key, value)
		}
		return err
	}

	var group errgroup.Group
	for _, target := range targets {
		target := target // per-iteration copy for the closure
		group.Go(func() error {
			return c.node(target).Set(key, value)
		})
	}
	return group.Wait()
}
// Get implements the "kv.Store".Get() interface.
//
// The nodes responsible for key are tried in the order the hash ring returns
// them, passing dstVal through to each node's Get. The first node that answers
// without error ends the search with a nil result. A ring lookup error is
// returned as is; if every node reports an error the result is kv.ErrNotFound.
func (c *Client) Get(key string, dstVal interface{}) (err error) {
	candidates, err := c.ch.GetN(key, c.replicateNodeCt)
	if err != nil {
		return err
	}
	for _, candidate := range candidates {
		err = c.node(candidate).Get(key, dstVal)
		if err == nil {
			return nil
		}
	}
	return kv.ErrNotFound
}
// Del implements the "kv.Store".Del() interface.
//
// The key is deleted concurrently on every node the hash ring selects for it.
// A ring lookup error is returned as is; otherwise the result is whatever
// waiting on the group of delete operations yields, which is non-nil if any
// of them failed.
func (c *Client) Del(key string) (err error) {
	targets, err := c.ch.GetN(key, c.replicateNodeCt)
	if err != nil {
		return err
	}

	var group errgroup.Group
	for _, target := range targets {
		target := target // per-iteration copy for the closure
		group.Go(func() error {
			return c.node(target).Del(key)
		})
	}
	return group.Wait()
}