• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

iggy-rs / iggy / 11839838253

14 Nov 2024 03:02PM UTC coverage: 75.177%. First build
11839838253

push

github

web-flow
Use iggy byte size where appropriate (#1334)

- introduce `IggyByteSize` where number of bytes are represented
- move `Sizeable` to the SDK and its implementations for a few types
- unfortunately I had to keep a sizeable in server because of the generic instance for `Deref<Target = RetainedMessage>` for `SmartCache`

236 of 277 new or added lines in 53 files covered. (85.2%)

23168 of 30818 relevant lines covered (75.18%)

26327.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.69
/server/src/streaming/cache/memory_tracker.rs
1
extern crate sysinfo;
2

3
use crate::configs::resource_quota::MemoryResourceQuota;
4
use crate::configs::system::CacheConfig;
5
use iggy::utils::byte_size::IggyByteSize;
6
use std::sync::atomic::{AtomicU64, Ordering};
7
use std::sync::{Arc, Once};
8
use sysinfo::System;
9
use tracing::info;
10

11
static ONCE: Once = Once::new();
12
static mut INSTANCE: Option<Arc<CacheMemoryTracker>> = None;
13

14
#[derive(Debug)]
15
pub struct CacheMemoryTracker {
16
    used_memory_bytes: AtomicU64,
17
    limit_bytes: IggyByteSize,
18
}
19

20
type MessageSize = u64;
21

22
impl CacheMemoryTracker {
23
    pub fn initialize(config: &CacheConfig) -> Option<Arc<CacheMemoryTracker>> {
692✔
24
        unsafe {
692✔
25
            ONCE.call_once(|| {
692✔
26
                if config.enabled {
55✔
27
                    INSTANCE = Some(Arc::new(CacheMemoryTracker::new(config.size.clone())));
55✔
28
                    info!("Cache memory tracker initialized");
55✔
29
                } else {
30
                    INSTANCE = None;
×
31
                    info!("Cache memory tracker disabled");
×
32
                }
33
            });
692✔
34
            INSTANCE.clone()
692✔
35
        }
692✔
36
    }
692✔
37

38
    pub fn get_instance() -> Option<Arc<CacheMemoryTracker>> {
29,305✔
39
        unsafe { INSTANCE.clone() }
29,305✔
40
    }
29,305✔
41

42
    fn new(limit: MemoryResourceQuota) -> Self {
55✔
43
        let mut sys = System::new_all();
55✔
44
        sys.refresh_all();
55✔
45

55✔
46
        let total_memory_bytes = IggyByteSize::from(sys.total_memory());
55✔
47
        let free_memory = IggyByteSize::from(sys.free_memory());
55✔
48
        let free_memory_percentage =
55✔
49
            free_memory.as_bytes_u64() as f64 / total_memory_bytes.as_bytes_u64() as f64 * 100.0;
55✔
50
        let used_memory_bytes = AtomicU64::new(0);
55✔
51
        let limit_bytes = limit.into();
55✔
52

55✔
53
        info!(
55✔
54
            "Cache memory tracker started, cache: {}, total memory: {}, free memory: {}, free memory percentage: {:.2}%",
×
NEW
55
            limit_bytes.as_human_string(), total_memory_bytes.as_human_string(), free_memory, free_memory_percentage
×
56
        );
57

58
        CacheMemoryTracker {
55✔
59
            used_memory_bytes,
55✔
60
            limit_bytes,
55✔
61
        }
55✔
62
    }
55✔
63

64
    pub fn increment_used_memory(&self, message_size: MessageSize) {
1,748,149✔
65
        let mut current_cache_size_bytes = self.used_memory_bytes.load(Ordering::SeqCst);
1,748,149✔
66
        loop {
67
            let new_size = current_cache_size_bytes + message_size;
1,776,093✔
68
            match self.used_memory_bytes.compare_exchange_weak(
1,776,093✔
69
                current_cache_size_bytes,
1,776,093✔
70
                new_size,
1,776,093✔
71
                Ordering::SeqCst,
1,776,093✔
72
                Ordering::SeqCst,
1,776,093✔
73
            ) {
1,776,093✔
74
                Ok(_) => break,
1,748,149✔
75
                Err(actual_current) => current_cache_size_bytes = actual_current,
27,944✔
76
            }
77
        }
78
    }
1,748,149✔
79

80
    pub fn decrement_used_memory(&self, message_size: MessageSize) {
145✔
81
        let mut current_cache_size_bytes = self.used_memory_bytes.load(Ordering::SeqCst);
145✔
82
        loop {
83
            let new_size = current_cache_size_bytes - message_size;
145✔
84
            match self.used_memory_bytes.compare_exchange_weak(
145✔
85
                current_cache_size_bytes,
145✔
86
                new_size,
145✔
87
                Ordering::SeqCst,
145✔
88
                Ordering::SeqCst,
145✔
89
            ) {
145✔
90
                Ok(_) => return,
145✔
91
                Err(actual_current) => current_cache_size_bytes = actual_current,
×
92
            }
93
        }
94
    }
145✔
95

NEW
96
    pub fn usage_bytes(&self) -> IggyByteSize {
×
NEW
97
        IggyByteSize::from(self.used_memory_bytes.load(Ordering::SeqCst))
×
98
    }
×
99

100
    pub fn will_fit_into_cache(&self, requested_size: IggyByteSize) -> bool {
487,113✔
101
        IggyByteSize::from(self.used_memory_bytes.load(Ordering::SeqCst)) + requested_size
487,113✔
102
            <= self.limit_bytes
487,113✔
103
    }
487,113✔
104
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc