Skip to content
This repository was archived by the owner on Dec 9, 2024. It is now read-only.

Commit

Permalink
refactor db package. Move it to internal package.
Browse files Browse the repository at this point in the history
  • Loading branch information
Maciej Winnicki committed Aug 11, 2017
1 parent f651bbc commit 85cd6f8
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 51 deletions.
10 changes: 5 additions & 5 deletions api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/cors"

"github.com/serverless/event-gateway/db"
"github.com/serverless/event-gateway/functions"
"github.com/serverless/event-gateway/internal/kv"
"github.com/serverless/event-gateway/metrics"
"github.com/serverless/event-gateway/pubsub"
"github.com/serverless/event-gateway/util/httpapi"
Expand All @@ -20,7 +20,7 @@ import (
func StartConfigAPI(config httpapi.Config) httpapi.Server {
router := httprouter.New()

functionsDB := db.NewPrefixedStore("/serverless-event-gateway/functions", config.KV)
functionsDB := kv.NewPrefixedStore("/serverless-event-gateway/functions", config.KV)
functionService := &functions.Functions{
DB: functionsDB,
Log: config.Log,
Expand All @@ -29,9 +29,9 @@ func StartConfigAPI(config httpapi.Config) httpapi.Server {
functionsAPI.RegisterRoutes(router)

pubsubService := &pubsub.PubSub{
TopicsDB: db.NewPrefixedStore("/serverless-event-gateway/topics", config.KV),
SubscriptionsDB: db.NewPrefixedStore("/serverless-event-gateway/subscriptions", config.KV),
EndpointsDB: db.NewPrefixedStore("/serverless-event-gateway/endpoints", config.KV),
TopicsDB: kv.NewPrefixedStore("/serverless-event-gateway/topics", config.KV),
SubscriptionsDB: kv.NewPrefixedStore("/serverless-event-gateway/subscriptions", config.KV),
EndpointsDB: kv.NewPrefixedStore("/serverless-event-gateway/endpoints", config.KV),
FunctionsDB: functionsDB,
Log: config.Log,
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/event-gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"go.uber.org/zap/zapcore"

"github.com/serverless/event-gateway/api"
"github.com/serverless/event-gateway/db"
"github.com/serverless/event-gateway/internal/kv"
"github.com/serverless/event-gateway/metrics"
"github.com/serverless/event-gateway/util"
"github.com/serverless/event-gateway/util/httpapi"
Expand Down Expand Up @@ -61,7 +61,7 @@ func main() {
shutdownGuard := util.NewShutdownGuard()

if *developmentMode {
db.EmbedEtcd(*embedDataDir, *embedPeerAddr, *embedCliAddr, shutdownGuard)
kv.EmbedEtcd(*embedDataDir, *embedPeerAddr, *embedCliAddr, shutdownGuard)
}

dbHostStrings := strings.Split(*dbHosts, ",")
Expand Down
12 changes: 6 additions & 6 deletions integrationtests/stub/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ import (
"github.com/julienschmidt/httprouter"
"go.uber.org/zap"

"github.com/serverless/event-gateway/db"
"github.com/serverless/event-gateway/functions"
"github.com/serverless/event-gateway/internal/kv"
"github.com/serverless/event-gateway/pubsub"
)

// ConfigAPIServer creates test Configuration API server.
func ConfigAPIServer(kv store.Store, log *zap.Logger) *httptest.Server {
func ConfigAPIServer(kvstore store.Store, log *zap.Logger) *httptest.Server {
apiRouter := httprouter.New()

fnsDB := db.NewPrefixedStore("/serverless-event-gateway/functions", kv)
fnsDB := kv.NewPrefixedStore("/serverless-event-gateway/functions", kvstore)
fns := &functions.Functions{
DB: fnsDB,
Log: log,
Expand All @@ -25,9 +25,9 @@ func ConfigAPIServer(kv store.Store, log *zap.Logger) *httptest.Server {
fnsapi.RegisterRoutes(apiRouter)

ps := &pubsub.PubSub{
TopicsDB: db.NewPrefixedStore("/serverless-event-gateway/topics", kv),
SubscriptionsDB: db.NewPrefixedStore("/serverless-event-gateway/subscriptions", kv),
EndpointsDB: db.NewPrefixedStore("/serverless-event-gateway/endpoints", kv),
TopicsDB: kv.NewPrefixedStore("/serverless-event-gateway/topics", kvstore),
SubscriptionsDB: kv.NewPrefixedStore("/serverless-event-gateway/subscriptions", kvstore),
EndpointsDB: kv.NewPrefixedStore("/serverless-event-gateway/endpoints", kvstore),
FunctionsDB: fnsDB,
Log: log,
}
Expand Down
4 changes: 2 additions & 2 deletions integrationtests/stub/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/etcd"

"github.com/serverless/event-gateway/db"
"github.com/serverless/event-gateway/internal/kv"
"github.com/serverless/event-gateway/util"
)

Expand Down Expand Up @@ -38,7 +38,7 @@ func TestEtcd() (store.Store, *util.ShutdownGuard) {
cliKvAddr := kvAddr(cliPort)
cliAddr := "http://" + cliKvAddr

db.EmbedEtcd(dataDir, peerAddr, cliAddr, shutdownGuard)
kv.EmbedEtcd(dataDir, peerAddr, cliAddr, shutdownGuard)

cli, err := libkv.NewStore(
store.ETCD,
Expand Down
5 changes: 2 additions & 3 deletions db/embed.go → internal/kv/embed.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package db
package kv

import (
"net/url"
Expand All @@ -13,7 +13,6 @@ import (
// EmbedEtcd starts an embedded etcd instance. It can be shut down by closing the shutdown chan.
// It returns a chan that is closed upon startup, and a chan that is closed once shutdown is complete.
func EmbedEtcd(dataDir, peerAddr, cliAddr string, shutdownGuard *util.ShutdownGuard) {

cfg := embed.NewConfig()

// set advertise urls
Expand Down Expand Up @@ -74,7 +73,7 @@ func EmbedEtcd(dataDir, peerAddr, cliAddr string, shutdownGuard *util.ShutdownGu
e.Server.Stop()
shutdownGuard.ShutdownAndDone()
shutdownGuard.Wait()
panic("Etcd failed to start: " + err.Error())
panic("etcd failed to start: " + err.Error())
}
}()
}
Expand Down
6 changes: 3 additions & 3 deletions db/prefixed.go → internal/kv/prefixed.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package db
package kv

import (
"strings"

"github.com/docker/libkv/store"
)

// PrefixedStore namespaces a libkv Store instance
// PrefixedStore namespaces a libkv Store instance.
type PrefixedStore struct {
root string
kv store.Store
}

// NewPrefixedStore creates a new namespaced libkv Store
// NewPrefixedStore creates a new namespaced libkv Store.
func NewPrefixedStore(root string, kv store.Store) *PrefixedStore {
if !strings.HasSuffix(root, "/") {
root = root + "/"
Expand Down
46 changes: 22 additions & 24 deletions db/watcher.go → internal/kv/watcher.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package db
package kv

import (
"math"
Expand All @@ -11,18 +11,9 @@ import (
"github.com/docker/libkv/store"
)

// Reactive is a type that can react to state changes on keys in a directory.
type Reactive interface {
Created(key string, value []byte)
Modified(key string, newValue []byte)
Deleted(key string, lastKnownValue []byte)
}

// PathWatcher provides a means of watching for changes to
// interesting configuration in the backing database. It also
// maintains a cache of updates observed by a Reactive
// instance.
type PathWatcher struct {
// Watcher provides a means of watching for changes to interesting configuration in the backing database. It also
// maintains a cache of updates observed by a Reactive instance.
type Watcher struct {
path string
kv store.Store
log *zap.Logger
Expand All @@ -42,8 +33,8 @@ type PathWatcher struct {
ReconciliationJitter int
}

// NewPathWatcher instantiates a new PathWatcher.
func NewPathWatcher(path string, kv store.Store, log *zap.Logger) *PathWatcher {
// NewWatcher instantiates a new Watcher.
func NewWatcher(path string, kv store.Store, log *zap.Logger) *Watcher {
if path == "/" {
panic("Root (\"/\") used for watch path. Please namespace all usage to avoid performance issues.")
} else if !strings.HasPrefix(path, "/") {
Expand All @@ -53,7 +44,7 @@ func NewPathWatcher(path string, kv store.Store, log *zap.Logger) *PathWatcher {
if !strings.HasSuffix(path, "/") {
path = path + "/"
}
return &PathWatcher{
return &Watcher{
path: path,
kv: kv,
log: log,
Expand All @@ -66,7 +57,7 @@ func NewPathWatcher(path string, kv store.Store, log *zap.Logger) *PathWatcher {
// React will watch for events on the PathWatcher's root directory,
// and call the Created/Modified/Deleted functions on a provided
// Reactive when changes are detected.
func (rfs *PathWatcher) React(reactor Reactive, shutdown chan struct{}) {
func (rfs *Watcher) React(reactor reactive, shutdown chan struct{}) {
events := make(chan event)
go rfs.watchRoot(events, shutdown)

Expand Down Expand Up @@ -110,27 +101,27 @@ type cachedValue struct {
LastIndex uint64
}

func (rfs *PathWatcher) resetBackoff() {
func (rfs *Watcher) resetBackoff() {
rfs.backoffFactor = 1
}

func (rfs *PathWatcher) backoff() {
func (rfs *Watcher) backoff() {
rfs.log.Info("Backing-off after a failure",
zap.String("event", "backoff"),
zap.Int("seconds", rfs.backoffFactor))
time.Sleep(time.Duration(rfs.backoffFactor) * time.Second)
rfs.backoffFactor = int(math.Min(float64(rfs.backoffFactor<<1), 8))
}

func (rfs *PathWatcher) reconciliationTimeout() <-chan time.Time {
func (rfs *Watcher) reconciliationTimeout() <-chan time.Time {
// use a minimum jitter of 1
maxJitter := int(math.Max(float64(rfs.ReconciliationJitter), 1))
jitter := rand.Intn(maxJitter)
delay := time.Duration(jitter+rfs.ReconciliationBaseDelay) * time.Second
return time.After(delay)
}

func (rfs *PathWatcher) watchRoot(outgoingEvents chan event, shutdown chan struct{}) {
func (rfs *Watcher) watchRoot(outgoingEvents chan event, shutdown chan struct{}) {
// NB when extending libkv usage for DB's other than etcd, the watch behavior
// will need to be carefully considered, as the code below will likely need
// to be changed depending on which backend database is used.
Expand Down Expand Up @@ -168,7 +159,7 @@ func (rfs *PathWatcher) watchRoot(outgoingEvents chan event, shutdown chan struc

if !exists {
// must set IsDir to true since backend may be etcd
err := rfs.kv.Put(rfs.path, []byte{}, &store.WriteOptions{IsDir: true})
err = rfs.kv.Put(rfs.path, []byte{}, &store.WriteOptions{IsDir: true})
if err != nil {
if strings.HasPrefix(err.Error(), "102: Not a file") {
rfs.log.Debug("Another node (probably) created the root directory first.")
Expand Down Expand Up @@ -204,7 +195,7 @@ func (rfs *PathWatcher) watchRoot(outgoingEvents chan event, shutdown chan struc
}
}

func (rfs *PathWatcher) processEvents(cache *map[string]cachedValue, incomingEvents <-chan []*store.KVPair,
func (rfs *Watcher) processEvents(cache *map[string]cachedValue, incomingEvents <-chan []*store.KVPair,
outgoingEvents chan event, shutdown chan struct{}) bool {

for {
Expand Down Expand Up @@ -234,7 +225,7 @@ func (rfs *PathWatcher) processEvents(cache *map[string]cachedValue, incomingEve
}
}

func (rfs *PathWatcher) diffCache(kvs []*store.KVPair, outgoingevents chan event,
func (rfs *Watcher) diffCache(kvs []*store.KVPair, outgoingevents chan event,
cache map[string]cachedValue) map[string]cachedValue {

nextCache := map[string]cachedValue{}
Expand Down Expand Up @@ -284,3 +275,10 @@ func (rfs *PathWatcher) diffCache(kvs []*store.KVPair, outgoingevents chan event

return nextCache
}

// Reactive is a type that can react to state changes on keys in a directory.
type reactive interface {
Created(key string, value []byte)
Modified(key string, newValue []byte)
Deleted(key string, lastKnownValue []byte)
}
12 changes: 6 additions & 6 deletions targetcache/targetcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"github.com/docker/libkv/store"
"go.uber.org/zap"

"github.com/serverless/event-gateway/db"
"github.com/serverless/event-gateway/functions"
"github.com/serverless/event-gateway/internal/kv"
"github.com/serverless/event-gateway/pubsub"
)

Expand Down Expand Up @@ -74,19 +74,19 @@ func (tc *LibKVTargetCache) Shutdown() {
}

// New instantiates a new LibKVTargetCache, rooted at a particular location.
func New(path string, kv store.Store, log *zap.Logger, debug ...bool) *LibKVTargetCache {
func New(path string, kvstore store.Store, log *zap.Logger, debug ...bool) *LibKVTargetCache {
// make sure we have a trailing slash for trimming future updates
if !strings.HasSuffix(path, "/") {
path = path + "/"
}

// path watchers
functionPathWatcher := db.NewPathWatcher(path+"functions", kv, log)
endpointPathWatcher := db.NewPathWatcher(path+"endpoints", kv, log)
subscriptionPathWatcher := db.NewPathWatcher(path+"subscriptions", kv, log)
functionPathWatcher := kv.NewWatcher(path+"functions", kvstore, log)
endpointPathWatcher := kv.NewWatcher(path+"endpoints", kvstore, log)
subscriptionPathWatcher := kv.NewWatcher(path+"subscriptions", kvstore, log)

if len(debug) == 1 && debug[0] {
debugReconciliation := func(w ...*db.PathWatcher) {
debugReconciliation := func(w ...*kv.Watcher) {
for _, w := range w {
w.ReconciliationJitter = 0
w.ReconciliationBaseDelay = 3
Expand Down

0 comments on commit 85cd6f8

Please # to comment.