• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

iggy-rs / iggy / 13291978433

12 Feb 2025 06:09PM UTC coverage: 74.767% (-0.4%) from 75.192%
13291978433

Pull #1511

github

web-flow
Merge 95db58d6f into 19db87131
Pull Request #1511: WiP: Refactor Segment storage, fix nasty bug related to page cache

740 of 1255 new or added lines in 29 files covered. (58.96%)

108 existing lines in 11 files now uncovered.

25059 of 33516 relevant lines covered (74.77%)

9795.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.0
/server/src/streaming/batching/message_batch.rs
1
use crate::streaming::batching::batch_filter::BatchItemizer;
2
use crate::streaming::batching::iterator::IntoMessagesIterator;
3
use crate::streaming::models::messages::RetainedMessage;
4
use bytes::{BufMut, Bytes, BytesMut};
5
use iggy::utils::{byte_size::IggyByteSize, sizeable::Sizeable};
6

7
pub const RETAINED_BATCH_OVERHEAD: u64 = 8 + 8 + 4 + 4;
8

9
#[derive(Debug)]
10
pub struct RetainedMessageBatch {
11
    pub base_offset: u64,
12
    pub last_offset_delta: u32,
13
    pub max_timestamp: u64,
14
    pub length: IggyByteSize,
15
    pub bytes: Bytes,
16
}
17

18
impl RetainedMessageBatch {
19
    pub fn new(
685✔
20
        base_offset: u64,
685✔
21
        last_offset_delta: u32,
685✔
22
        max_timestamp: u64,
685✔
23
        length: IggyByteSize,
685✔
24
        bytes: Bytes,
685✔
25
    ) -> Self {
685✔
26
        RetainedMessageBatch {
685✔
27
            base_offset,
685✔
28
            last_offset_delta,
685✔
29
            max_timestamp,
685✔
30
            length,
685✔
31
            bytes,
685✔
32
        }
685✔
33
    }
685✔
34

35
    pub fn is_contained_or_overlapping_within_offset_range(
×
36
        &self,
×
37
        start_offset: u64,
×
38
        end_offset: u64,
×
39
    ) -> bool {
×
40
        (self.base_offset <= end_offset && self.get_last_offset() >= end_offset)
×
41
            || (self.base_offset <= start_offset && self.get_last_offset() <= end_offset)
×
42
            || (self.base_offset <= end_offset && self.get_last_offset() >= start_offset)
×
43
    }
×
44

45
    pub fn get_last_offset(&self) -> u64 {
×
46
        self.base_offset + self.last_offset_delta as u64
×
47
    }
×
48

49
    pub fn extend(&self, bytes: &mut BytesMut) {
600✔
50
        bytes.put_u64_le(self.base_offset);
600✔
51
        bytes.put_u32_le(self.length.as_bytes_u64() as u32);
600✔
52
        bytes.put_u32_le(self.last_offset_delta);
600✔
53
        bytes.put_u64_le(self.max_timestamp);
600✔
54
        bytes.put_slice(&self.bytes);
600✔
55
    }
600✔
56
}
57

58
impl<'a, T, U> BatchItemizer<RetainedMessage, &'a U, T> for T
59
where
60
    T: Iterator<Item = &'a U>,
61
    &'a U: IntoMessagesIterator<Item = RetainedMessage>,
62
{
UNCOV
63
    fn to_messages(self) -> Vec<RetainedMessage> {
×
UNCOV
64
        self.flat_map(|batch| batch.into_messages_iter().collect::<Vec<_>>())
×
UNCOV
65
            .collect()
×
66
    }
×
67

68
    fn to_messages_with_filter<F>(self, messages_count: usize, f: &F) -> Vec<RetainedMessage>
5✔
69
    where
5✔
70
        F: Fn(&RetainedMessage) -> bool,
5✔
71
    {
5✔
72
        self.fold(Vec::with_capacity(messages_count), |mut messages, batch| {
5✔
73
            messages.extend(batch.into_messages_iter().filter(f));
5✔
74
            messages
5✔
75
        })
5✔
76
    }
5✔
77
}
78

79
impl Sizeable for RetainedMessageBatch {
80
    fn get_size_bytes(&self) -> IggyByteSize {
1,200✔
81
        self.length + RETAINED_BATCH_OVERHEAD.into()
1,200✔
82
    }
1,200✔
83
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc