• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

iggy-rs / iggy / 13335910144

14 Feb 2025 07:17PM UTC coverage: 74.72% (+0.03%) from 74.693%
13335910144

push

github

web-flow
fix(server): don't panic during segment deletion, improve max topic size (#1532)

This commit addresses a panic issue that occurred during segment
deletion and closure by ensuring proper shutdown of reading and writing tasks.

It also improves the handling of maximum topic size by allowing unlimited
size and ensuring that the oldest segments are removed by a background
job when necessary.

Additionally, the logging level for certain operations has been adjusted 
to provide more detailed trace information.

These changes enhance the stability and flexibility of the system
in managing topic segments and their lifecycle.

58 of 72 new or added lines in 5 files covered. (80.56%)

4 existing lines in 3 files now uncovered.

25247 of 33789 relevant lines covered (74.72%)

10277.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.37
/server/src/configs/validators.rs
1
extern crate sysinfo;
2

3
use super::server::{
4
    ArchiverConfig, DataMaintenanceConfig, MessageSaverConfig, MessagesMaintenanceConfig,
5
    StateMaintenanceConfig, TelemetryConfig,
6
};
7
use super::system::CompressionConfig;
8
use crate::archiver::ArchiverKindType;
9
use crate::configs::server::{PersonalAccessTokenConfig, ServerConfig};
10
use crate::configs::system::{CacheConfig, SegmentConfig};
11
use crate::configs::COMPONENT;
12
use crate::server_error::ConfigError;
13
use crate::streaming::segments::*;
14
use error_set::ErrContext;
15
use iggy::compression::compression_algorithm::CompressionAlgorithm;
16
use iggy::utils::byte_size::IggyByteSize;
17
use iggy::utils::expiry::IggyExpiry;
18
use iggy::utils::topic_size::MaxTopicSize;
19
use iggy::validatable::Validatable;
20
use sysinfo::{Pid, ProcessesToUpdate, System};
21

22
impl Validatable<ConfigError> for ServerConfig {
23
    fn validate(&self) -> Result<(), ConfigError> {
131✔
24
        self.data_maintenance.validate().with_error_context(|_| {
131✔
25
            format!("{COMPONENT} - failed to validate data maintenance config")
×
26
        })?;
131✔
27
        self.personal_access_token
131✔
28
            .validate()
131✔
29
            .with_error_context(|_| {
131✔
30
                format!("{COMPONENT} - failed to validate personal access token config")
×
31
            })?;
131✔
32
        self.system
131✔
33
            .segment
131✔
34
            .validate()
131✔
35
            .with_error_context(|_| format!("{COMPONENT} - failed to validate segment config"))?;
131✔
36
        self.system
131✔
37
            .cache
131✔
38
            .validate()
131✔
39
            .with_error_context(|_| format!("{COMPONENT} - failed to validate cache config"))?;
131✔
40
        self.system.compression.validate().with_error_context(|_| {
131✔
41
            format!("{COMPONENT} - failed to validate compression config")
×
42
        })?;
131✔
43
        self.telemetry
131✔
44
            .validate()
131✔
45
            .with_error_context(|_| format!("{COMPONENT} - failed to validate telemetry config"))?;
131✔
46

47
        let topic_size = match self.system.topic.max_size {
131✔
UNCOV
48
            MaxTopicSize::Custom(size) => Ok(size.as_bytes_u64()),
×
49
            MaxTopicSize::Unlimited => Ok(u64::MAX),
131✔
50
            MaxTopicSize::ServerDefault => Err(ConfigError::InvalidConfiguration),
×
51
        }?;
×
52

53
        if let IggyExpiry::ServerDefault = self.system.segment.message_expiry {
131✔
54
            return Err(ConfigError::InvalidConfiguration);
×
55
        }
131✔
56

131✔
57
        if self.http.enabled {
131✔
58
            if let IggyExpiry::ServerDefault = self.http.jwt.access_token_expiry {
131✔
59
                return Err(ConfigError::InvalidConfiguration);
×
60
            }
131✔
61
        }
×
62

63
        if topic_size < self.system.segment.size.as_bytes_u64() {
131✔
64
            return Err(ConfigError::InvalidConfiguration);
×
65
        }
131✔
66

131✔
67
        Ok(())
131✔
68
    }
131✔
69
}
70

71
impl Validatable<ConfigError> for CompressionConfig {
72
    fn validate(&self) -> Result<(), ConfigError> {
131✔
73
        let compression_alg = &self.default_algorithm;
131✔
74
        if *compression_alg != CompressionAlgorithm::None {
131✔
75
            // TODO(numinex): Change this message once server side compression is fully developed.
×
76
            println!(
×
77
                "Server started with server-side compression enabled, using algorithm: {}, this feature is not implemented yet!",
×
78
                compression_alg
×
79
            );
×
80
        }
131✔
81

82
        Ok(())
131✔
83
    }
131✔
84
}
85

86
impl Validatable<ConfigError> for TelemetryConfig {
87
    fn validate(&self) -> Result<(), ConfigError> {
131✔
88
        if !self.enabled {
131✔
89
            return Ok(());
131✔
90
        }
×
91

×
92
        if self.service_name.trim().is_empty() {
×
93
            return Err(ConfigError::InvalidConfiguration);
×
94
        }
×
95

×
96
        if self.logs.endpoint.is_empty() {
×
97
            return Err(ConfigError::InvalidConfiguration);
×
98
        }
×
99

×
100
        if self.traces.endpoint.is_empty() {
×
101
            return Err(ConfigError::InvalidConfiguration);
×
102
        }
×
103

×
104
        Ok(())
×
105
    }
131✔
106
}
107

108
impl Validatable<ConfigError> for CacheConfig {
109
    fn validate(&self) -> Result<(), ConfigError> {
131✔
110
        if !self.enabled {
131✔
111
            println!("Cache configuration -> cache is disabled.");
×
112
            return Ok(());
×
113
        }
131✔
114

131✔
115
        let limit_bytes = self.size.clone().into();
131✔
116
        let mut sys = System::new_all();
131✔
117
        sys.refresh_all();
131✔
118
        sys.refresh_processes(
131✔
119
            ProcessesToUpdate::Some(&[Pid::from_u32(std::process::id())]),
131✔
120
            true,
131✔
121
        );
131✔
122
        let total_memory = sys.total_memory();
131✔
123
        let free_memory = sys.free_memory();
131✔
124
        let cache_percentage = (limit_bytes.as_bytes_u64() as f64 / total_memory as f64) * 100.0;
131✔
125

131✔
126
        let pretty_cache_limit = limit_bytes.as_human_string();
131✔
127
        let pretty_total_memory = IggyByteSize::from(total_memory).as_human_string();
131✔
128
        let pretty_free_memory = IggyByteSize::from(free_memory).as_human_string();
131✔
129

131✔
130
        if limit_bytes > total_memory {
131✔
131
            return Err(ConfigError::CacheConfigValidationFailure);
×
132
        }
131✔
133

131✔
134
        if limit_bytes > (total_memory as f64 * 0.75) as u64 {
131✔
135
            println!(
×
136
                "Cache configuration -> cache size exceeds 75% of total memory. Set to: {} ({:.2}% of total memory: {}).",
×
137
                pretty_cache_limit, cache_percentage, pretty_total_memory
×
138
            );
×
139
        }
131✔
140

141
        println!(
131✔
142
            "Cache configuration -> cache size set to {} ({:.2}% of total memory: {}, free memory: {}).",
131✔
143
            pretty_cache_limit, cache_percentage, pretty_total_memory, pretty_free_memory
131✔
144
        );
131✔
145

131✔
146
        Ok(())
131✔
147
    }
131✔
148
}
149

150
impl Validatable<ConfigError> for SegmentConfig {
151
    fn validate(&self) -> Result<(), ConfigError> {
131✔
152
        if self.size > SEGMENT_MAX_SIZE_BYTES {
131✔
153
            return Err(ConfigError::InvalidConfiguration);
×
154
        }
131✔
155

131✔
156
        Ok(())
131✔
157
    }
131✔
158
}
159

160
impl Validatable<ConfigError> for MessageSaverConfig {
161
    fn validate(&self) -> Result<(), ConfigError> {
×
162
        if self.enabled && self.interval.is_zero() {
×
163
            return Err(ConfigError::InvalidConfiguration);
×
164
        }
×
165

×
166
        Ok(())
×
167
    }
×
168
}
169

170
impl Validatable<ConfigError> for DataMaintenanceConfig {
171
    fn validate(&self) -> Result<(), ConfigError> {
131✔
172
        self.archiver
131✔
173
            .validate()
131✔
174
            .with_error_context(|_| format!("{COMPONENT} - failed to validate archiver config"))?;
131✔
175
        self.messages.validate().with_error_context(|_| {
131✔
176
            format!("{COMPONENT} - failed to validate messages maintenance config")
×
177
        })?;
131✔
178
        self.state.validate().with_error_context(|_| {
131✔
179
            format!("{COMPONENT} - failed to validate state maintenance config")
×
180
        })?;
131✔
181
        Ok(())
131✔
182
    }
131✔
183
}
184

185
impl Validatable<ConfigError> for ArchiverConfig {
186
    fn validate(&self) -> Result<(), ConfigError> {
131✔
187
        if !self.enabled {
131✔
188
            return Ok(());
131✔
189
        }
×
190

×
191
        match self.kind {
×
192
            ArchiverKindType::Disk => {
193
                if self.disk.is_none() {
×
194
                    return Err(ConfigError::InvalidConfiguration);
×
195
                }
×
196

×
197
                let disk = self.disk.as_ref().unwrap();
×
198
                if disk.path.is_empty() {
×
199
                    return Err(ConfigError::InvalidConfiguration);
×
200
                }
×
201
                Ok(())
×
202
            }
203
            ArchiverKindType::S3 => {
204
                if self.s3.is_none() {
×
205
                    return Err(ConfigError::InvalidConfiguration);
×
206
                }
×
207

×
208
                let s3 = self.s3.as_ref().unwrap();
×
209
                if s3.key_id.is_empty() {
×
210
                    return Err(ConfigError::InvalidConfiguration);
×
211
                }
×
212

×
213
                if s3.key_secret.is_empty() {
×
214
                    return Err(ConfigError::InvalidConfiguration);
×
215
                }
×
216

×
217
                if s3.endpoint.is_none() && s3.region.is_none() {
×
218
                    return Err(ConfigError::InvalidConfiguration);
×
219
                }
×
220

×
221
                if s3.endpoint.as_deref().unwrap_or_default().is_empty()
×
222
                    && s3.region.as_deref().unwrap_or_default().is_empty()
×
223
                {
224
                    return Err(ConfigError::InvalidConfiguration);
×
225
                }
×
226

×
227
                if s3.bucket.is_empty() {
×
228
                    return Err(ConfigError::InvalidConfiguration);
×
229
                }
×
230
                Ok(())
×
231
            }
232
        }
233
    }
131✔
234
}
235

236
impl Validatable<ConfigError> for MessagesMaintenanceConfig {
237
    fn validate(&self) -> Result<(), ConfigError> {
131✔
238
        if self.archiver_enabled && self.interval.is_zero() {
131✔
239
            return Err(ConfigError::InvalidConfiguration);
×
240
        }
131✔
241

131✔
242
        Ok(())
131✔
243
    }
131✔
244
}
245

246
impl Validatable<ConfigError> for StateMaintenanceConfig {
247
    fn validate(&self) -> Result<(), ConfigError> {
131✔
248
        if self.archiver_enabled && self.interval.is_zero() {
131✔
249
            return Err(ConfigError::InvalidConfiguration);
×
250
        }
131✔
251

131✔
252
        Ok(())
131✔
253
    }
131✔
254
}
255

256
impl Validatable<ConfigError> for PersonalAccessTokenConfig {
257
    fn validate(&self) -> Result<(), ConfigError> {
131✔
258
        if self.max_tokens_per_user == 0 {
131✔
259
            return Err(ConfigError::InvalidConfiguration);
×
260
        }
131✔
261

131✔
262
        if self.cleaner.enabled && self.cleaner.interval.is_zero() {
131✔
263
            return Err(ConfigError::InvalidConfiguration);
×
264
        }
131✔
265

131✔
266
        Ok(())
131✔
267
    }
131✔
268
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc