iggy-rs / iggy · build 13215398025 (push, via GitHub, committed by web-flow)
08 Feb 2025 12:01PM UTC · coverage: 75.624% (-0.09%) from 75.712%

Make mimalloc allocator default and adjust message save threshold (#1492)

This commit integrates the mimalloc allocator as the default for both
the bench and server packages, aiming to improve memory allocation
performance. The `messages_required_to_save` configuration in
`server.toml` is reduced from 5000 to 1000 to enhance performance.
Additionally, the server version is bumped from 0.4.152 to 0.4.153 to
reflect these changes.
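
For context, a minimal sketch of how a Rust binary typically opts into mimalloc as its global allocator (assuming the mimalloc crate is added as a dependency; the exact wiring in the iggy server and bench packages may differ):

use mimalloc::MiMalloc;

// Route every heap allocation in this binary through mimalloc
// instead of the default system allocator.
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;

fn main() {
    // Allocations below now go through mimalloc.
    let data = vec![0u8; 1024];
    println!("allocated {} bytes", data.len());
}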

24956 of 33000 relevant lines covered (75.62%)

9942.46 hits per line

Source File: /server/src/streaming/batching/batch_accumulator.rs (69.14% covered)
Uncovered in this build: get_messages_by_offset, batch_base_offset, the remainder branch of materialize_batch_and_maybe_update_state, and Sizeable::get_size_bytes.

use super::message_batch::{RetainedMessageBatch, RETAINED_BATCH_OVERHEAD};
use crate::streaming::local_sizeable::LocalSizeable;
use crate::streaming::models::messages::RetainedMessage;
use bytes::BytesMut;
use iggy::utils::byte_size::IggyByteSize;
use iggy::utils::sizeable::Sizeable;
use std::sync::Arc;

/// In-memory accumulator for messages that have not yet been persisted;
/// they are later materialized into a `RetainedMessageBatch`.
#[derive(Debug)]
pub struct BatchAccumulator {
    base_offset: u64,
    current_size: IggyByteSize,
    current_offset: u64,
    current_timestamp: u64,
    capacity: u64,
    messages: Vec<Arc<RetainedMessage>>,
}

impl BatchAccumulator {
    pub fn new(base_offset: u64, capacity: usize) -> Self {
        Self {
            base_offset,
            current_size: IggyByteSize::from(0),
            current_offset: 0,
            current_timestamp: 0,
            capacity: capacity as u64,
            messages: Vec::with_capacity(capacity),
        }
    }

    /// Appends messages, tracking the accumulated size and the offset and
    /// timestamp of the newest message.
    pub fn append(&mut self, batch_size: IggyByteSize, items: &[Arc<RetainedMessage>]) {
        assert!(!items.is_empty());
        self.current_size += batch_size;
        self.current_offset = items.last().unwrap().offset;
        self.current_timestamp = items.last().unwrap().timestamp;
        self.messages.extend(items.iter().cloned());
    }

    pub fn get_messages_by_offset(
        &self,
        start_offset: u64,
        end_offset: u64,
    ) -> Vec<Arc<RetainedMessage>> {
        self.messages
            .iter()
            .filter(|msg| msg.offset >= start_offset && msg.offset <= end_offset)
            .cloned()
            .collect()
    }

    pub fn is_empty(&self) -> bool {
        self.messages.is_empty()
    }

    pub fn unsaved_messages_count(&self) -> usize {
        self.messages.len()
    }

    pub fn batch_max_offset(&self) -> u64 {
        self.current_offset
    }

    pub fn batch_max_timestamp(&self) -> u64 {
        self.current_timestamp
    }

    pub fn batch_base_offset(&self) -> u64 {
        self.base_offset
    }

    /// Drains up to `capacity` messages into a single contiguous payload and
    /// returns the materialized batch together with a flag indicating whether
    /// messages remain in the accumulator.
    pub fn materialize_batch_and_maybe_update_state(&mut self) -> (bool, RetainedMessageBatch) {
        let batch_base_offset = self.base_offset;
        let batch_last_offset_delta = (self.current_offset - self.base_offset) as u32;
        let split_point = std::cmp::min(self.capacity as usize, self.messages.len());

        let mut bytes = BytesMut::with_capacity(self.current_size.as_bytes_u64() as usize);
        let last_batch_timestamp = self
            .messages
            .get(split_point - 1)
            .map_or(0, |msg| msg.timestamp);
        for message in self.messages.drain(..split_point) {
            message.extend(&mut bytes);
        }

        let has_remainder = !self.messages.is_empty();
        if has_remainder {
            // Re-anchor the accumulator state around the messages that were not drained.
            self.base_offset = self.messages.first().unwrap().offset;
            self.current_size = self
                .messages
                .iter()
                .map(|msg| msg.get_size_bytes())
                .sum::<IggyByteSize>();
            self.current_offset = self.messages.last().unwrap().offset;
            self.current_timestamp = self.messages.last().unwrap().timestamp;
        }

        let batch_payload = bytes.freeze();
        let batch_payload_len = IggyByteSize::from(batch_payload.len() as u64);
        let batch = RetainedMessageBatch::new(
            batch_base_offset,
            batch_last_offset_delta,
            last_batch_timestamp,
            batch_payload_len,
            batch_payload,
        );
        (has_remainder, batch)
    }
}

impl Sizeable for BatchAccumulator {
    fn get_size_bytes(&self) -> IggyByteSize {
        self.current_size + RETAINED_BATCH_OVERHEAD.into()
    }
}
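
To make the flush flow easier to follow, here is a deliberately simplified, self-contained sketch of the same drain-up-to-capacity pattern; Msg and Accumulator are hypothetical stand-ins for RetainedMessage and BatchAccumulator, which additionally track sizes, offsets, and timestamps:

use bytes::{BufMut, BytesMut};

struct Msg {
    payload: Vec<u8>,
}

struct Accumulator {
    capacity: usize,
    messages: Vec<Msg>,
}

impl Accumulator {
    // Drain at most `capacity` messages into one contiguous buffer, as
    // materialize_batch_and_maybe_update_state does, and report whether
    // any messages are still pending.
    fn materialize(&mut self) -> (bool, bytes::Bytes) {
        let split_point = std::cmp::min(self.capacity, self.messages.len());
        let mut bytes = BytesMut::new();
        for msg in self.messages.drain(..split_point) {
            bytes.put_slice(&msg.payload);
        }
        (!self.messages.is_empty(), bytes.freeze())
    }
}

fn main() {
    let mut acc = Accumulator {
        capacity: 2,
        messages: (0..3u8).map(|i| Msg { payload: vec![i] }).collect(),
    };
    let (has_remainder, batch) = acc.materialize();
    assert!(has_remainder); // one of the three messages is still pending
    assert_eq!(batch.len(), 2); // two single-byte payloads were drained
}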