• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

iggy-rs / iggy / 13280939567

12 Feb 2025 08:18AM UTC coverage: 75.132% (-0.06%) from 75.192%
13280939567

Pull #1514

github

web-flow
Merge d9a2e8c28 into 19db87131
Pull Request #1514: Epilogue part 1

0 of 24 new or added lines in 2 files covered. (0.0%)

2 existing lines in 2 files now uncovered.

25022 of 33304 relevant lines covered (75.13%)

9818.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/sdk/src/consumer_ext/consumer_message_ext.rs
1
use crate::clients::consumer::IggyConsumer;
2
use crate::consumer_ext::{IggyConsumerMessageExt, MessageConsumer};
3
use crate::error::IggyError;
4
use async_trait::async_trait;
5
use futures_util::StreamExt;
6
use tokio::sync::oneshot;
7
use tracing::{error, info};
8

9
#[async_trait]
10
impl IggyConsumerMessageExt for IggyConsumer {
11
    /// Consume messages from the broker and process them with the given event processor
12
    ///
13
    /// # Arguments
14
    ///
15
    /// * `event_processor`: The event processor to use. This must be a reference to a static
16
    /// object that implements the `EventConsumer` trait.
17
    /// * `shutdown_rx`: A receiver which will receive a shutdown signal, which will be used to
18
    /// stop message consumption.
19
    ///
20
    /// # Errors
21
    ///
22
    /// * `IggyError::Disconnected`: The client has been disconnected.
23
    /// * `IggyError::CannotEstablishConnection`: The client cannot establish a connection to iggy.
24
    /// * `IggyError::StaleClient`: This client is stale and cannot be used to consume messages.
25
    /// * `IggyError::InvalidServerAddress`: The server address is invalid.
26
    /// * `IggyError::InvalidClientAddress`: The client address is invalid.
27
    /// * `IggyError::NotConnected`: The client is not connected.
28
    /// * `IggyError::ClientShutdown`: The client has been shut down.
29
    ///
30
    async fn consume_messages(
31
        mut self,
32
        event_processor: &'static (impl MessageConsumer + Sync),
33
        mut shutdown_rx: oneshot::Receiver<()>,
NEW
34
    ) -> Result<(), IggyError> {
×
35
        loop {
NEW
36
            tokio::select! {
×
37
                // Check first if we have received a shutdown signal
NEW
38
                _ = &mut shutdown_rx => {
×
NEW
39
                    info!("Received shutdown signal, stopping message consumption");
×
NEW
40
                    break;
×
41
                }
42

NEW
43
                message = self.next() => {
×
NEW
44
                    match message {
×
NEW
45
                        Some(Ok(received_message)) => {
×
NEW
46
                            if let Err(err) = event_processor.consume(received_message.message).await {
×
NEW
47
                                error!("Error while handling message from delivered from consumer {name} on topic: {topic} and stream {stream} du to error {err}",
×
NEW
48
                                    name= self.name(), topic = self.topic(), stream = self.stream());
×
NEW
49
                            }
×
50
                        }
NEW
51
                        Some(Err(err)) => {
×
NEW
52
                            match err {
×
53
                                IggyError::Disconnected |
54
                                IggyError::CannotEstablishConnection |
55
                                IggyError::StaleClient |
56
                                IggyError::InvalidServerAddress |
57
                                IggyError::InvalidClientAddress |
58
                                IggyError::NotConnected |
59
                                IggyError::ClientShutdown => {
NEW
60
                                    error!("{err:?}: shutdown client: {err}");
×
NEW
61
                                    return Err(err);
×
62
                                }
63
                                _ => {
NEW
64
                                    error!("Error while handling message: {err}");
×
NEW
65
                                    continue;
×
66
                                }
67
                            }
68
                        }
NEW
69
                        None => break,
×
70
                    }
71
                }
72

73
            }
74
        }
75

NEW
76
        Ok(())
×
NEW
77
    }
×
78
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc