• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Unleash / unleash-edge / 15441100167

04 Jun 2025 11:28AM UTC coverage: 78.265% (+10.3%) from 67.995%
15441100167

Pull #970

github

web-flow
Merge 8ad457ed6 into 34ad3228b
Pull Request #970: task(rust): Update Rust version to 1.87.0

10140 of 12956 relevant lines covered (78.26%)

158.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.12
/server/src/delta_cache_manager.rs
1
use dashmap::DashMap;
2
use tokio::sync::broadcast;
3
use tracing::error;
4
use unleash_types::client_features::DeltaEvent;
5

6
use crate::delta_cache::DeltaCache;
7

8
/// Notification broadcast to subscribers whenever the set of delta caches changes.
///
/// Each variant carries the name of the environment the change applies to.
#[derive(Debug, Clone)]
pub enum DeltaCacheUpdate {
    /// Environment with a newly inserted cache.
    Full(String),
    /// Environment with an updated delta cache.
    Update(String),
    /// Environment removed.
    Deletion(String),
}
14

15
pub struct DeltaCacheManager {
16
    caches: DashMap<String, DeltaCache>,
17
    update_sender: broadcast::Sender<DeltaCacheUpdate>,
18
}
19

20
impl Default for DeltaCacheManager {
21
    fn default() -> Self {
5✔
22
        Self::new()
5✔
23
    }
5✔
24
}
25

26
impl DeltaCacheManager {
27
    pub fn new() -> Self {
63✔
28
        let (tx, _rx) = broadcast::channel::<DeltaCacheUpdate>(16);
63✔
29
        Self {
63✔
30
            caches: DashMap::new(),
63✔
31
            update_sender: tx,
63✔
32
        }
63✔
33
    }
63✔
34

35
    pub fn subscribe(&self) -> broadcast::Receiver<DeltaCacheUpdate> {
4✔
36
        self.update_sender.subscribe()
4✔
37
    }
4✔
38

39
    pub fn get(&self, env: &str) -> Option<DeltaCache> {
28✔
40
        self.caches.get(env).map(|entry| entry.value().clone())
28✔
41
    }
28✔
42

43
    pub fn insert_cache(&self, env: &str, cache: DeltaCache) {
10✔
44
        self.caches.insert(env.to_string(), cache);
10✔
45
        let _ = self
10✔
46
            .update_sender
10✔
47
            .send(DeltaCacheUpdate::Full(env.to_string()));
10✔
48
    }
10✔
49

50
    pub fn update_cache(&self, env: &str, events: &[DeltaEvent]) {
6✔
51
        if let Some(mut cache) = self.caches.get_mut(env) {
6✔
52
            cache.add_events(events);
6✔
53
            let result = self
6✔
54
                .update_sender
6✔
55
                .send(DeltaCacheUpdate::Update(env.to_string()));
6✔
56
            if let Err(e) = result {
6✔
57
                error!("Unexpected broadcast error: {:#?}", e);
2✔
58
            }
4✔
59
        }
×
60
    }
6✔
61

62
    pub fn remove_cache(&self, env: &str) {
1✔
63
        self.caches.remove(env);
1✔
64
        let _ = self
1✔
65
            .update_sender
1✔
66
            .send(DeltaCacheUpdate::Deletion(env.to_string()));
1✔
67
    }
1✔
68
}
69

70
#[cfg(test)]
mod tests {
    use super::*;
    use crate::delta_cache::{DeltaCache, DeltaHydrationEvent};
    use unleash_types::client_features::{ClientFeature, DeltaEvent, Segment};

    /// Inserting a cache publishes `Full`; updating it publishes `Update` and
    /// the new event becomes the last event of the stored cache.
    #[test]
    fn test_insert_and_update_delta_cache() {
        let env = "test-env";
        let max_length = 5;
        let seed = DeltaHydrationEvent {
            event_id: 1,
            features: vec![ClientFeature {
                name: "feature1".to_string(),
                ..Default::default()
            }],
            segments: vec![Segment {
                id: 1,
                constraints: vec![],
            }],
        };

        let manager = DeltaCacheManager::new();
        // Subscribe before any mutation so both notifications are observed.
        let mut updates = manager.subscribe();

        manager.insert_cache(env, DeltaCache::new(seed, max_length));
        match updates.try_recv() {
            Ok(DeltaCacheUpdate::Full(name)) => assert_eq!(name, env),
            other => panic!("Expected Full update, got {:?}", other),
        }

        let update_event = DeltaEvent::FeatureUpdated {
            event_id: 2,
            feature: ClientFeature {
                name: "feature1_updated".to_string(),
                ..Default::default()
            },
        };
        manager.update_cache(env, &[update_event.clone()]);
        match updates.try_recv() {
            Ok(DeltaCacheUpdate::Update(name)) => assert_eq!(name, env),
            other => panic!("Expected Update update, got {:?}", other),
        }

        let stored = manager.get(env).expect("Cache should exist");
        let recorded = stored.get_events();
        assert_eq!(recorded.last().unwrap(), &update_event);
    }

    /// Removing a cache publishes `Deletion` and the cache can no longer be fetched.
    #[test]
    fn test_remove_delta_cache() {
        let env = "remove-env";
        let seed = DeltaHydrationEvent {
            event_id: 1,
            features: vec![ClientFeature {
                name: "feature-a".to_string(),
                ..Default::default()
            }],
            segments: vec![],
        };

        let manager = DeltaCacheManager::new();
        manager.insert_cache(env, DeltaCache::new(seed, 3));

        // The subscription is taken after the insert; discard whatever, if
        // anything, is pending so the next message checked is the deletion.
        let mut updates = manager.subscribe();
        let _ = updates.try_recv();

        manager.remove_cache(env);
        match updates.try_recv() {
            Ok(DeltaCacheUpdate::Deletion(name)) => assert_eq!(name, env),
            other => panic!("Expected Deletion update, got {:?}", other),
        }
        assert!(manager.get(env).is_none());
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc