Skip to content

Commit

Permalink
feat: delta cache manager (#760)
Browse files Browse the repository at this point in the history
Co-authored-by: FredrikOseberg <fredrik.no@gmail.com>
  • Loading branch information
kwasniew and FredrikOseberg authored Feb 21, 2025
1 parent dc67abe commit d4f6dc5
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 0 deletions.
142 changes: 142 additions & 0 deletions server/src/delta_cache_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
use dashmap::DashMap;
use tokio::sync::broadcast;
use unleash_types::client_features::DeltaEvent;

use crate::delta_cache::DeltaCache;

#[derive(Debug, Clone)]
pub enum DeltaCacheUpdate {
Full(String), // environment with a newly inserted cache
Update(String), // environment with an updated delta cache
Deletion(String), // environment removed
}

pub struct DeltaCacheManager {
caches: DashMap<String, DeltaCache>,
update_sender: broadcast::Sender<DeltaCacheUpdate>,
}

impl Default for DeltaCacheManager {
fn default() -> Self {
Self::new()
}
}

impl DeltaCacheManager {
pub fn new() -> Self {
let (tx, _rx) = broadcast::channel::<DeltaCacheUpdate>(16);
Self {
caches: DashMap::new(),
update_sender: tx,
}
}

pub fn subscribe(&self) -> broadcast::Receiver<DeltaCacheUpdate> {
self.update_sender.subscribe()
}

pub fn get(&self, env: &str) -> Option<DeltaCache> {
self.caches.get(env).map(|entry| entry.value().clone())
}

pub fn insert_cache(&self, env: &str, cache: DeltaCache) {
self.caches.insert(env.to_string(), cache);
let _ = self.update_sender.send(DeltaCacheUpdate::Full(env.to_string()));
}

pub fn update_cache(&self, env: &str, events: &[DeltaEvent]) {
if let Some(mut cache) = self.caches.get_mut(env) {
cache.add_events(events);
let _ = self
.update_sender
.send(DeltaCacheUpdate::Update(env.to_string()));
}
}

pub fn remove_cache(&self, env: &str) {
self.caches.remove(env);
let _ = self
.update_sender
.send(DeltaCacheUpdate::Deletion(env.to_string()));
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::delta_cache::{DeltaCache, DeltaHydrationEvent};
use unleash_types::client_features::{ClientFeature, DeltaEvent, Segment};

#[test]
fn test_insert_and_update_delta_cache() {
let hydration = DeltaHydrationEvent {
event_id: 1,
features: vec![ClientFeature {
name: "feature1".to_string(),
..Default::default()
}],
segments: vec![Segment {
id: 1,
constraints: vec![],
}],
};
let max_length = 5;
let delta_cache = DeltaCache::new(hydration, max_length);
let delta_cache_manager = DeltaCacheManager::new();
let env = "test-env";

let mut rx = delta_cache_manager.subscribe();

delta_cache_manager.insert_cache(env, delta_cache);

match rx.try_recv() {
Ok(DeltaCacheUpdate::Full(e)) => assert_eq!(e, env),
e => panic!("Expected Full update, got {:?}", e),
}

let update_event = DeltaEvent::FeatureUpdated {
event_id: 2,
feature: ClientFeature {
name: "feature1_updated".to_string(),
..Default::default()
},
};

delta_cache_manager.update_cache(env, &[update_event.clone()]);

match rx.try_recv() {
Ok(DeltaCacheUpdate::Update(e)) => assert_eq!(e, env),
e => panic!("Expected Update update, got {:?}", e),
}

let cache = delta_cache_manager.get(env).expect("Cache should exist");
let events = cache.get_events();
assert_eq!(events.last().unwrap(), &update_event);
}

#[test]
fn test_remove_delta_cache() {
let hydration = DeltaHydrationEvent {
event_id: 1,
features: vec![ClientFeature {
name: "feature-a".to_string(),
..Default::default()
}],
segments: vec![],
};
let delta_cache = DeltaCache::new(hydration, 3);
let delta_cache_manager = DeltaCacheManager::new();
let env = "remove-env";

delta_cache_manager.insert_cache(env, delta_cache);
let mut rx = delta_cache_manager.subscribe();
let _ = rx.try_recv();

delta_cache_manager.remove_cache(env);
match rx.try_recv() {
Ok(DeltaCacheUpdate::Deletion(e)) => assert_eq!(e, env),
e => panic!("Expected Deletion update, got {:?}", e),
}
assert!(delta_cache_manager.get(env).is_none());
}
}
1 change: 1 addition & 0 deletions server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod cli;
pub mod client_api;
pub mod delta_cache;
pub mod delta_filters;
pub mod delta_cache_manager;
pub mod edge_api;
#[cfg(not(tarpaulin_include))]
pub mod error;
Expand Down

0 comments on commit d4f6dc5

Please # to comment.