• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

iggy-rs / iggy / 11652262744

03 Nov 2024 02:43PM UTC coverage: 75.16% (+1.0%) from 74.191%
11652262744

push

github

web-flow
Consolidate Index and TimeIndex into single Index (#1313)

74 of 184 new or added lines in 14 files covered. (40.22%)

9 existing lines in 7 files now uncovered.

22757 of 30278 relevant lines covered (75.16%)

18236.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

5.62
/server/src/channels/commands/maintain_messages.rs
1
use crate::archiver::Archiver;
2
use crate::channels::server_command::ServerCommand;
3
use crate::configs::server::MessagesMaintenanceConfig;
4
use crate::map_toggle_str;
5
use crate::streaming::systems::system::SharedSystem;
6
use crate::streaming::topics::topic::Topic;
7
use async_trait::async_trait;
8
use flume::Sender;
9
use iggy::error::IggyError;
10
use iggy::locking::IggySharedMutFn;
11
use iggy::utils::duration::IggyDuration;
12
use iggy::utils::timestamp::IggyTimestamp;
13
use std::sync::Arc;
14
use tokio::time;
15
use tracing::{debug, error, info, instrument};
16

17
/// Periodically emits `MaintainMessagesCommand`s on a fixed interval,
/// driving message cleaning and/or archiving for the server.
pub struct MessagesMaintainer {
    /// Whether expired-message cleaning is requested (from config).
    cleaner_enabled: bool,
    /// Whether archiving is requested (from config).
    archiver_enabled: bool,
    /// How often a maintenance command is emitted.
    interval: IggyDuration,
    /// Channel used to publish commands to the maintenance consumer.
    sender: Sender<MaintainMessagesCommand>,
}
23

24
/// Command consumed by `MaintainMessagesExecutor`, describing which
/// maintenance actions a single run should perform.
#[derive(Debug, Default, Clone)]
pub struct MaintainMessagesCommand {
    /// When set, expired/oldest segments are deleted.
    clean_messages: bool,
    /// When set, the system archiver is used during the run.
    archive_messages: bool,
}
29

30
/// Stateless executor of `MaintainMessagesCommand`s; the actual work is in
/// its `ServerCommand` implementation.
#[derive(Debug, Default, Clone)]
pub struct MaintainMessagesExecutor;
32

33
impl MessagesMaintainer {
34
    pub fn new(
×
35
        config: &MessagesMaintenanceConfig,
×
36
        sender: Sender<MaintainMessagesCommand>,
×
37
    ) -> Self {
×
38
        Self {
×
39
            cleaner_enabled: config.cleaner_enabled,
×
40
            archiver_enabled: config.archiver_enabled,
×
41
            interval: config.interval,
×
42
            sender,
×
43
        }
×
44
    }
×
45

46
    pub fn start(&self) {
×
47
        if !self.cleaner_enabled && !self.archiver_enabled {
×
48
            info!("Messages maintainer is disabled.");
×
49
            return;
×
50
        }
×
51

×
52
        let interval = self.interval;
×
53
        let sender = self.sender.clone();
×
54
        info!(
×
55
            "Message maintainer, cleaner is {}, archiver is {}, interval: {interval}",
×
56
            map_toggle_str(self.cleaner_enabled),
×
57
            map_toggle_str(self.archiver_enabled)
×
58
        );
59
        let clean_messages = self.cleaner_enabled;
×
60
        let archive_messages = self.archiver_enabled;
×
61
        tokio::spawn(async move {
×
62
            let mut interval_timer = time::interval(interval.get_duration());
×
63
            loop {
64
                interval_timer.tick().await;
×
65
                sender
×
66
                    .send(MaintainMessagesCommand {
×
67
                        clean_messages,
×
68
                        archive_messages,
×
69
                    })
×
70
                    .unwrap_or_else(|err| {
×
71
                        error!("Failed to send MaintainMessagesCommand. Error: {}", err);
×
72
                    });
×
73
            }
74
        });
×
75
    }
×
76
}
77

78
#[async_trait]
79
impl ServerCommand<MaintainMessagesCommand> for MaintainMessagesExecutor {
80
    #[instrument(skip_all)]
×
81
    async fn execute(&mut self, system: &SharedSystem, command: MaintainMessagesCommand) {
×
82
        let system = system.read().await;
×
83
        let streams = system.get_streams();
×
84
        for stream in streams {
×
85
            let topics = stream.get_topics();
×
86
            for topic in topics {
×
87
                let archiver = if command.archive_messages {
×
88
                    system.archiver.clone()
×
89
                } else {
90
                    None
×
91
                };
92
                let expired_segments = handle_expired_segments(
×
93
                    topic,
×
94
                    archiver.clone(),
×
95
                    system.config.segment.archive_expired,
×
96
                    command.clean_messages,
×
97
                )
×
98
                .await;
×
99
                if expired_segments.is_err() {
×
100
                    error!(
×
101
                        "Failed to get expired segments for stream ID: {}, topic ID: {}",
×
102
                        topic.stream_id, topic.topic_id
103
                    );
104
                    continue;
×
105
                }
×
106

107
                let oldest_segments = handle_oldest_segments(
×
108
                    topic,
×
109
                    archiver.clone(),
×
110
                    system.config.topic.delete_oldest_segments,
×
111
                )
×
112
                .await;
×
113
                if oldest_segments.is_err() {
×
114
                    error!(
×
115
                        "Failed to get oldest segments for stream ID: {}, topic ID: {}",
×
116
                        topic.stream_id, topic.topic_id
117
                    );
118
                    continue;
×
119
                }
×
120

×
121
                let deleted_expired_segments = expired_segments.unwrap();
×
122
                let deleted_oldest_segments = oldest_segments.unwrap();
×
123
                let deleted_segments = HandledSegments {
×
124
                    segments_count: deleted_expired_segments.segments_count
×
125
                        + deleted_oldest_segments.segments_count,
×
126
                    messages_count: deleted_expired_segments.messages_count
×
127
                        + deleted_oldest_segments.messages_count,
×
128
                };
×
129

×
130
                if deleted_segments.segments_count == 0 {
×
131
                    info!(
×
132
                        "No segments were deleted for stream ID: {}, topic ID: {}",
×
133
                        topic.stream_id, topic.topic_id
134
                    );
135
                    continue;
×
136
                }
×
137

×
138
                info!(
×
139
                    "Deleted {} segments and {} messages for stream ID: {}, topic ID: {}",
×
140
                    deleted_segments.segments_count,
141
                    deleted_segments.messages_count,
142
                    topic.stream_id,
143
                    topic.topic_id
144
                );
145

146
                system
×
147
                    .metrics
×
148
                    .decrement_segments(deleted_segments.segments_count);
×
149
                system
×
150
                    .metrics
×
151
                    .decrement_messages(deleted_segments.messages_count);
×
152
            }
153
        }
154
    }
×
155

156
    fn start_command_sender(
130✔
157
        &mut self,
130✔
158
        _system: SharedSystem,
130✔
159
        config: &crate::configs::server::ServerConfig,
130✔
160
        sender: Sender<MaintainMessagesCommand>,
130✔
161
    ) {
130✔
162
        if (!config.data_maintenance.archiver.enabled
130✔
163
            || !config.data_maintenance.messages.archiver_enabled)
×
164
            && !config.data_maintenance.messages.cleaner_enabled
130✔
165
        {
166
            return;
130✔
167
        }
×
168

×
169
        let messages_maintainer =
×
170
            MessagesMaintainer::new(&config.data_maintenance.messages, sender);
×
171
        messages_maintainer.start();
×
172
    }
130✔
173

174
    fn start_command_consumer(
130✔
175
        mut self,
130✔
176
        system: SharedSystem,
130✔
177
        config: &crate::configs::server::ServerConfig,
130✔
178
        receiver: flume::Receiver<MaintainMessagesCommand>,
130✔
179
    ) {
130✔
180
        if (!config.data_maintenance.archiver.enabled
130✔
181
            || !config.data_maintenance.messages.archiver_enabled)
×
182
            && !config.data_maintenance.messages.cleaner_enabled
130✔
183
        {
184
            return;
130✔
185
        }
×
186

×
187
        tokio::spawn(async move {
×
188
            let system = system.clone();
×
189
            while let Ok(command) = receiver.recv_async().await {
×
190
                self.execute(&system, command).await;
×
191
            }
192
            info!("Messages maintainer receiver stopped.");
×
193
        });
×
194
    }
130✔
195
}
196

197
async fn handle_expired_segments(
×
198
    topic: &Topic,
×
199
    archiver: Option<Arc<dyn Archiver>>,
×
200
    archive: bool,
×
201
    clean: bool,
×
202
) -> Result<HandledSegments, IggyError> {
×
203
    let expired_segments = get_expired_segments(topic, IggyTimestamp::now()).await;
×
204
    if expired_segments.is_empty() {
×
205
        return Ok(HandledSegments::none());
×
206
    }
×
207

×
208
    if archive {
×
209
        if let Some(archiver) = archiver {
×
210
            info!(
×
211
                "Archiving expired segments for stream ID: {}, topic ID: {}",
×
212
                topic.stream_id, topic.topic_id
213
            );
214
            archive_segments(topic, &expired_segments, archiver.clone()).await?;
×
215
        } else {
216
            error!(
×
217
                "Archiver is not enabled, yet archive_expired is set to true. Cannot archive expired segments for stream ID: {}, topic ID: {}",
×
218
                topic.stream_id, topic.topic_id
219
            );
220
            return Ok(HandledSegments::none());
×
221
        }
222
    }
×
223

224
    if clean {
×
225
        info!(
×
226
            "Deleting expired segments for stream ID: {}, topic ID: {}",
×
227
            topic.stream_id, topic.topic_id
228
        );
229
        delete_segments(topic, &expired_segments).await
×
230
    } else {
231
        info!(
×
232
            "Deleting expired segments is disabled for stream ID: {}, topic ID: {}",
×
233
            topic.stream_id, topic.topic_id
234
        );
235
        Ok(HandledSegments::none())
×
236
    }
237
}
×
238

239
async fn get_expired_segments(topic: &Topic, now: IggyTimestamp) -> Vec<SegmentsToHandle> {
×
240
    let expired_segments = topic
×
241
        .get_expired_segments_start_offsets_per_partition(now)
×
242
        .await;
×
243
    if expired_segments.is_empty() {
×
244
        debug!(
×
245
            "No expired segments found for stream ID: {}, topic ID: {}",
×
246
            topic.stream_id, topic.topic_id
247
        );
248
        return Vec::new();
×
249
    }
×
250

×
251
    debug!(
×
252
        "Found {} expired segments for stream ID: {}, topic ID: {}",
×
253
        expired_segments.len(),
×
254
        topic.stream_id,
255
        topic.topic_id
256
    );
257

258
    expired_segments
×
259
        .into_iter()
×
260
        .map(|(partition_id, start_offsets)| SegmentsToHandle {
×
261
            partition_id,
×
262
            start_offsets,
×
263
        })
×
264
        .collect()
×
265
}
×
266

267
/// Handles the oldest segments of `topic`: when an archiver is provided,
/// archives every closed, not-yet-archived segment across all partitions;
/// then, when the topic has a size limit, deletion is enabled, and the
/// topic is almost full, deletes the oldest segments.
///
/// # Errors
///
/// Propagates errors from `archive_segments` and `delete_segments`.
async fn handle_oldest_segments(
    topic: &Topic,
    archiver: Option<Arc<dyn Archiver>>,
    delete_oldest_segments: bool,
) -> Result<HandledSegments, IggyError> {
    if let Some(archiver) = archiver {
        // Collect, per partition, the closed segments that the archiver
        // does not know about yet.
        let mut segments_to_archive = Vec::new();
        for partition in topic.partitions.values() {
            let mut start_offsets = Vec::new();
            let partition = partition.read().await;
            for segment in partition.get_segments() {
                // Open segments are still being written to — skip them.
                if !segment.is_closed {
                    continue;
                }

                let is_archived = archiver.is_archived(&segment.index_path, None).await;
                if is_archived.is_err() {
                    error!(
                        "Failed to check if segment with start offset: {} is archived for stream ID: {}, topic ID: {}, partition ID: {}. Error: {}",
                        segment.start_offset, topic.stream_id, topic.topic_id, partition.partition_id, is_archived.err().unwrap()
                    );
                    continue;
                }

                if !is_archived.unwrap() {
                    debug!(
                        "Segment with start offset: {} is not archived for stream ID: {}, topic ID: {}, partition ID: {}",
                        segment.start_offset, topic.stream_id, topic.topic_id, partition.partition_id
                    );
                    start_offsets.push(segment.start_offset);
                }
            }
            if !start_offsets.is_empty() {
                info!(
                    "Found {} segments to archive for stream ID: {}, topic ID: {}, partition ID: {}",
                    start_offsets.len(),
                    topic.stream_id,
                    topic.topic_id,
                    partition.partition_id
                );
                segments_to_archive.push(SegmentsToHandle {
                    partition_id: partition.partition_id,
                    start_offsets,
                });
            }
        }

        // NOTE(review): this logs the number of partition groups, not the
        // number of individual segments — confirm whether that is intended.
        info!(
            "Archiving {} oldest segments for stream ID: {}, topic ID: {}...",
            segments_to_archive.len(),
            topic.stream_id,
            topic.topic_id,
        );
        archive_segments(topic, &segments_to_archive, archiver.clone()).await?;
    }

    // Deletion applies only to size-limited topics.
    if topic.is_unlimited() {
        debug!(
            "Topic is unlimited, oldest segments will not be deleted for stream ID: {}, topic ID: {}",
            topic.stream_id, topic.topic_id
        );
        return Ok(HandledSegments::none());
    }

    if !delete_oldest_segments {
        debug!(
            "Delete oldest segments is disabled, oldest segments will not be deleted for stream ID: {}, topic ID: {}",
            topic.stream_id, topic.topic_id
        );
        return Ok(HandledSegments::none());
    }

    if !topic.is_almost_full() {
        debug!(
            "Topic is not almost full, oldest segments will not be deleted for stream ID: {}, topic ID: {}",
            topic.stream_id, topic.topic_id
        );
        return Ok(HandledSegments::none());
    }

    let oldest_segments = get_oldest_segments(topic).await;
    if oldest_segments.is_empty() {
        return Ok(HandledSegments::none());
    }

    delete_segments(topic, &oldest_segments).await
}
×
354

355
async fn get_oldest_segments(topic: &Topic) -> Vec<SegmentsToHandle> {
×
356
    let mut oldest_segments = Vec::new();
×
357
    for partition in topic.partitions.values() {
×
358
        let partition = partition.read().await;
×
359
        if let Some(segment) = partition.get_segments().first() {
×
360
            if !segment.is_closed {
×
361
                continue;
×
362
            }
×
363

×
364
            oldest_segments.push(SegmentsToHandle {
×
365
                partition_id: partition.partition_id,
×
366
                start_offsets: vec![segment.start_offset],
×
367
            });
×
368
        }
×
369
    }
370

371
    if oldest_segments.is_empty() {
×
372
        debug!(
×
373
            "No oldest segments found for stream ID: {}, topic ID: {}",
×
374
            topic.stream_id, topic.topic_id
375
        );
376
        return oldest_segments;
×
377
    }
×
378

×
379
    info!(
×
380
        "Found {} oldest segments for stream ID: {}, topic ID: {}.",
×
381
        oldest_segments.len(),
×
382
        topic.stream_id,
383
        topic.topic_id
384
    );
385

386
    oldest_segments
×
387
}
×
388

389
/// A partition and the start offsets of its segments that a maintenance
/// step should archive or delete.
// The original `#[derive()]` was empty (dead attribute); derive `Debug`
// for diagnostics, matching the sibling `HandledSegments`.
#[derive(Debug)]
struct SegmentsToHandle {
    /// ID of the partition the segments belong to.
    partition_id: u32,
    /// Start offsets identifying the segments within that partition.
    start_offsets: Vec<u64>,
}
394

395
/// Aggregated result of a segment-maintenance step.
#[derive(Debug)]
struct HandledSegments {
    /// Number of segments that were deleted.
    pub segments_count: u32,
    /// Number of messages contained in those deleted segments.
    pub messages_count: u64,
}
400

401
impl HandledSegments {
402
    pub fn none() -> Self {
×
403
        Self {
×
404
            segments_count: 0,
×
405
            messages_count: 0,
×
406
        }
×
407
    }
×
408
}
409

410
/// Archives the index and log files of the given segments through
/// `archiver`, returning the number of segments successfully archived.
///
/// Per-segment failures (missing partition, missing segment, archiver
/// error) are logged and skipped — the body never returns `Err`; the
/// `Result` keeps the signature uniform with the other segment handlers.
async fn archive_segments(
    topic: &Topic,
    segments_to_archive: &[SegmentsToHandle],
    archiver: Arc<dyn Archiver>,
) -> Result<u64, IggyError> {
    if segments_to_archive.is_empty() {
        return Ok(0);
    }

    info!(
        "Found {} segments to archive for stream ID: {}, topic ID: {}, archiving...",
        segments_to_archive.len(),
        topic.stream_id,
        topic.topic_id
    );

    let mut archived_segments = 0;
    for segment_to_archive in segments_to_archive {
        match topic.get_partition(segment_to_archive.partition_id) {
            Ok(partition) => {
                // Read lock is held for all segments of this partition.
                let partition = partition.read().await;
                for start_offset in &segment_to_archive.start_offsets {
                    let segment = partition.get_segment(*start_offset);
                    if segment.is_none() {
                        error!(
                            "Segment with start offset: {} not found for stream ID: {}, topic ID: {}, partition ID: {}",
                            start_offset, topic.stream_id, topic.topic_id, partition.partition_id
                        );
                        continue;
                    }

                    let segment = segment.unwrap();
                    // Both the index and the log file are archived together.
                    let files = [segment.index_path.as_ref(), segment.log_path.as_ref()];
                    if let Err(error) = archiver.archive(&files, None).await {
                        error!(
                            "Failed to archive segment with start offset: {} for stream ID: {}, topic ID: {}, partition ID: {}. Error: {}",
                            start_offset, topic.stream_id, topic.topic_id, partition.partition_id, error
                        );
                        continue;
                    }
                    info!(
                        "Archived Segment with start offset: {}, for stream ID: {}, topic ID: {}, partition ID: {}",
                        start_offset, topic.stream_id, topic.topic_id, partition.partition_id
                    );
                    archived_segments += 1;
                }
            }
            Err(error) => {
                error!(
                    "Partition with ID: {} was not found for stream ID: {}, topic ID: {}. Error: {}",
                    segment_to_archive.partition_id, topic.stream_id, topic.topic_id, error
                );
                continue;
            }
        }
    }

    Ok(archived_segments)
}
×
469

470
/// Deletes the given segments from their partitions and returns how many
/// segments and messages were removed in total.
///
/// # Errors
///
/// Propagates the first error from `delete_segment` or from creating the
/// replacement segment, aborting the remaining deletions; totals already
/// accumulated are lost in that case.
async fn delete_segments(
    topic: &Topic,
    segments_to_delete: &[SegmentsToHandle],
) -> Result<HandledSegments, IggyError> {
    info!(
        "Deleting {} segments for stream ID: {}, topic ID: {}...",
        segments_to_delete.len(),
        topic.stream_id,
        topic.topic_id
    );

    let mut segments_count = 0;
    let mut messages_count = 0;
    for segment_to_delete in segments_to_delete {
        match topic.get_partition(segment_to_delete.partition_id) {
            Ok(partition) => {
                // Write lock is held while all segments of this partition
                // are deleted.
                let mut partition = partition.write().await;
                let mut last_end_offset = 0;
                for start_offset in &segment_to_delete.start_offsets {
                    let deleted_segment = partition.delete_segment(*start_offset).await?;
                    last_end_offset = deleted_segment.end_offset;
                    segments_count += 1;
                    messages_count += deleted_segment.messages_count;
                }

                // If every segment was deleted, add a fresh persisted
                // segment starting right after the last deleted end offset.
                if partition.get_segments().is_empty() {
                    let start_offset = last_end_offset + 1;
                    partition.add_persisted_segment(start_offset).await?;
                }
            }
            Err(error) => {
                error!(
                    "Partition with ID: {} not found for stream ID: {}, topic ID: {}. Error: {}",
                    segment_to_delete.partition_id, topic.stream_id, topic.topic_id, error
                );
                continue;
            }
        }
    }

    Ok(HandledSegments {
        segments_count,
        messages_count,
    })
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc