• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

iggy-rs / iggy / 13295228560

12 Feb 2025 09:28PM UTC coverage: 74.822% (-0.4%) from 75.192%
13295228560

Pull #1511

github

web-flow
Merge 20a5669f2 into 19db87131
Pull Request #1511: Refactor segment handling and fix page cache issue

839 of 1367 new or added lines in 29 files covered. (61.38%)

109 existing lines in 11 files now uncovered.

25171 of 33641 relevant lines covered (74.82%)

9759.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.66
/server/src/streaming/batching/message_batch.rs
1
use crate::streaming::batching::batch_filter::BatchItemizer;
2
use crate::streaming::batching::iterator::IntoMessagesIterator;
3
use crate::streaming::models::messages::RetainedMessage;
4
use bytes::{BufMut, Bytes, BytesMut};
5
use iggy::utils::{byte_size::IggyByteSize, sizeable::Sizeable};
6

7
/// Size in bytes of the fixed per-batch header: 8 + 8 + 4 + 4 = 24, the same
/// length as the array returned by `RetainedMessageBatch::header_as_bytes`.
/// `get_size_bytes` adds this on top of the batch's `length`.
pub const RETAINED_BATCH_OVERHEAD: u64 = 8 + 8 + 4 + 4;
8

9
/// A batch of retained messages: four header fields plus the raw payload bytes.
///
/// The header fields are the ones serialized by `header_as_bytes` and `extend`.
#[derive(Debug)]
10
pub struct RetainedMessageBatch {
11
    /// Offset the batch starts from; `get_last_offset` adds `last_offset_delta`
    /// to it. Presumably the offset of the first message — TODO confirm.
    pub base_offset: u64,
12
    /// Distance from `base_offset` to the batch's last offset.
    pub last_offset_delta: u32,
13
    /// Timestamp stored in the header as 8 little-endian bytes.
    /// Presumably the largest message timestamp in the batch — TODO confirm.
    pub max_timestamp: u64,
14
    /// Size recorded in the header (written as a `u32`). `get_size_bytes` adds
    /// the header overhead to it, so it presumably counts only `bytes` — TODO confirm.
    pub length: IggyByteSize,
15
    /// Payload that `extend` appends right after the header fields.
    pub bytes: Bytes,
16
}
17

18
impl RetainedMessageBatch {
19
    /// Builds a batch from its parts.
    ///
    /// Every argument is stored unchanged in the field of the same name; no
    /// validation or derivation happens here.
    pub fn new(
685✔
20
        base_offset: u64,
685✔
21
        last_offset_delta: u32,
685✔
22
        max_timestamp: u64,
685✔
23
        length: IggyByteSize,
685✔
24
        bytes: Bytes,
685✔
25
    ) -> Self {
685✔
26
        RetainedMessageBatch {
685✔
27
            base_offset,
685✔
28
            last_offset_delta,
685✔
29
            max_timestamp,
685✔
30
            length,
685✔
31
            bytes,
685✔
32
        }
685✔
33
    }
685✔
34

35
    pub fn is_contained_or_overlapping_within_offset_range(
×
36
        &self,
×
37
        start_offset: u64,
×
38
        end_offset: u64,
×
39
    ) -> bool {
×
40
        (self.base_offset <= end_offset && self.get_last_offset() >= end_offset)
×
41
            || (self.base_offset <= start_offset && self.get_last_offset() <= end_offset)
×
42
            || (self.base_offset <= end_offset && self.get_last_offset() >= start_offset)
×
43
    }
×
44

45
    /// Returns the last offset covered by this batch:
    /// `base_offset + last_offset_delta`.
    ///
    /// The delta is widened to `u64` before the addition. The sum uses plain
    /// `+`, so no overflow handling is performed here.
    pub fn get_last_offset(&self) -> u64 {
×
46
        self.base_offset + self.last_offset_delta as u64
×
47
    }
×
48

NEW
49
    /// Appends this batch to `bytes`: the four header fields in little-endian
    /// order, followed by the payload.
    ///
    /// The header fields are written in the same order and widths that
    /// `header_as_bytes` uses: `base_offset` (u64), `length` (u32),
    /// `last_offset_delta` (u32), `max_timestamp` (u64).
    pub fn extend(&self, bytes: &mut BytesMut) {
×
NEW
50
        // use vectored API
×
NEW
51
        bytes.put_u64_le(self.base_offset);
×
NEW
52
        // NOTE(review): `as u32` silently keeps only the low 32 bits if the
        // length ever exceeds `u32::MAX` — confirm that cannot happen upstream.
        bytes.put_u32_le(self.length.as_bytes_u64() as u32);
×
NEW
53
        bytes.put_u32_le(self.last_offset_delta);
×
NEW
54
        bytes.put_u64_le(self.max_timestamp);
×
NEW
55
        // Payload goes last, copied from `self.bytes`.
        bytes.put_slice(&self.bytes);
×
NEW
56
    }
×
57

58
    /// Serializes the batch header into a fixed 24-byte array, little-endian.
    ///
    /// Layout:
    /// - bytes `0..8`:   `base_offset` (u64)
    /// - bytes `8..12`:  `length`, narrowed to u32
    /// - bytes `12..16`: `last_offset_delta` (u32)
    /// - bytes `16..24`: `max_timestamp` (u64)
    pub fn header_as_bytes(&self) -> [u8; 24] {
600✔
59
        let mut header: [u8; 24] = [0u8; 24];
600✔
60

600✔
61
        header[0..8].copy_from_slice(&self.base_offset.to_le_bytes());
600✔
62
        // NOTE(review): `as u32` silently keeps only the low 32 bits if the
        // length ever exceeds `u32::MAX` — confirm that cannot happen upstream.
        header[8..12].copy_from_slice(&(self.length.as_bytes_u64() as u32).to_le_bytes());
600✔
63
        header[12..16].copy_from_slice(&self.last_offset_delta.to_le_bytes());
600✔
64
        header[16..24].copy_from_slice(&self.max_timestamp.to_le_bytes());
600✔
65

600✔
66
        header
600✔
67
    }
600✔
68
}
69

70
/// Blanket implementation of `BatchItemizer` for any iterator over `&U` whose
/// items can be turned into an iterator of `RetainedMessage`s.
impl<'a, T, U> BatchItemizer<RetainedMessage, &'a U, T> for T
71
where
72
    T: Iterator<Item = &'a U>,
73
    &'a U: IntoMessagesIterator<Item = RetainedMessage>,
74
{
UNCOV
75
    /// Consumes the iterator and returns every message of every batch, in
    /// iteration order, as one flat `Vec`.
    fn to_messages(self) -> Vec<RetainedMessage> {
×
UNCOV
76
        // Each batch's messages are first collected into a temporary `Vec`,
        // which `flat_map` then flattens into the final result.
        self.flat_map(|batch| batch.into_messages_iter().collect::<Vec<_>>())
×
UNCOV
77
            .collect()
×
UNCOV
78
    }
×
79

80
    /// Consumes the iterator and returns the messages for which `f` returns
    /// `true`, in iteration order.
    ///
    /// `messages_count` is only the initial capacity of the result `Vec`; it
    /// does not limit how many messages are returned.
    fn to_messages_with_filter<F>(self, messages_count: usize, f: &F) -> Vec<RetainedMessage>
5✔
81
    where
5✔
82
        F: Fn(&RetainedMessage) -> bool,
5✔
83
    {
5✔
84
        // The accumulator `Vec` is threaded through the fold, so matching
        // messages are appended batch by batch.
        self.fold(Vec::with_capacity(messages_count), |mut messages, batch| {
5✔
85
            messages.extend(batch.into_messages_iter().filter(f));
5✔
86
            messages
5✔
87
        })
5✔
88
    }
5✔
89
}
90

91
/// Size accounting for a batch: the stored `length` plus the fixed header.
impl Sizeable for RetainedMessageBatch {
92
    /// Returns `length + RETAINED_BATCH_OVERHEAD` as an `IggyByteSize`.
    fn get_size_bytes(&self) -> IggyByteSize {
1,200✔
93
        self.length + RETAINED_BATCH_OVERHEAD.into()
1,200✔
94
    }
1,200✔
95
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc