Skip to content

Commit

Permalink
refactor caching structs to allow per rpc ttl and reorg ttl
Browse files Browse the repository at this point in the history
  • Loading branch information
dshiell committed Aug 9, 2024
1 parent bfa497a commit 5414c4c
Show file tree
Hide file tree
Showing 13 changed files with 231 additions and 76 deletions.
3 changes: 3 additions & 0 deletions src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ pub struct Args {
#[arg(short, long, default_value = "100000")]
pub lru_max_items: usize,

#[arg(short, long, default_value = "12")]
pub reorg_ttl: u32,

#[arg(short, long = "cache", default_value = "lru", value_parser = cache_backend_parser)]
pub cache_type: String,

Expand Down
24 changes: 17 additions & 7 deletions src/cache/lru_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@ use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex};

use anyhow::Context;
use serde_json::{from_str, Value};
use serde_json::from_str;

use super::{CacheBackend, CacheBackendFactory, CacheStatus};
use super::{CacheBackend, CacheBackendFactory, CacheStatus, CacheValue};

pub struct LruBackendFactory {
data: Arc<Mutex<LruCache<String, String>>>,
reorg_ttl: u32,
}

impl LruBackendFactory {
pub fn new(cap: usize) -> Self {
pub fn new(cap: usize, reorg_ttl: u32) -> Self {
Self {
data: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(cap).unwrap()))),
reorg_ttl,
}
}
}
Expand All @@ -23,12 +25,14 @@ impl CacheBackendFactory for LruBackendFactory {
fn get_instance(&self) -> anyhow::Result<Box<dyn CacheBackend>> {
Ok(Box::new(LruBackend {
data: self.data.clone(),
reorg_ttl: self.reorg_ttl,
}))
}
}

pub struct LruBackend {
data: Arc<Mutex<LruCache<String, String>>>,
reorg_ttl: u32,
}

impl CacheBackend for LruBackend {
Expand All @@ -38,8 +42,8 @@ impl CacheBackend for LruBackend {
let mut lru_cache = self.data.lock().unwrap();
let v = match lru_cache.get(&key) {
Some(value) => {
let value = from_str::<Value>(&value).context("fail to deserialize cache value")?;

let value =
from_str::<CacheValue>(value).context("fail to deserialize cache value")?;
CacheStatus::Cached { key, value }
}

Expand All @@ -49,9 +53,15 @@ impl CacheBackend for LruBackend {
Ok(v)
}

fn write(&mut self, key: &str, value: &str) -> anyhow::Result<()> {
fn write(
&mut self,
key: &str,
cache_value: CacheValue,
expired_value: &Option<CacheValue>,
) -> anyhow::Result<()> {
let mut lru_cache = self.data.lock().unwrap();
let _ = lru_cache.put(key.to_string(), value.to_string());
let cache_value = cache_value.update(expired_value, self.reorg_ttl);
let _ = lru_cache.put(key.to_string(), cache_value.to_string()?);
Ok(())
}
}
23 changes: 17 additions & 6 deletions src/cache/memory_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@ use std::sync::Arc;

use anyhow::Context;
use dashmap::DashMap;
use serde_json::{from_str, Value};
use serde_json::from_str;

use super::{CacheBackend, CacheBackendFactory, CacheStatus};
use super::{CacheBackend, CacheBackendFactory, CacheStatus, CacheValue};

pub struct MemoryBackendFactory {
data: Arc<DashMap<String, String>>,
reorg_ttl: u32,
}

impl MemoryBackendFactory {
pub fn new() -> Self {
pub fn new(reorg_ttl: u32) -> Self {
Self {
data: Arc::new(DashMap::new()),
reorg_ttl,
}
}
}
Expand All @@ -22,12 +24,14 @@ impl CacheBackendFactory for MemoryBackendFactory {
fn get_instance(&self) -> anyhow::Result<Box<dyn CacheBackend>> {
Ok(Box::new(MemoryBackend {
data: self.data.clone(),
reorg_ttl: self.reorg_ttl,
}))
}
}

pub struct MemoryBackend {
data: Arc<DashMap<String, String>>,
reorg_ttl: u32,
}

impl CacheBackend for MemoryBackend {
Expand All @@ -36,7 +40,8 @@ impl CacheBackend for MemoryBackend {

let v = match self.data.get(&key) {
Some(value) => {
let value = from_str::<Value>(&value).context("fail to deserialize cache value")?;
let value =
from_str::<CacheValue>(&value).context("fail to deserialize cache value")?;

CacheStatus::Cached { key, value }
}
Expand All @@ -47,8 +52,14 @@ impl CacheBackend for MemoryBackend {
Ok(v)
}

fn write(&mut self, key: &str, value: &str) -> anyhow::Result<()> {
let _ = self.data.insert(key.to_string(), value.to_string());
fn write(
&mut self,
key: &str,
cache_value: CacheValue,
expired_value: &Option<CacheValue>,
) -> anyhow::Result<()> {
let cache_value = cache_value.update(expired_value, self.reorg_ttl);
let _ = self.data.insert(key.to_string(), cache_value.to_string()?);
Ok(())
}
}
84 changes: 82 additions & 2 deletions src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,98 @@ pub mod lru_backend;
pub mod memory_backend;
pub mod redis_backend;

use chrono::Local;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::cmp::min;

pub enum CacheStatus {
Cached { key: String, value: Value },
Cached { key: String, value: CacheValue },
Missed { key: String },
}

#[derive(Serialize, Deserialize, Debug)]
pub struct CacheValue {
pub data: Value,
reorg_ttl: u32,
ttl: u32,
last_modified: i64,
}

impl CacheValue {
pub fn new(data: Value, reorg_ttl: u32, ttl: u32) -> Self {
let last_modified = Local::now().timestamp();
Self {
data,
reorg_ttl,
ttl,
last_modified,
}
}

pub fn is_expired(&self) -> bool {
let now = Local::now().timestamp();

let last_modified = self.last_modified;
let reorg_ttl = self.reorg_ttl;
let ttl = self.ttl;

if last_modified > now {
return true;
}

let age: u64 = (now - last_modified) as u64;
let ttl = if reorg_ttl == 0 && ttl == 0 {
0
} else if reorg_ttl == 0 && ttl > 0 {
ttl
} else if reorg_ttl > 0 && ttl == 0 {
reorg_ttl
} else {
min(reorg_ttl, ttl)
};

ttl != 0 && age > ttl.into()
}

pub fn update(mut self, expired_value: &Option<Self>, reorg_ttl: u32) -> Self {
// if a previous entry existed then check if the response has changed
// else this is a new entry and nothing to do
if let Some(expired_value) = expired_value {
let is_new = expired_value.data == self.data;
self.last_modified = Local::now().timestamp();

// if the value has changed then reset the reorg ttl
// else we can exponentially backoff the reorg_ttl
self.reorg_ttl = if is_new {
reorg_ttl
} else {
self.reorg_ttl * 2
};
}

self
}

pub fn to_string(&self) -> anyhow::Result<String> {
Ok(serde_json::to_string(&self)?)
}

pub fn from_str(value: &str) -> anyhow::Result<Self> {
Ok(serde_json::from_str(value)?)
}
}

pub trait CacheBackendFactory: Send + Sync {
fn get_instance(&self) -> anyhow::Result<Box<dyn CacheBackend>>;
}

pub trait CacheBackend {
fn read(&mut self, method: &str, params_key: &str) -> anyhow::Result<CacheStatus>;
fn write(&mut self, key: &str, value: &str) -> anyhow::Result<()>;
fn write(
&mut self,
key: &str,
cache_value: CacheValue,
expired_value: &Option<CacheValue>,
) -> anyhow::Result<()>;
}
28 changes: 21 additions & 7 deletions src/cache/redis_backend.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
use anyhow::Context;
use redis::Commands;
use serde_json::{from_str, Value};
use serde_json::from_str;

use super::{CacheBackend, CacheBackendFactory, CacheStatus};
use super::{CacheBackend, CacheBackendFactory, CacheStatus, CacheValue};

pub struct RedisBackendFactory {
chain_id: u64,
client: r2d2::Pool<redis::Client>,
reorg_ttl: u32,
}

impl RedisBackendFactory {
pub fn new(chain_id: u64, client: r2d2::Pool<redis::Client>) -> Self {
Self { chain_id, client }
pub fn new(chain_id: u64, client: r2d2::Pool<redis::Client>, reorg_ttl: u32) -> Self {
Self {
chain_id,
client,
reorg_ttl,
}
}
}

Expand All @@ -20,13 +25,15 @@ impl CacheBackendFactory for RedisBackendFactory {
Ok(Box::new(RedisBackend {
chain_id: self.chain_id,
conn: self.client.get()?,
reorg_ttl: self.reorg_ttl,
}))
}
}

pub struct RedisBackend {
chain_id: u64,
conn: r2d2::PooledConnection<redis::Client>,
reorg_ttl: u32,
}

impl CacheBackend for RedisBackend {
Expand All @@ -36,7 +43,8 @@ impl CacheBackend for RedisBackend {

let v = match value {
Some(value) => {
let value = from_str::<Value>(&value).context("fail to deserialize cache value")?;
let value =
from_str::<CacheValue>(&value).context("fail to deserialize cache value")?;
CacheStatus::Cached {
key: cache_key,
value,
Expand All @@ -48,8 +56,14 @@ impl CacheBackend for RedisBackend {
Ok(v)
}

fn write(&mut self, key: &str, value: &str) -> anyhow::Result<()> {
let _ = self.conn.set::<_, _, String>(key, value);
fn write(
&mut self,
key: &str,
cache_value: CacheValue,
expired_value: &Option<CacheValue>,
) -> anyhow::Result<()> {
let cache_value = cache_value.update(expired_value, self.reorg_ttl);
let _ = self.conn.set::<_, _, String>(key, cache_value.to_string()?);
Ok(())
}
}
Loading

0 comments on commit 5414c4c

Please # to comment.