• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

iggy-rs / iggy / 13327239093

14 Feb 2025 10:23AM UTC coverage: 74.696%. First build
13327239093

Pull #1529

github

web-flow
Merge 0b1a1b0ac into 683e98e90
Pull Request #1529: refactor(server): improve cache size calculation and memory tracking

26 of 29 new or added lines in 2 files covered. (89.66%)

25230 of 33777 relevant lines covered (74.7%)

10282.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.99
/server/src/streaming/cache/buffer.rs
1
use crate::streaming::local_sizeable::RealSize;
2

3
use super::memory_tracker::CacheMemoryTracker;
4
use atone::Vc;
5
use iggy::utils::byte_size::IggyByteSize;
6
use std::fmt::Debug;
7
use std::ops::Index;
8
use std::sync::{
9
    atomic::{AtomicU64, Ordering},
10
    Arc,
11
};
12

13
#[derive(Debug)]
14
pub struct SmartCache<T: RealSize + Debug> {
15
    buffer: Vc<T>,
16
    memory_tracker: Arc<CacheMemoryTracker>,
17
    current_size: IggyByteSize,
18
    hits: AtomicU64,
19
    misses: AtomicU64,
20
}
21

22
impl<T> SmartCache<T>
23
where
24
    T: RealSize + Clone + Debug,
25
{
26
    pub fn new() -> Self {
676✔
27
        let current_size = IggyByteSize::default();
676✔
28
        let buffer = Vc::new();
676✔
29
        let memory_tracker = CacheMemoryTracker::get_instance().unwrap();
676✔
30

676✔
31
        Self {
676✔
32
            buffer,
676✔
33
            memory_tracker,
676✔
34
            current_size,
676✔
35
            hits: AtomicU64::new(0),
676✔
36
            misses: AtomicU64::new(0),
676✔
37
        }
676✔
38
    }
676✔
39

40
    // Used only for cache validation tests
41
    #[cfg(test)]
42
    pub fn to_vec(&self) -> Vec<T> {
6✔
43
        let mut vec = Vec::with_capacity(self.buffer.len());
6✔
44
        vec.extend(self.buffer.iter().cloned());
6✔
45
        vec
6✔
46
    }
6✔
47

48
    /// Pushes an element to the buffer, and if adding the element would exceed the memory limit,
49
    /// removes the oldest elements until there's enough space for the new element.
50
    /// It's preferred to use `extend` instead of this method.
51
    pub fn push_safe(&mut self, element: T) {
80,000✔
52
        let element_size = element.real_size();
80,000✔
53

54
        while !self.memory_tracker.will_fit_into_cache(element_size) {
80,000✔
55
            if let Some(oldest_element) = self.buffer.pop_front() {
×
NEW
56
                let oldest_size = oldest_element.real_size();
×
57
                self.memory_tracker
×
58
                    .decrement_used_memory(oldest_size.as_bytes_u64());
×
59
                self.current_size -= oldest_size;
×
60
            }
×
61
        }
62

63
        self.memory_tracker
80,000✔
64
            .increment_used_memory(element_size.as_bytes_u64());
80,000✔
65
        self.current_size += element_size;
80,000✔
66
        self.buffer.push_back(element);
80,000✔
67
    }
80,000✔
68

69
    /// Removes the oldest elements until there's enough space for the new element.
70
    pub fn evict_by_size(&mut self, size_to_remove: u64) {
×
71
        let mut removed_size = IggyByteSize::default();
×
72

73
        while let Some(element) = self.buffer.pop_front() {
×
74
            if removed_size >= size_to_remove {
×
75
                break;
×
76
            }
×
NEW
77
            let elem_size = element.real_size();
×
78
            self.memory_tracker
×
79
                .decrement_used_memory(elem_size.as_bytes_u64());
×
80
            self.current_size -= elem_size;
×
81
            removed_size += elem_size;
×
82
        }
83
    }
×
84

85
    pub fn purge(&mut self) {
145✔
86
        self.buffer.clear();
145✔
87
        self.memory_tracker
145✔
88
            .decrement_used_memory(self.current_size.as_bytes_u64());
145✔
89
        self.current_size = IggyByteSize::default();
145✔
90
    }
145✔
91

92
    pub fn is_empty(&self) -> bool {
45,091✔
93
        self.buffer.is_empty()
45,091✔
94
    }
45,091✔
95

96
    pub fn current_size(&self) -> IggyByteSize {
2✔
97
        self.current_size
2✔
98
    }
2✔
99

100
    /// Extends the buffer with the given elements, and always adding the elements,
101
    /// even if it exceeds the memory limit.
102
    pub fn extend(&mut self, elements: impl IntoIterator<Item = T>) {
23,827✔
103
        let elements = elements.into_iter().inspect(|element| {
598,949✔
104
            let element_size = element.real_size();
598,949✔
105
            self.memory_tracker
598,949✔
106
                .increment_used_memory(element_size.as_bytes_u64());
598,949✔
107
            self.current_size += element_size;
598,949✔
108
        });
598,949✔
109
        self.buffer.extend(elements);
23,827✔
110
    }
23,827✔
111

112
    /// Always appends the element into the buffer, even if it exceeds the memory limit.
113
    pub fn append(&mut self, element: T) {
×
NEW
114
        let element_size = element.real_size();
×
115
        self.memory_tracker
×
116
            .increment_used_memory(element_size.as_bytes_u64());
×
117
        self.current_size += element_size;
×
118
        self.buffer.push(element);
×
119
    }
×
120

121
    pub fn iter(&self) -> impl Iterator<Item = &T> {
×
122
        self.buffer.iter()
×
123
    }
×
124

125
    pub fn len(&self) -> usize {
22,519✔
126
        self.buffer.len()
22,519✔
127
    }
22,519✔
128

129
    pub fn get_metrics(&self) -> CacheMetrics {
184✔
130
        let hits = self.hits.load(Ordering::Relaxed);
184✔
131
        let misses = self.misses.load(Ordering::Relaxed);
184✔
132
        let total = hits + misses;
184✔
133
        let hit_ratio = if total > 0 {
184✔
134
            hits as f32 / total as f32
81✔
135
        } else {
136
            0.0
103✔
137
        };
138

139
        CacheMetrics {
184✔
140
            hits,
184✔
141
            misses,
184✔
142
            hit_ratio,
184✔
143
        }
184✔
144
    }
184✔
145

146
    pub fn record_hit(&self) {
22,519✔
147
        self.hits.fetch_add(1, Ordering::Relaxed);
22,519✔
148
    }
22,519✔
149

150
    pub fn record_miss(&self) {
×
151
        self.misses.fetch_add(1, Ordering::Relaxed);
×
152
    }
×
153
}
154

155
impl<T> Index<usize> for SmartCache<T>
156
where
157
    T: RealSize + Clone + Debug,
158
{
159
    type Output = T;
160

161
    fn index(&self, index: usize) -> &Self::Output {
715,629✔
162
        &self.buffer[index]
715,629✔
163
    }
715,629✔
164
}
165

166
impl<T: RealSize + Clone + Debug> Default for SmartCache<T> {
167
    fn default() -> Self {
×
168
        Self::new()
×
169
    }
×
170
}
171

172
/// Point-in-time view of a cache's hit/miss statistics.
#[derive(Debug)]
pub struct CacheMetrics {
    /// Number of recorded hits.
    pub hits: u64,
    /// Number of recorded misses.
    pub misses: u64,
    /// Fraction of hits among all recorded lookups.
    pub hit_ratio: f32,
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc