Skip to content

Commit

Permalink
supports inmemory cache expiration for twindb
Browse files Browse the repository at this point in the history
Signed-off-by: Ashraf Fouda <ashraf.m.fouda@gmail.com>
  • Loading branch information
ashraffouda committed Dec 18, 2024
1 parent e336282 commit f5ce90c
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 25 deletions.
2 changes: 1 addition & 1 deletion grid-client/deployer/tf_plugin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func NewTFPluginClient(
}

if !cfg.rmbInMemCache {
peerOpts = append(peerOpts, peer.WithTwinCache(10*60*60)) // in seconds that's 10 hours
peerOpts = append(peerOpts, peer.WithTmpCacheExpiration(10*60*60)) // in seconds that's 10 hours
}
rmbClient, err := peer.NewRpcClient(ctx, tfPluginClient.mnemonicOrSeed, manager, peerOpts...)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions rmb-sdk-go/peer/examples/peer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func app() error {
relayCallback,
peer.WithRelay("wss://relay.dev.grid.tf"),
peer.WithSession("test-client"),
peer.WithInMemoryExpiration(6*60*60), // six hours
)

if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion rmb-sdk-go/peer/examples/peer_pingmany/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func main() {
handler,
peer.WithKeyType(peer.KeyTypeSr25519),
peer.WithSession("rmb-playground999"),
peer.WithTwinCache(10*60*60), // in seconds that's 10 hours
peer.WithInMemoryExpiration(10*60*60), // in seconds that's 10 hours
)

if err != nil {
Expand Down
15 changes: 12 additions & 3 deletions rmb-sdk-go/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,24 @@ func WithEncoder(encoder encoder.Encoder) PeerOpt {
}

// WithTwinCache cache twin information for this ttl number of seconds
// by default twins are cached in memory forever
func WithTwinCache(ttl uint64) PeerOpt {
// if ttl == 0, twins are cached forever
func WithTmpCacheExpiration(ttl uint64) PeerOpt {
return func(pc *peerCfg) {
pc.cacheFactory = func(inner TwinDB, chainURL string) (TwinDB, error) {
return newTmpCache(ttl, inner, chainURL)
}
}
}

// if ttl == 0 twins are cached forever
func WithInMemoryExpiration(ttl uint64) PeerOpt {
return func(pc *peerCfg) {
pc.cacheFactory = func(inner TwinDB, chainURL string) (TwinDB, error) {
return newInMemoryCache(inner, ttl), nil
}
}
}

// Peer exposes the functionality to talk directly to an rmb relay
type Peer struct {
source *types.Address
Expand Down Expand Up @@ -158,7 +167,7 @@ func NewPeer(
enableEncryption: true,
keyType: KeyTypeSr25519,
cacheFactory: func(inner TwinDB, _ string) (TwinDB, error) {
return newInMemoryCache(inner), nil
return newInMemoryCache(inner, 0), nil
},
}

Expand Down
42 changes: 22 additions & 20 deletions rmb-sdk-go/peer/twindb.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Twin struct {
PublicKey []byte
Relay *string
E2EKey []byte
Timestamp uint64
}

type twinDB struct {
Expand Down Expand Up @@ -71,32 +72,43 @@ func (t *twinDB) GetByPk(pk []byte) (uint32, error) {
return t.subConn.GetTwinByPubKey(pk)
}

// if ttl == 0, then the data will stay forever
type inMemoryCache struct {
cache map[uint32]Twin
inner TwinDB
m sync.RWMutex
ttl uint64
}

func newInMemoryCache(inner TwinDB) TwinDB {
func newInMemoryCache(inner TwinDB, ttl uint64) TwinDB {
return &inMemoryCache{
cache: make(map[uint32]Twin),
inner: inner,
ttl: ttl,
}
}

func (twin *Twin) isExpired(ttl uint64) bool {
age := uint64(time.Now().Unix()) - twin.Timestamp
if ttl != 0 && age > ttl {
log.Trace().Uint64("age", age).Msg("twin cache hit but expired")
return true
}
return false
}

func (m *inMemoryCache) Get(id uint32) (twin Twin, err error) {
m.m.RLock()
twin, ok := m.cache[id]
m.m.RUnlock()
if ok {
if ok && !twin.isExpired(m.ttl) {
return twin, nil
}

twin, err = m.inner.Get(id)
if err != nil {
return Twin{}, errors.Wrapf(err, "could not get twin with id %d", id)
}

twin.Timestamp = uint64(time.Now().Unix())
m.m.Lock()
m.cache[id] = twin
m.m.Unlock()
Expand All @@ -108,11 +120,6 @@ func (m *inMemoryCache) GetByPk(pk []byte) (uint32, error) {
return m.inner.GetByPk(pk)
}

type cachedTwin struct {
Twin
Timestamp uint64
}

type tmpCache struct {
base string
ttl uint64
Expand All @@ -136,7 +143,7 @@ func newTmpCache(ttl uint64, inner TwinDB, chainURL string) (TwinDB, error) {
}, nil
}

func (r *tmpCache) get(path string) (twin cachedTwin, err error) {
func (r *tmpCache) get(path string) (twin Twin, err error) {
data, err := os.ReadFile(path)

if os.IsNotExist(err) {
Expand All @@ -151,10 +158,7 @@ func (r *tmpCache) get(path string) (twin cachedTwin, err error) {
// crash on file corruption
return twin, errNoCache
}

age := uint64(time.Now().Unix()) - twin.Timestamp
if age > r.ttl {
log.Trace().Uint64("age", age).Msg("twin cache hit but expired")
if twin.isExpired(r.ttl) {
return twin, errNoCache
}

Expand All @@ -163,10 +167,7 @@ func (r *tmpCache) get(path string) (twin cachedTwin, err error) {
}

func (r *tmpCache) set(path string, twin Twin) error {
data, err := json.Marshal(cachedTwin{
Twin: twin,
Timestamp: uint64(time.Now().Unix()),
})
data, err := json.Marshal(twin)

if err != nil {
return err
Expand All @@ -178,13 +179,14 @@ func (r *tmpCache) set(path string, twin Twin) error {
func (r *tmpCache) Get(id uint32) (twin Twin, err error) {
path := filepath.Join(r.base, fmt.Sprint(id))

cached, err := r.get(path)
twin, err = r.get(path)
if err == errNoCache {
twin, err = r.inner.Get(id)
if err != nil {
return twin, err
}
// set cache
twin.Timestamp = uint64(time.Now().Unix())
if err := r.set(path, twin); err != nil {
log.Error().Err(err).Msg("failed to warm up cache")
}
Expand All @@ -193,7 +195,7 @@ func (r *tmpCache) Get(id uint32) (twin Twin, err error) {
return twin, err
}

return cached.Twin, nil
return twin, nil
}

func (r *tmpCache) GetByPk(pk []byte) (uint32, error) {
Expand Down

0 comments on commit f5ce90c

Please # to comment.