• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

iggy-rs / iggy / 13335401714

14 Feb 2025 06:42PM UTC coverage: 74.723%. First build
13335401714

Pull #1532

github

web-flow
Merge e23280ea7 into bd4ab90bd
Pull Request #1532: fix(server): don't panic during segment deletion, improve max topic size

58 of 72 new or added lines in 5 files covered. (80.56%)

25248 of 33789 relevant lines covered (74.72%)

10277.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.86
/server/src/streaming/segments/segment.rs
1
use super::indexes::*;
2
use super::logs::*;
3
use crate::configs::system::SystemConfig;
4
use crate::streaming::batching::batch_accumulator::BatchAccumulator;
5
use crate::streaming::segments::*;
6
use error_set::ErrContext;
7
use iggy::error::IggyError;
8
use iggy::utils::byte_size::IggyByteSize;
9
use iggy::utils::expiry::IggyExpiry;
10
use iggy::utils::timestamp::IggyTimestamp;
11
use std::sync::atomic::{AtomicU64, Ordering};
12
use std::sync::Arc;
13
use tokio::fs::remove_file;
14
use tracing::{info, warn};
15

16
#[derive(Debug)]
17
pub struct Segment {
18
    pub stream_id: u32,
19
    pub topic_id: u32,
20
    pub partition_id: u32,
21
    pub start_offset: u64,
22
    pub end_offset: u64,
23
    pub current_offset: u64,
24
    pub index_path: String,
25
    pub log_path: String,
26
    pub size_bytes: IggyByteSize,
27
    pub last_index_position: u32,
28
    pub max_size_bytes: IggyByteSize,
29
    pub size_of_parent_stream: Arc<AtomicU64>,
30
    pub size_of_parent_topic: Arc<AtomicU64>,
31
    pub size_of_parent_partition: Arc<AtomicU64>,
32
    pub messages_count_of_parent_stream: Arc<AtomicU64>,
33
    pub messages_count_of_parent_topic: Arc<AtomicU64>,
34
    pub messages_count_of_parent_partition: Arc<AtomicU64>,
35
    pub is_closed: bool,
36
    pub log_writer: Option<SegmentLogWriter>,
37
    pub log_reader: Option<SegmentLogReader>,
38
    pub index_writer: Option<SegmentIndexWriter>,
39
    pub index_reader: Option<SegmentIndexReader>,
40
    pub message_expiry: IggyExpiry,
41
    pub unsaved_messages: Option<BatchAccumulator>,
42
    pub config: Arc<SystemConfig>,
43
    pub indexes: Option<Vec<Index>>,
44
    pub log_size_bytes: Arc<AtomicU64>,
45
    pub index_size_bytes: Arc<AtomicU64>,
46
}
47

48
impl Segment {
49
    #[allow(clippy::too_many_arguments)]
50
    pub fn create(
876✔
51
        stream_id: u32,
876✔
52
        topic_id: u32,
876✔
53
        partition_id: u32,
876✔
54
        start_offset: u64,
876✔
55
        config: Arc<SystemConfig>,
876✔
56
        message_expiry: IggyExpiry,
876✔
57
        size_of_parent_stream: Arc<AtomicU64>,
876✔
58
        size_of_parent_topic: Arc<AtomicU64>,
876✔
59
        size_of_parent_partition: Arc<AtomicU64>,
876✔
60
        messages_count_of_parent_stream: Arc<AtomicU64>,
876✔
61
        messages_count_of_parent_topic: Arc<AtomicU64>,
876✔
62
        messages_count_of_parent_partition: Arc<AtomicU64>,
876✔
63
    ) -> Segment {
876✔
64
        let path = config.get_segment_path(stream_id, topic_id, partition_id, start_offset);
876✔
65
        let log_path = Self::get_log_path(&path);
876✔
66
        let index_path = Self::get_index_path(&path);
876✔
67
        let message_expiry = match message_expiry {
876✔
68
            IggyExpiry::ServerDefault => config.segment.message_expiry,
×
69
            _ => message_expiry,
876✔
70
        };
71
        let indexes = match config.segment.cache_indexes {
876✔
72
            true => Some(Vec::new()),
875✔
73
            false => None,
1✔
74
        };
75

76
        Segment {
876✔
77
            stream_id,
876✔
78
            topic_id,
876✔
79
            partition_id,
876✔
80
            start_offset,
876✔
81
            end_offset: 0,
876✔
82
            current_offset: start_offset,
876✔
83
            log_path,
876✔
84
            index_path,
876✔
85
            size_bytes: IggyByteSize::from(0),
876✔
86
            last_index_position: 0,
876✔
87
            max_size_bytes: config.segment.size,
876✔
88
            message_expiry,
876✔
89
            indexes,
876✔
90
            unsaved_messages: None,
876✔
91
            is_closed: false,
876✔
92
            log_writer: None,
876✔
93
            log_reader: None,
876✔
94
            index_writer: None,
876✔
95
            index_reader: None,
876✔
96
            size_of_parent_stream,
876✔
97
            size_of_parent_partition,
876✔
98
            size_of_parent_topic,
876✔
99
            messages_count_of_parent_stream,
876✔
100
            messages_count_of_parent_topic,
876✔
101
            messages_count_of_parent_partition,
876✔
102
            config,
876✔
103
            log_size_bytes: Arc::new(AtomicU64::new(0)),
876✔
104
            index_size_bytes: Arc::new(AtomicU64::new(0)),
876✔
105
        }
876✔
106
    }
876✔
107

108
    /// Load the segment state from disk.
109
    pub async fn load_from_disk(&mut self) -> Result<(), IggyError> {
57✔
110
        info!(
57✔
111
            "Loading segment from disk: log_path: {}, index_path: {}",
×
112
            self.log_path, self.index_path
113
        );
114

115
        if self.log_reader.is_none() || self.index_reader.is_none() {
57✔
116
            self.initialize_writing().await?;
57✔
117
            self.initialize_reading().await?;
57✔
118
        }
×
119

120
        let log_size_bytes = self.log_size_bytes.load(Ordering::Acquire);
57✔
121
        info!("Log file size: {}", IggyByteSize::from(log_size_bytes));
57✔
122

123
        // TODO(hubcio): in future, remove size_bytes and use only atomic log_size_bytes everywhere
124
        self.size_bytes = IggyByteSize::from(log_size_bytes);
57✔
125
        self.last_index_position = log_size_bytes as _;
57✔
126

57✔
127
        if self.config.segment.cache_indexes {
57✔
128
            self.indexes = Some(
57✔
129
                self.index_reader
57✔
130
                    .as_ref()
57✔
131
                    .unwrap()
57✔
132
                    .load_all_indexes_impl()
57✔
133
                    .await
57✔
134
                    .with_error_context(|e| {
57✔
135
                        format!("Failed to load indexes, error: {e} for {}", self)
×
136
                    })
57✔
137
                    .map_err(|_| IggyError::CannotReadFile)?,
57✔
138
            );
139

140
            let last_index_offset = if self.indexes.as_ref().unwrap().is_empty() {
57✔
141
                0_u64
46✔
142
            } else {
143
                self.indexes.as_ref().unwrap().last().unwrap().offset as u64
11✔
144
            };
145

146
            self.current_offset = self.start_offset + last_index_offset;
57✔
147

57✔
148
            info!(
57✔
149
                "Loaded {} indexes for segment with start offset: {} and partition with ID: {} for topic with ID: {} and stream with ID: {}.",
×
150
                self.indexes.as_ref().unwrap().len(),
×
151
                self.start_offset,
152
                self.partition_id,
153
                self.topic_id,
154
                self.stream_id
155
            );
156
        }
×
157

158
        if self.is_full().await {
57✔
159
            self.is_closed = true;
×
160
        }
57✔
161

162
        let messages_count = self.get_messages_count();
57✔
163

57✔
164
        info!(
57✔
165
            "Loaded segment with log file of size {} ({} messages) for start offset {}, current offset: {}, and partition with ID: {} for topic with ID: {} and stream with ID: {}.",
×
166
            IggyByteSize::from(log_size_bytes),
×
167
            messages_count,
168
            self.start_offset,
169
            self.current_offset,
170
            self.partition_id,
171
            self.topic_id,
172
            self.stream_id
173
        );
174

175
        self.size_of_parent_stream
57✔
176
            .fetch_add(log_size_bytes, Ordering::SeqCst);
57✔
177
        self.size_of_parent_topic
57✔
178
            .fetch_add(log_size_bytes, Ordering::SeqCst);
57✔
179
        self.size_of_parent_partition
57✔
180
            .fetch_add(log_size_bytes, Ordering::SeqCst);
57✔
181
        self.messages_count_of_parent_stream
57✔
182
            .fetch_add(messages_count, Ordering::SeqCst);
57✔
183
        self.messages_count_of_parent_topic
57✔
184
            .fetch_add(messages_count, Ordering::SeqCst);
57✔
185
        self.messages_count_of_parent_partition
57✔
186
            .fetch_add(messages_count, Ordering::SeqCst);
57✔
187

57✔
188
        Ok(())
57✔
189
    }
57✔
190

191
    /// Save the segment state to disk.
192
    pub async fn persist(&mut self) -> Result<(), IggyError> {
800✔
193
        info!("Saving segment with start offset: {} for partition with ID: {} for topic with ID: {} and stream with ID: {}",
800✔
194
            self.start_offset, self.partition_id, self.topic_id, self.stream_id);
195
        self.initialize_writing().await?;
800✔
196
        self.initialize_reading().await?;
800✔
197
        info!("Saved segment log file with start offset: {} for partition with ID: {} for topic with ID: {} and stream with ID: {}",
800✔
198
            self.start_offset, self.partition_id, self.topic_id, self.stream_id);
199
        Ok(())
800✔
200
    }
800✔
201

202
    pub async fn initialize_writing(&mut self) -> Result<(), IggyError> {
857✔
203
        // TODO(hubcio): consider splitting enforce_fsync for index/log to separate entries in config
857✔
204
        let log_fsync = self.config.partition.enforce_fsync;
857✔
205
        let index_fsync = self.config.partition.enforce_fsync;
857✔
206

857✔
207
        let server_confirmation = self.config.segment.server_confirmation;
857✔
208
        let max_file_operation_retries = self.config.state.max_file_operation_retries;
857✔
209
        let retry_delay = self.config.state.retry_delay;
857✔
210

211
        let log_writer = SegmentLogWriter::new(
857✔
212
            &self.log_path,
857✔
213
            self.log_size_bytes.clone(),
857✔
214
            log_fsync,
857✔
215
            server_confirmation,
857✔
216
            max_file_operation_retries,
857✔
217
            retry_delay,
857✔
218
        )
857✔
219
        .await?;
857✔
220

221
        let index_writer =
857✔
222
            SegmentIndexWriter::new(&self.index_path, self.index_size_bytes.clone(), index_fsync)
857✔
223
                .await?;
857✔
224

225
        self.log_writer = Some(log_writer);
857✔
226
        self.index_writer = Some(index_writer);
857✔
227
        Ok(())
857✔
228
    }
857✔
229

230
    pub async fn initialize_reading(&mut self) -> Result<(), IggyError> {
857✔
231
        let log_reader = SegmentLogReader::new(&self.log_path, self.log_size_bytes.clone()).await?;
857✔
232
        // TODO(hubcio): there is no need to store open fd for reader if we have index cache enabled
233
        let index_reader =
857✔
234
            SegmentIndexReader::new(&self.index_path, self.index_size_bytes.clone()).await?;
857✔
235

236
        self.log_reader = Some(log_reader);
857✔
237
        self.index_reader = Some(index_reader);
857✔
238
        Ok(())
857✔
239
    }
857✔
240

241
    pub async fn is_full(&self) -> bool {
23,895✔
242
        if self.size_bytes >= self.max_size_bytes {
23,895✔
243
            return true;
×
244
        }
23,895✔
245

23,895✔
246
        self.is_expired(IggyTimestamp::now()).await
23,895✔
247
    }
23,895✔
248

249
    pub async fn is_expired(&self, now: IggyTimestamp) -> bool {
23,897✔
250
        if !self.is_closed {
23,897✔
251
            return false;
23,896✔
252
        }
1✔
253

1✔
254
        match self.message_expiry {
1✔
255
            IggyExpiry::NeverExpire => false,
×
256
            IggyExpiry::ServerDefault => false,
×
257
            IggyExpiry::ExpireDuration(expiry) => {
1✔
258
                let last_messages = self.get_messages(self.current_offset, 1).await;
1✔
259
                if last_messages.is_err() {
1✔
260
                    return false;
×
261
                }
1✔
262

1✔
263
                let last_messages = last_messages.unwrap();
1✔
264
                if last_messages.is_empty() {
1✔
265
                    return false;
×
266
                }
1✔
267

1✔
268
                let last_message = &last_messages[0];
1✔
269
                let last_message_timestamp = last_message.timestamp;
1✔
270
                last_message_timestamp + expiry.as_micros() <= now.as_micros()
1✔
271
            }
272
        }
273
    }
23,897✔
274

275
    pub async fn shutdown_reading(&mut self) {
561✔
276
        if let Some(log_reader) = self.log_reader.take() {
561✔
277
            drop(log_reader);
561✔
278
        }
561✔
279
        if let Some(index_reader) = self.index_reader.take() {
561✔
280
            drop(index_reader);
561✔
281
        }
561✔
282
    }
561✔
283

284
    pub async fn shutdown_writing(&mut self) {
561✔
285
        if let Some(log_writer) = self.log_writer.take() {
561✔
286
            tokio::spawn(async move {
561✔
287
                let _ = log_writer.fsync().await;
561✔
288
                log_writer.shutdown_persister_task().await;
561✔
289
            });
561✔
290
        } else {
561✔
NEW
291
            warn!(
×
NEW
292
                "Log writer already closed when calling close() for {}",
×
293
                self
294
            );
295
        }
296

297
        if let Some(index_writer) = self.index_writer.take() {
561✔
298
            tokio::spawn(async move {
561✔
299
                let _ = index_writer.fsync().await;
561✔
300
                drop(index_writer)
561✔
301
            });
561✔
302
        } else {
561✔
NEW
303
            warn!("Index writer already closed when calling close()");
×
304
        }
305
    }
561✔
306

307
    pub async fn delete(&mut self) -> Result<(), IggyError> {
561✔
308
        let segment_size = self.size_bytes;
561✔
309
        let segment_count_of_messages = self.get_messages_count();
561✔
310
        info!(
561✔
NEW
311
            "Deleting segment of size {segment_size} with start offset: {} for partition with ID: {} for stream with ID: {} and topic with ID: {}...",
×
312
            self.start_offset, self.partition_id, self.stream_id, self.topic_id,
313
        );
314

315
        self.shutdown_reading().await;
561✔
316

317
        if !self.is_closed {
561✔
318
            self.shutdown_writing().await;
561✔
NEW
319
        }
×
320

321
        let _ = remove_file(&self.log_path).await.with_error_context(|e| {
561✔
NEW
322
            format!("Failed to delete log file: {}, error: {e}", self.log_path)
×
323
        });
561✔
324
        let _ = remove_file(&self.index_path).await.with_error_context(|e| {
561✔
NEW
325
            format!(
×
NEW
326
                "Failed to delete index file: {}, error: {e}",
×
NEW
327
                self.index_path
×
NEW
328
            )
×
329
        });
561✔
330

561✔
331
        let segment_size_bytes = self.size_bytes.as_bytes_u64();
561✔
332
        self.size_of_parent_stream
561✔
333
            .fetch_sub(segment_size_bytes, Ordering::SeqCst);
561✔
334
        self.size_of_parent_topic
561✔
335
            .fetch_sub(segment_size_bytes, Ordering::SeqCst);
561✔
336
        self.size_of_parent_partition
561✔
337
            .fetch_sub(segment_size_bytes, Ordering::SeqCst);
561✔
338
        self.messages_count_of_parent_stream
561✔
339
            .fetch_sub(segment_count_of_messages, Ordering::SeqCst);
561✔
340
        self.messages_count_of_parent_topic
561✔
341
            .fetch_sub(segment_count_of_messages, Ordering::SeqCst);
561✔
342
        self.messages_count_of_parent_partition
561✔
343
            .fetch_sub(segment_count_of_messages, Ordering::SeqCst);
561✔
344

561✔
345
        info!(
561✔
NEW
346
            "Deleted segment of size {segment_size} with start offset: {} for partition with ID: {} for stream with ID: {} and topic with ID: {}.",
×
347
            self.start_offset, self.partition_id, self.stream_id, self.topic_id,
348
        );
349

350
        Ok(())
561✔
351
    }
561✔
352

353
    fn get_log_path(path: &str) -> String {
877✔
354
        format!("{}.{}", path, LOG_EXTENSION)
877✔
355
    }
877✔
356

357
    fn get_index_path(path: &str) -> String {
877✔
358
        format!("{}.{}", path, INDEX_EXTENSION)
877✔
359
    }
877✔
360
}
361

362
impl std::fmt::Display for Segment {
363
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
364
        write!(
×
365
            f,
×
366
            "Segment {{ stream_id: {}, topic_id: {}, partition_id: {}, start_offset: {}, end_offset: {}, current_offset: {}, size_bytes: {}, last_index_position: {}, max_size_bytes: {}, closed: {} }}",
×
367
            self.stream_id,
×
368
            self.topic_id,
×
369
            self.partition_id,
×
370
            self.start_offset,
×
371
            self.end_offset,
×
372
            self.current_offset,
×
373
            self.size_bytes,
×
374
            self.last_index_position,
×
375
            self.max_size_bytes,
×
376
            self.is_closed
×
377
        )
×
378
    }
×
379
}
380

381
#[cfg(test)]
382
mod tests {
383
    use super::*;
384
    use crate::configs::system::SegmentConfig;
385
    use iggy::utils::duration::IggyDuration;
386

387
    #[tokio::test]
388
    async fn should_be_created_given_valid_parameters() {
1✔
389
        let stream_id = 1;
1✔
390
        let topic_id = 2;
1✔
391
        let partition_id = 3;
1✔
392
        let start_offset = 0;
1✔
393
        let config = Arc::new(SystemConfig::default());
1✔
394
        let path = config.get_segment_path(stream_id, topic_id, partition_id, start_offset);
1✔
395
        let log_path = Segment::get_log_path(&path);
1✔
396
        let index_path = Segment::get_index_path(&path);
1✔
397
        let message_expiry = IggyExpiry::ExpireDuration(IggyDuration::from(10));
1✔
398
        let size_of_parent_stream = Arc::new(AtomicU64::new(0));
1✔
399
        let size_of_parent_topic = Arc::new(AtomicU64::new(0));
1✔
400
        let size_of_parent_partition = Arc::new(AtomicU64::new(0));
1✔
401
        let messages_count_of_parent_stream = Arc::new(AtomicU64::new(0));
1✔
402
        let messages_count_of_parent_topic = Arc::new(AtomicU64::new(0));
1✔
403
        let messages_count_of_parent_partition = Arc::new(AtomicU64::new(0));
1✔
404

1✔
405
        let segment = Segment::create(
1✔
406
            stream_id,
1✔
407
            topic_id,
1✔
408
            partition_id,
1✔
409
            start_offset,
1✔
410
            config,
1✔
411
            message_expiry,
1✔
412
            size_of_parent_stream,
1✔
413
            size_of_parent_topic,
1✔
414
            size_of_parent_partition,
1✔
415
            messages_count_of_parent_stream,
1✔
416
            messages_count_of_parent_topic,
1✔
417
            messages_count_of_parent_partition,
1✔
418
        );
1✔
419

1✔
420
        assert_eq!(segment.stream_id, stream_id);
1✔
421
        assert_eq!(segment.topic_id, topic_id);
1✔
422
        assert_eq!(segment.partition_id, partition_id);
1✔
423
        assert_eq!(segment.start_offset, start_offset);
1✔
424
        assert_eq!(segment.current_offset, 0);
1✔
425
        assert_eq!(segment.end_offset, 0);
1✔
426
        assert_eq!(segment.size_bytes, 0);
1✔
427
        assert_eq!(segment.log_path, log_path);
1✔
428
        assert_eq!(segment.index_path, index_path);
1✔
429
        assert_eq!(segment.message_expiry, message_expiry);
1✔
430
        assert!(segment.unsaved_messages.is_none());
1✔
431
        assert!(segment.indexes.is_some());
1✔
432
        assert!(!segment.is_closed);
1✔
433
        assert!(!segment.is_full().await);
1✔
434
    }
1✔
435

436
    #[tokio::test]
437
    async fn should_not_initialize_indexes_cache_when_disabled() {
1✔
438
        let stream_id = 1;
1✔
439
        let topic_id = 2;
1✔
440
        let partition_id = 3;
1✔
441
        let start_offset = 0;
1✔
442
        let config = Arc::new(SystemConfig {
1✔
443
            segment: SegmentConfig {
1✔
444
                cache_indexes: false,
1✔
445
                ..Default::default()
1✔
446
            },
1✔
447
            ..Default::default()
1✔
448
        });
1✔
449
        let message_expiry = IggyExpiry::NeverExpire;
1✔
450
        let size_of_parent_stream = Arc::new(AtomicU64::new(0));
1✔
451
        let size_of_parent_topic = Arc::new(AtomicU64::new(0));
1✔
452
        let size_of_parent_partition = Arc::new(AtomicU64::new(0));
1✔
453
        let messages_count_of_parent_stream = Arc::new(AtomicU64::new(0));
1✔
454
        let messages_count_of_parent_topic = Arc::new(AtomicU64::new(0));
1✔
455
        let messages_count_of_parent_partition = Arc::new(AtomicU64::new(0));
1✔
456

1✔
457
        let segment = Segment::create(
1✔
458
            stream_id,
1✔
459
            topic_id,
1✔
460
            partition_id,
1✔
461
            start_offset,
1✔
462
            config,
1✔
463
            message_expiry,
1✔
464
            size_of_parent_stream,
1✔
465
            size_of_parent_topic,
1✔
466
            size_of_parent_partition,
1✔
467
            messages_count_of_parent_stream,
1✔
468
            messages_count_of_parent_topic,
1✔
469
            messages_count_of_parent_partition,
1✔
470
        );
1✔
471

1✔
472
        assert!(segment.indexes.is_none());
1✔
473
    }
1✔
474
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc