• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

iggy-rs / iggy / 13335910144

14 Feb 2025 07:17PM UTC coverage: 74.72% (+0.03%) from 74.693%
13335910144

push

github

web-flow
fix(server): don't panic during segment deletion, improve max topic size (#1532)

This commit addresses a panic issue that occurred during segment
deletion and closure by ensuring proper shutdown of reading and writing tasks.

It also improves the handling of maximum topic size by allowing unlimited
size and ensuring that the oldest segments are removed by a background
job when necessary.

Additionally, the logging level for certain operations has been adjusted 
to provide more detailed trace information.

These changes enhance the stability and flexibility of the system
in managing topic segments and their lifecycle.

58 of 72 new or added lines in 5 files covered. (80.56%)

4 existing lines in 3 files now uncovered.

25247 of 33789 relevant lines covered (74.72%)

10277.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.86
/server/src/streaming/segments/segment.rs
1
use super::indexes::*;
2
use super::logs::*;
3
use crate::configs::system::SystemConfig;
4
use crate::streaming::batching::batch_accumulator::BatchAccumulator;
5
use crate::streaming::segments::*;
6
use error_set::ErrContext;
7
use iggy::error::IggyError;
8
use iggy::utils::byte_size::IggyByteSize;
9
use iggy::utils::expiry::IggyExpiry;
10
use iggy::utils::timestamp::IggyTimestamp;
11
use std::sync::atomic::{AtomicU64, Ordering};
12
use std::sync::Arc;
13
use tokio::fs::remove_file;
14
use tracing::{info, warn};
15

16
/// A single append-only segment of a partition's log.
///
/// A segment owns a pair of files on disk (a `.log` file with message batches
/// and an index file), plus optional cached indexes and the reader/writer
/// handles for both files. Size and message counters of the parent
/// stream/topic/partition are shared as atomics so the segment can update them
/// when it loads or is deleted.
#[derive(Debug)]
pub struct Segment {
    pub stream_id: u32,
    pub topic_id: u32,
    pub partition_id: u32,
    // First offset stored in this segment; also encoded in the file path.
    pub start_offset: u64,
    pub end_offset: u64,
    pub current_offset: u64,
    pub index_path: String,
    pub log_path: String,
    // TODO(hubcio) elsewhere in this file notes this duplicates
    // `log_size_bytes` and may be removed in the future.
    pub size_bytes: IggyByteSize,
    pub last_index_position: u32,
    pub max_size_bytes: IggyByteSize,
    // Shared counters of the parents, updated on load/delete.
    pub size_of_parent_stream: Arc<AtomicU64>,
    pub size_of_parent_topic: Arc<AtomicU64>,
    pub size_of_parent_partition: Arc<AtomicU64>,
    pub messages_count_of_parent_stream: Arc<AtomicU64>,
    pub messages_count_of_parent_topic: Arc<AtomicU64>,
    pub messages_count_of_parent_partition: Arc<AtomicU64>,
    // A closed segment no longer accepts writes; see `is_expired`.
    pub is_closed: bool,
    // Writers/readers are `None` until `initialize_writing`/`initialize_reading`
    // run, and are taken (set back to `None`) on shutdown.
    pub log_writer: Option<SegmentLogWriter>,
    pub log_reader: Option<SegmentLogReader>,
    pub index_writer: Option<SegmentIndexWriter>,
    pub index_reader: Option<SegmentIndexReader>,
    pub message_expiry: IggyExpiry,
    pub unsaved_messages: Option<BatchAccumulator>,
    pub config: Arc<SystemConfig>,
    // `Some` only when `config.segment.cache_indexes` is enabled.
    pub indexes: Option<Vec<Index>>,
    // Live file sizes, shared with the writer/reader tasks.
    pub log_size_bytes: Arc<AtomicU64>,
    pub index_size_bytes: Arc<AtomicU64>,
}
47

48
impl Segment {
49
    #[allow(clippy::too_many_arguments)]
50
    pub fn create(
876✔
51
        stream_id: u32,
876✔
52
        topic_id: u32,
876✔
53
        partition_id: u32,
876✔
54
        start_offset: u64,
876✔
55
        config: Arc<SystemConfig>,
876✔
56
        message_expiry: IggyExpiry,
876✔
57
        size_of_parent_stream: Arc<AtomicU64>,
876✔
58
        size_of_parent_topic: Arc<AtomicU64>,
876✔
59
        size_of_parent_partition: Arc<AtomicU64>,
876✔
60
        messages_count_of_parent_stream: Arc<AtomicU64>,
876✔
61
        messages_count_of_parent_topic: Arc<AtomicU64>,
876✔
62
        messages_count_of_parent_partition: Arc<AtomicU64>,
876✔
63
    ) -> Segment {
876✔
64
        let path = config.get_segment_path(stream_id, topic_id, partition_id, start_offset);
876✔
65
        let log_path = Self::get_log_path(&path);
876✔
66
        let index_path = Self::get_index_path(&path);
876✔
67
        let message_expiry = match message_expiry {
876✔
68
            IggyExpiry::ServerDefault => config.segment.message_expiry,
×
69
            _ => message_expiry,
876✔
70
        };
71
        let indexes = match config.segment.cache_indexes {
876✔
72
            true => Some(Vec::new()),
875✔
73
            false => None,
1✔
74
        };
75

76
        Segment {
876✔
77
            stream_id,
876✔
78
            topic_id,
876✔
79
            partition_id,
876✔
80
            start_offset,
876✔
81
            end_offset: 0,
876✔
82
            current_offset: start_offset,
876✔
83
            log_path,
876✔
84
            index_path,
876✔
85
            size_bytes: IggyByteSize::from(0),
876✔
86
            last_index_position: 0,
876✔
87
            max_size_bytes: config.segment.size,
876✔
88
            message_expiry,
876✔
89
            indexes,
876✔
90
            unsaved_messages: None,
876✔
91
            is_closed: false,
876✔
92
            log_writer: None,
876✔
93
            log_reader: None,
876✔
94
            index_writer: None,
876✔
95
            index_reader: None,
876✔
96
            size_of_parent_stream,
876✔
97
            size_of_parent_partition,
876✔
98
            size_of_parent_topic,
876✔
99
            messages_count_of_parent_stream,
876✔
100
            messages_count_of_parent_topic,
876✔
101
            messages_count_of_parent_partition,
876✔
102
            config,
876✔
103
            log_size_bytes: Arc::new(AtomicU64::new(0)),
876✔
104
            index_size_bytes: Arc::new(AtomicU64::new(0)),
876✔
105
        }
876✔
106
    }
876✔
107

108
    /// Load the segment state from disk.
109
    pub async fn load_from_disk(&mut self) -> Result<(), IggyError> {
57✔
110
        info!(
57✔
111
            "Loading segment from disk: log_path: {}, index_path: {}",
×
112
            self.log_path, self.index_path
113
        );
114

115
        if self.log_reader.is_none() || self.index_reader.is_none() {
57✔
116
            self.initialize_writing().await?;
57✔
117
            self.initialize_reading().await?;
57✔
118
        }
×
119

120
        let log_size_bytes = self.log_size_bytes.load(Ordering::Acquire);
57✔
121
        info!("Log file size: {}", IggyByteSize::from(log_size_bytes));
57✔
122

123
        // TODO(hubcio): in future, remove size_bytes and use only atomic log_size_bytes everywhere
124
        self.size_bytes = IggyByteSize::from(log_size_bytes);
57✔
125
        self.last_index_position = log_size_bytes as _;
57✔
126

57✔
127
        if self.config.segment.cache_indexes {
57✔
128
            self.indexes = Some(
57✔
129
                self.index_reader
57✔
130
                    .as_ref()
57✔
131
                    .unwrap()
57✔
132
                    .load_all_indexes_impl()
57✔
133
                    .await
57✔
134
                    .with_error_context(|e| {
57✔
135
                        format!("Failed to load indexes, error: {e} for {}", self)
×
136
                    })
57✔
137
                    .map_err(|_| IggyError::CannotReadFile)?,
57✔
138
            );
139

140
            let last_index_offset = if self.indexes.as_ref().unwrap().is_empty() {
57✔
141
                0_u64
46✔
142
            } else {
143
                self.indexes.as_ref().unwrap().last().unwrap().offset as u64
11✔
144
            };
145

146
            self.current_offset = self.start_offset + last_index_offset;
57✔
147

57✔
148
            info!(
57✔
149
                "Loaded {} indexes for segment with start offset: {} and partition with ID: {} for topic with ID: {} and stream with ID: {}.",
×
150
                self.indexes.as_ref().unwrap().len(),
×
151
                self.start_offset,
152
                self.partition_id,
153
                self.topic_id,
154
                self.stream_id
155
            );
156
        }
×
157

158
        if self.is_full().await {
57✔
159
            self.is_closed = true;
×
160
        }
57✔
161

162
        let messages_count = self.get_messages_count();
57✔
163

57✔
164
        info!(
57✔
165
            "Loaded segment with log file of size {} ({} messages) for start offset {}, current offset: {}, and partition with ID: {} for topic with ID: {} and stream with ID: {}.",
×
166
            IggyByteSize::from(log_size_bytes),
×
167
            messages_count,
168
            self.start_offset,
169
            self.current_offset,
170
            self.partition_id,
171
            self.topic_id,
172
            self.stream_id
173
        );
174

175
        self.size_of_parent_stream
57✔
176
            .fetch_add(log_size_bytes, Ordering::SeqCst);
57✔
177
        self.size_of_parent_topic
57✔
178
            .fetch_add(log_size_bytes, Ordering::SeqCst);
57✔
179
        self.size_of_parent_partition
57✔
180
            .fetch_add(log_size_bytes, Ordering::SeqCst);
57✔
181
        self.messages_count_of_parent_stream
57✔
182
            .fetch_add(messages_count, Ordering::SeqCst);
57✔
183
        self.messages_count_of_parent_topic
57✔
184
            .fetch_add(messages_count, Ordering::SeqCst);
57✔
185
        self.messages_count_of_parent_partition
57✔
186
            .fetch_add(messages_count, Ordering::SeqCst);
57✔
187

57✔
188
        Ok(())
57✔
189
    }
57✔
190

191
    /// Save the segment state to disk.
192
    pub async fn persist(&mut self) -> Result<(), IggyError> {
800✔
193
        info!("Saving segment with start offset: {} for partition with ID: {} for topic with ID: {} and stream with ID: {}",
800✔
194
            self.start_offset, self.partition_id, self.topic_id, self.stream_id);
195
        self.initialize_writing().await?;
800✔
196
        self.initialize_reading().await?;
800✔
197
        info!("Saved segment log file with start offset: {} for partition with ID: {} for topic with ID: {} and stream with ID: {}",
800✔
198
            self.start_offset, self.partition_id, self.topic_id, self.stream_id);
199
        Ok(())
800✔
200
    }
800✔
201

202
    pub async fn initialize_writing(&mut self) -> Result<(), IggyError> {
857✔
203
        // TODO(hubcio): consider splitting enforce_fsync for index/log to separate entries in config
857✔
204
        let log_fsync = self.config.partition.enforce_fsync;
857✔
205
        let index_fsync = self.config.partition.enforce_fsync;
857✔
206

857✔
207
        let server_confirmation = self.config.segment.server_confirmation;
857✔
208
        let max_file_operation_retries = self.config.state.max_file_operation_retries;
857✔
209
        let retry_delay = self.config.state.retry_delay;
857✔
210

211
        let log_writer = SegmentLogWriter::new(
857✔
212
            &self.log_path,
857✔
213
            self.log_size_bytes.clone(),
857✔
214
            log_fsync,
857✔
215
            server_confirmation,
857✔
216
            max_file_operation_retries,
857✔
217
            retry_delay,
857✔
218
        )
857✔
219
        .await?;
857✔
220

221
        let index_writer =
857✔
222
            SegmentIndexWriter::new(&self.index_path, self.index_size_bytes.clone(), index_fsync)
857✔
223
                .await?;
857✔
224

225
        self.log_writer = Some(log_writer);
857✔
226
        self.index_writer = Some(index_writer);
857✔
227
        Ok(())
857✔
228
    }
857✔
229

230
    pub async fn initialize_reading(&mut self) -> Result<(), IggyError> {
857✔
231
        let log_reader = SegmentLogReader::new(&self.log_path, self.log_size_bytes.clone()).await?;
857✔
232
        // TODO(hubcio): there is no need to store open fd for reader if we have index cache enabled
233
        let index_reader =
857✔
234
            SegmentIndexReader::new(&self.index_path, self.index_size_bytes.clone()).await?;
857✔
235

236
        self.log_reader = Some(log_reader);
857✔
237
        self.index_reader = Some(index_reader);
857✔
238
        Ok(())
857✔
239
    }
857✔
240

241
    pub async fn is_full(&self) -> bool {
23,895✔
242
        if self.size_bytes >= self.max_size_bytes {
23,895✔
243
            return true;
×
244
        }
23,895✔
245

23,895✔
246
        self.is_expired(IggyTimestamp::now()).await
23,895✔
247
    }
23,895✔
248

249
    pub async fn is_expired(&self, now: IggyTimestamp) -> bool {
23,897✔
250
        if !self.is_closed {
23,897✔
251
            return false;
23,896✔
252
        }
1✔
253

1✔
254
        match self.message_expiry {
1✔
255
            IggyExpiry::NeverExpire => false,
×
256
            IggyExpiry::ServerDefault => false,
×
257
            IggyExpiry::ExpireDuration(expiry) => {
1✔
258
                let last_messages = self.get_messages(self.current_offset, 1).await;
1✔
259
                if last_messages.is_err() {
1✔
260
                    return false;
×
261
                }
1✔
262

1✔
263
                let last_messages = last_messages.unwrap();
1✔
264
                if last_messages.is_empty() {
1✔
265
                    return false;
×
266
                }
1✔
267

1✔
268
                let last_message = &last_messages[0];
1✔
269
                let last_message_timestamp = last_message.timestamp;
1✔
270
                last_message_timestamp + expiry.as_micros() <= now.as_micros()
1✔
271
            }
272
        }
273
    }
23,897✔
274

275
    pub async fn shutdown_reading(&mut self) {
561✔
276
        if let Some(log_reader) = self.log_reader.take() {
561✔
277
            drop(log_reader);
561✔
278
        }
561✔
279
        if let Some(index_reader) = self.index_reader.take() {
561✔
280
            drop(index_reader);
561✔
281
        }
561✔
282
    }
561✔
283

284
    pub async fn shutdown_writing(&mut self) {
561✔
285
        if let Some(log_writer) = self.log_writer.take() {
561✔
286
            tokio::spawn(async move {
561✔
287
                let _ = log_writer.fsync().await;
561✔
288
                log_writer.shutdown_persister_task().await;
560✔
289
            });
561✔
290
        } else {
561✔
NEW
291
            warn!(
×
NEW
292
                "Log writer already closed when calling close() for {}",
×
293
                self
294
            );
295
        }
296

297
        if let Some(index_writer) = self.index_writer.take() {
561✔
298
            tokio::spawn(async move {
561✔
299
                let _ = index_writer.fsync().await;
561✔
300
                drop(index_writer)
561✔
301
            });
561✔
302
        } else {
561✔
NEW
303
            warn!("Index writer already closed when calling close()");
×
304
        }
305
    }
561✔
306

307
    /// Deletes the segment's files from disk and subtracts its size and
    /// message count from the parent stream/topic/partition counters.
    ///
    /// File removals are best-effort: failures are logged via the error
    /// context but do not abort the deletion.
    pub async fn delete(&mut self) -> Result<(), IggyError> {
        let segment_size = self.size_bytes;
        let segment_count_of_messages = self.get_messages_count();
        info!(
            "Deleting segment of size {segment_size} with start offset: {} for partition with ID: {} for stream with ID: {} and topic with ID: {}...",
            self.start_offset, self.partition_id, self.stream_id, self.topic_id,
        );

        self.shutdown_reading().await;

        // NOTE(review): writers are only shut down for still-open segments —
        // presumably a closed segment has already released its writers when it
        // was closed; verify against the close path.
        if !self.is_closed {
            self.shutdown_writing().await;
        }

        // Best-effort removals: the `let _ =` deliberately ignores errors
        // (e.g. a file that is already gone) after attaching context.
        let _ = remove_file(&self.log_path).await.with_error_context(|e| {
            format!("Failed to delete log file: {}, error: {e}", self.log_path)
        });
        let _ = remove_file(&self.index_path).await.with_error_context(|e| {
            format!(
                "Failed to delete index file: {}, error: {e}",
                self.index_path
            )
        });

        // Undo the accounting that `load_from_disk` added to the parents.
        let segment_size_bytes = self.size_bytes.as_bytes_u64();
        self.size_of_parent_stream
            .fetch_sub(segment_size_bytes, Ordering::SeqCst);
        self.size_of_parent_topic
            .fetch_sub(segment_size_bytes, Ordering::SeqCst);
        self.size_of_parent_partition
            .fetch_sub(segment_size_bytes, Ordering::SeqCst);
        self.messages_count_of_parent_stream
            .fetch_sub(segment_count_of_messages, Ordering::SeqCst);
        self.messages_count_of_parent_topic
            .fetch_sub(segment_count_of_messages, Ordering::SeqCst);
        self.messages_count_of_parent_partition
            .fetch_sub(segment_count_of_messages, Ordering::SeqCst);

        info!(
            "Deleted segment of size {segment_size} with start offset: {} for partition with ID: {} for stream with ID: {} and topic with ID: {}.",
            self.start_offset, self.partition_id, self.stream_id, self.topic_id,
        );

        Ok(())
    }
352

353
    fn get_log_path(path: &str) -> String {
877✔
354
        format!("{}.{}", path, LOG_EXTENSION)
877✔
355
    }
877✔
356

357
    fn get_index_path(path: &str) -> String {
877✔
358
        format!("{}.{}", path, INDEX_EXTENSION)
877✔
359
    }
877✔
360
}
361

362
// Compact single-line summary of a segment, used in log and error messages
// (e.g. the error contexts in `load_from_disk` and the warnings in
// `shutdown_writing`). Intentionally omits paths, handles, and parent counters.
impl std::fmt::Display for Segment {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Segment {{ stream_id: {}, topic_id: {}, partition_id: {}, start_offset: {}, end_offset: {}, current_offset: {}, size_bytes: {}, last_index_position: {}, max_size_bytes: {}, closed: {} }}",
            self.stream_id,
            self.topic_id,
            self.partition_id,
            self.start_offset,
            self.end_offset,
            self.current_offset,
            self.size_bytes,
            self.last_index_position,
            self.max_size_bytes,
            self.is_closed
        )
    }
}
380

381
#[cfg(test)]
mod tests {
    use super::*;
    use crate::configs::system::SegmentConfig;
    use iggy::utils::duration::IggyDuration;

    /// Builds a segment with freshly zeroed parent counters, removing the
    /// nine-line `Arc::new(AtomicU64::new(0))` boilerplate both tests shared.
    fn create_segment(
        stream_id: u32,
        topic_id: u32,
        partition_id: u32,
        start_offset: u64,
        config: Arc<SystemConfig>,
        message_expiry: IggyExpiry,
    ) -> Segment {
        Segment::create(
            stream_id,
            topic_id,
            partition_id,
            start_offset,
            config,
            message_expiry,
            Arc::new(AtomicU64::new(0)),
            Arc::new(AtomicU64::new(0)),
            Arc::new(AtomicU64::new(0)),
            Arc::new(AtomicU64::new(0)),
            Arc::new(AtomicU64::new(0)),
            Arc::new(AtomicU64::new(0)),
        )
    }

    #[tokio::test]
    async fn should_be_created_given_valid_parameters() {
        let stream_id = 1;
        let topic_id = 2;
        let partition_id = 3;
        let start_offset = 0;
        let config = Arc::new(SystemConfig::default());
        let path = config.get_segment_path(stream_id, topic_id, partition_id, start_offset);
        let log_path = Segment::get_log_path(&path);
        let index_path = Segment::get_index_path(&path);
        let message_expiry = IggyExpiry::ExpireDuration(IggyDuration::from(10));

        let segment = create_segment(
            stream_id,
            topic_id,
            partition_id,
            start_offset,
            config,
            message_expiry,
        );

        assert_eq!(segment.stream_id, stream_id);
        assert_eq!(segment.topic_id, topic_id);
        assert_eq!(segment.partition_id, partition_id);
        assert_eq!(segment.start_offset, start_offset);
        assert_eq!(segment.current_offset, 0);
        assert_eq!(segment.end_offset, 0);
        assert_eq!(segment.size_bytes, 0);
        assert_eq!(segment.log_path, log_path);
        assert_eq!(segment.index_path, index_path);
        assert_eq!(segment.message_expiry, message_expiry);
        assert!(segment.unsaved_messages.is_none());
        assert!(segment.indexes.is_some());
        assert!(!segment.is_closed);
        assert!(!segment.is_full().await);
    }

    #[tokio::test]
    async fn should_not_initialize_indexes_cache_when_disabled() {
        let config = Arc::new(SystemConfig {
            segment: SegmentConfig {
                cache_indexes: false,
                ..Default::default()
            },
            ..Default::default()
        });

        let segment = create_segment(1, 2, 3, 0, config, IggyExpiry::NeverExpire);

        assert!(segment.indexes.is_none());
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc