• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

iggy-rs / iggy / 13327405895

14 Feb 2025 10:33AM UTC coverage: 74.693% (-0.5%) from 75.192%
13327405895

push

github

web-flow
refactor(server): improve cache size calculation and memory tracking (#1529)

This commit refactors the cache size calculation by replacing the
`LocalSizeable` trait with a new `RealSize` trait. The `RealSize`
trait provides a more accurate calculation of the memory size of
types, including overhead from containers like `Arc` and `Vec`.

26 of 29 new or added lines in 2 files covered. (89.66%)

566 existing lines in 17 files now uncovered.

25229 of 33777 relevant lines covered (74.69%)

10282.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/server/src/streaming/persistence/task.rs
1
use crate::streaming::persistence::COMPONENT;
2
use bytes::Bytes;
3
use error_set::ErrContext;
4
use flume::{unbounded, Receiver, Sender};
5
use iggy::error::IggyError;
6
use std::{sync::Arc, time::Duration};
7
use tokio::task;
8
use tracing::error;
9

10
use super::persister::PersisterKind;
11

12
#[derive(Debug)]
13
pub struct LogPersisterTask {
14
    _sender: Option<Sender<Bytes>>,
15
    _task_handle: Option<task::JoinHandle<()>>,
16
}
17

18
impl LogPersisterTask {
UNCOV
19
    pub fn new(
×
UNCOV
20
        path: String,
×
UNCOV
21
        persister: Arc<PersisterKind>,
×
UNCOV
22
        max_retries: u32,
×
UNCOV
23
        retry_sleep: Duration,
×
UNCOV
24
    ) -> Self {
×
UNCOV
25
        let (sender, receiver): (Sender<Bytes>, Receiver<Bytes>) = unbounded();
×
UNCOV
26

×
UNCOV
27
        let task_handle = task::spawn(async move {
×
28
            loop {
UNCOV
29
                match receiver.recv_async().await {
×
UNCOV
30
                    Ok(data) => {
×
UNCOV
31
                        if let Err(e) = Self::persist_with_retries(
×
UNCOV
32
                            &path,
×
UNCOV
33
                            &persister,
×
UNCOV
34
                            data,
×
UNCOV
35
                            max_retries,
×
UNCOV
36
                            retry_sleep,
×
UNCOV
37
                        )
×
UNCOV
38
                        .await
×
39
                        {
40
                            error!("{COMPONENT} - Final failure to persist data: {}", e);
×
UNCOV
41
                        }
×
42
                    }
UNCOV
43
                    Err(e) => {
×
UNCOV
44
                        error!("{COMPONENT} - Error receiving data from channel: {}", e);
×
UNCOV
45
                        return;
×
UNCOV
46
                    }
×
UNCOV
47
                }
×
UNCOV
48
            }
×
UNCOV
49
        });
×
UNCOV
50

×
UNCOV
51
        LogPersisterTask {
×
UNCOV
52
            _sender: Some(sender),
×
UNCOV
53
            _task_handle: Some(task_handle),
×
UNCOV
54
        }
×
UNCOV
55
    }
×
56

UNCOV
57
    async fn persist_with_retries(
×
UNCOV
58
        path: &str,
×
UNCOV
59
        persister: &Arc<PersisterKind>,
×
UNCOV
60
        data: Bytes,
×
UNCOV
61
        max_retries: u32,
×
UNCOV
62
        retry_sleep: Duration,
×
UNCOV
63
    ) -> Result<(), String> {
×
UNCOV
64
        let mut retries = 0;
×
65

UNCOV
66
        while retries < max_retries {
×
UNCOV
67
            match persister.append(path, &data).await {
×
UNCOV
68
                Ok(_) => return Ok(()),
×
69
                Err(e) => {
×
70
                    error!(
×
71
                        "Could not append to persister (attempt {}): {}",
×
72
                        retries + 1,
×
73
                        e
74
                    );
75
                    retries += 1;
×
76
                    tokio::time::sleep(retry_sleep).await;
×
77
                }
78
            }
79
        }
80

81
        Err(format!(
×
82
            "{COMPONENT} - failed to persist data after {} retries",
×
83
            max_retries
×
84
        ))
×
UNCOV
85
    }
×
86

UNCOV
87
    pub async fn send(&self, data: Bytes) -> Result<(), IggyError> {
×
UNCOV
88
        if let Some(sender) = &self._sender {
×
UNCOV
89
            sender
×
UNCOV
90
                .send_async(data)
×
UNCOV
91
                .await
×
UNCOV
92
                .with_error_context(|err| {
×
93
                    format!("{COMPONENT} - failed to send data to async channel, err: {err}")
×
UNCOV
94
                })
×
UNCOV
95
                .map_err(|_| IggyError::CannotSaveMessagesToSegment)
×
96
        } else {
97
            Err(IggyError::CannotSaveMessagesToSegment)
×
98
        }
UNCOV
99
    }
×
100
}
101

102
impl Drop for LogPersisterTask {
UNCOV
103
    fn drop(&mut self) {
×
UNCOV
104
        self._sender.take();
×
105

UNCOV
106
        if let Some(handle) = self._task_handle.take() {
×
UNCOV
107
            tokio::spawn(async move {
×
UNCOV
108
                if let Err(e) = handle.await {
×
109
                    error!(
×
110
                        "{COMPONENT} - error while shutting down task in Drop: {:?}",
×
111
                        e
112
                    );
UNCOV
113
                }
×
UNCOV
114
            });
×
UNCOV
115
        }
×
UNCOV
116
    }
×
117
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc