• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

iggy-rs / iggy / 11839838253

14 Nov 2024 03:02PM UTC coverage: 75.177%. First build
11839838253

push

github

web-flow
Use iggy byte size where appropriate (#1334)

- introduce `IggyByteSize` where number of bytes are represented
- move `Sizeable` to the SDK and its implementations for a few types
- unfortunately I had to keep a sizeable in server because of the generic instance for `Deref<Target = RetainedMessage>` for `SmartCache`

236 of 277 new or added lines in 53 files covered. (85.2%)

23168 of 30818 relevant lines covered (75.18%)

26327.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.2
/server/src/streaming/cache/buffer.rs
1
use crate::streaming::local_sizeable::LocalSizeable;
2

3
use super::memory_tracker::CacheMemoryTracker;
4
use atone::Vc;
5
use iggy::utils::byte_size::IggyByteSize;
6
use std::fmt::Debug;
7
use std::ops::Index;
8
use std::sync::Arc;
9

10
#[derive(Debug)]
11
pub struct SmartCache<T: LocalSizeable + Debug> {
12
    current_size: IggyByteSize,
13
    buffer: Vc<T>,
14
    memory_tracker: Arc<CacheMemoryTracker>,
15
}
16

17
impl<T> SmartCache<T>
18
where
19
    T: LocalSizeable + Clone + Debug,
20
{
21
    pub fn new() -> Self {
692✔
22
        let current_size = IggyByteSize::default();
692✔
23
        let buffer = Vc::new();
692✔
24
        let memory_tracker = CacheMemoryTracker::get_instance().unwrap();
692✔
25

692✔
26
        Self {
692✔
27
            current_size,
692✔
28
            buffer,
692✔
29
            memory_tracker,
692✔
30
        }
692✔
31
    }
692✔
32

33
    // Used only for cache validation tests
34
    #[cfg(test)]
35
    pub fn to_vec(&self) -> Vec<T> {
6✔
36
        let mut vec = Vec::with_capacity(self.buffer.len());
6✔
37
        vec.extend(self.buffer.iter().cloned());
6✔
38
        vec
6✔
39
    }
6✔
40

41
    /// Pushes an element to the buffer, and if adding the element would exceed the memory limit,
42
    /// removes the oldest elements until there's enough space for the new element.
43
    /// It's preferred to use `extend` instead of this method.
44
    pub fn push_safe(&mut self, element: T) {
458,500✔
45
        let element_size = element.get_size_bytes();
458,500✔
46

47
        while !self.memory_tracker.will_fit_into_cache(element_size) {
458,500✔
48
            if let Some(oldest_element) = self.buffer.pop_front() {
×
NEW
49
                let oldest_size = oldest_element.get_size_bytes();
×
NEW
50
                self.memory_tracker
×
NEW
51
                    .decrement_used_memory(oldest_size.as_bytes_u64());
×
52
                self.current_size -= oldest_size;
×
53
            }
×
54
        }
55

56
        self.memory_tracker
458,500✔
57
            .increment_used_memory(element_size.as_bytes_u64());
458,500✔
58
        self.current_size += element_size;
458,500✔
59
        self.buffer.push_back(element);
458,500✔
60
    }
458,500✔
61

62
    /// Removes the oldest elements until there's enough space for the new element.
63
    pub fn evict_by_size(&mut self, size_to_remove: u64) {
×
NEW
64
        let mut removed_size = IggyByteSize::default();
×
65

66
        while let Some(element) = self.buffer.pop_front() {
×
67
            if removed_size >= size_to_remove {
×
68
                break;
×
69
            }
×
NEW
70
            let elem_size = element.get_size_bytes();
×
NEW
71
            self.memory_tracker
×
NEW
72
                .decrement_used_memory(elem_size.as_bytes_u64());
×
73
            self.current_size -= elem_size;
×
74
            removed_size += elem_size;
×
75
        }
76
    }
×
77

78
    pub fn purge(&mut self) {
145✔
79
        self.buffer.clear();
145✔
80
        self.memory_tracker
145✔
81
            .decrement_used_memory(self.current_size.as_bytes_u64());
145✔
82
        self.current_size = IggyByteSize::default();
145✔
83
    }
145✔
84

85
    pub fn is_empty(&self) -> bool {
74,737✔
86
        self.buffer.is_empty()
74,737✔
87
    }
74,737✔
88

89
    pub fn current_size(&self) -> IggyByteSize {
2✔
90
        self.current_size
2✔
91
    }
2✔
92

93
    /// Extends the buffer with the given elements, and always adding the elements,
94
    /// even if it exceeds the memory limit.
95
    pub fn extend(&mut self, elements: impl IntoIterator<Item = T>) {
30,734✔
96
        let elements = elements.into_iter().inspect(|element| {
1,289,649✔
97
            let element_size = element.get_size_bytes();
1,289,649✔
98
            self.memory_tracker
1,289,649✔
99
                .increment_used_memory(element_size.as_bytes_u64());
1,289,649✔
100
            self.current_size += element_size;
1,289,649✔
101
        });
1,289,649✔
102
        self.buffer.extend(elements);
30,734✔
103
    }
30,734✔
104

105
    /// Always appends the element into the buffer, even if it exceeds the memory limit.
106
    pub fn append(&mut self, element: T) {
×
NEW
107
        let element_size = element.get_size_bytes();
×
NEW
108
        self.memory_tracker
×
NEW
109
            .increment_used_memory(element_size.as_bytes_u64());
×
110
        self.current_size += element_size;
×
111
        self.buffer.push(element);
×
112
    }
×
113

114
    pub fn iter(&self) -> impl Iterator<Item = &T> {
×
115
        self.buffer.iter()
×
116
    }
×
117

118
    pub fn len(&self) -> usize {
37,342✔
119
        self.buffer.len()
37,342✔
120
    }
37,342✔
121
}
122

123
impl<T> Index<usize> for SmartCache<T>
124
where
125
    T: LocalSizeable + Clone + Debug,
126
{
127
    type Output = T;
128

129
    fn index(&self, index: usize) -> &Self::Output {
2,227,575✔
130
        &self.buffer[index]
2,227,575✔
131
    }
2,227,575✔
132
}
133

134
impl<T: LocalSizeable + Clone + Debug> Default for SmartCache<T> {
135
    fn default() -> Self {
×
136
        Self::new()
×
137
    }
×
138
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc