• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4392484403

pending completion
4392484403

push

github

GitHub
feat: Asynchoronous indexing (#1206)

270 of 270 new or added lines in 13 files covered. (100.0%)

28714 of 38777 relevant lines covered (74.05%)

89484.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.74
/dozer-api/src/grpc/internal/internal_pipeline_server.rs
1
use crossbeam::channel::{Receiver, Sender};
2
use dozer_types::grpc_types::{
3
    internal::{
4
        internal_pipeline_service_server::{self, InternalPipelineService},
5
        AliasEventsRequest, AliasRedirected, OperationsRequest,
6
    },
7
    types::Operation,
8
};
9
use dozer_types::{crossbeam, log::info, models::app_config::Config, tracing::warn};
10
use std::{fmt::Debug, net::ToSocketAddrs, pin::Pin, thread};
11
use tokio::{runtime::Runtime, sync::broadcast};
12
use tokio_stream::wrappers::ReceiverStream;
13
use tonic::{codegen::futures_core::Stream, transport::Server, Response, Status};
14

15
pub type PipelineEventSenders = (Sender<AliasRedirected>, Sender<Operation>);
16
pub type PipelineEventReceivers = (Receiver<AliasRedirected>, Receiver<Operation>);
17

18
pub struct InternalPipelineServer {
19
    alias_redirected_receiver: broadcast::Receiver<AliasRedirected>,
20
    operation_receiver: broadcast::Receiver<Operation>,
21
}
22
impl InternalPipelineServer {
23
    pub fn new(pipeline_event_receivers: PipelineEventReceivers) -> Self {
10✔
24
        let alias_redirected_receiver =
10✔
25
            crossbeam_mpsc_receiver_to_tokio_broadcast_receiver(pipeline_event_receivers.0);
10✔
26
        let operation_receiver =
10✔
27
            crossbeam_mpsc_receiver_to_tokio_broadcast_receiver(pipeline_event_receivers.1);
10✔
28
        Self {
10✔
29
            alias_redirected_receiver,
10✔
30
            operation_receiver,
10✔
31
        }
10✔
32
    }
10✔
33
}
34

35
fn crossbeam_mpsc_receiver_to_tokio_broadcast_receiver<T: Clone + Debug + Send + 'static>(
20✔
36
    crossbeam_receiver: Receiver<T>,
20✔
37
) -> broadcast::Receiver<T> {
20✔
38
    let (broadcast_sender, broadcast_receiver) = broadcast::channel(16);
20✔
39
    thread::Builder::new().name("crossbeam_mpsc_receiver_to_tokio_broadcast_receiver".to_string()).spawn(move || loop {
20✔
40
        let message = crossbeam_receiver.recv();
45✔
41
        match message {
45✔
42
            Ok(message) => {
25✔
43
                let result = broadcast_sender.send(message);
25✔
44
                if let Err(e) = result {
25✔
45
                    warn!("Internal Pipeline server - Error sending message to broadcast channel: {:?}", e);
×
46
                }
25✔
47
            }
48
            Err(err) => {
20✔
49
                warn!(
20✔
50
                    "Internal Pipeline server - message reveived error: {:?}",
20✔
51
                    err
20✔
52
                );
20✔
53
                break;
20✔
54
            }
20✔
55
        }
20✔
56
    }).expect("Failed to spawn crossbeam_mpsc_receiver_to_tokio_broadcast_receiver thread");
20✔
57
    broadcast_receiver
20✔
58
}
20✔
59

60
type OperationsStream = Pin<Box<dyn Stream<Item = Result<Operation, Status>> + Send>>;
61
type AliasEventsStream = Pin<Box<dyn Stream<Item = Result<AliasRedirected, Status>> + Send>>;
62

63
#[tonic::async_trait]
64
impl InternalPipelineService for InternalPipelineServer {
65
    type StreamOperationsStream = OperationsStream;
66
    async fn stream_operations(
10✔
67
        &self,
10✔
68
        _request: tonic::Request<OperationsRequest>,
10✔
69
    ) -> Result<Response<OperationsStream>, Status> {
10✔
70
        let (operation_sender, operation_receiver) = tokio::sync::mpsc::channel(1000);
10✔
71
        let mut receiver = self.operation_receiver.resubscribe();
10✔
72
        tokio::spawn(async move {
10✔
73
            loop {
74
                let result = receiver.try_recv();
3,415✔
75
                match result {
3,415✔
76
                    Ok(operation) => {
25✔
77
                        let result = operation_sender.send(Ok(operation)).await;
25✔
78
                        if let Err(e) = result {
25✔
79
                            warn!("Error sending message to mpsc channel: {:?}", e);
×
80
                            break;
×
81
                        }
25✔
82
                    }
83
                    Err(err) => {
3,390✔
84
                        if err == broadcast::error::TryRecvError::Closed {
3,390✔
85
                            break;
10✔
86
                        }
3,380✔
87
                    }
88
                }
89
                tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
3,405✔
90
            }
91
        });
10✔
92
        let output_stream = ReceiverStream::new(operation_receiver);
10✔
93
        Ok(Response::new(Box::pin(output_stream)))
10✔
94
    }
10✔
95

96
    type StreamAliasEventsStream = AliasEventsStream;
97

98
    async fn stream_alias_events(
10✔
99
        &self,
10✔
100
        _request: tonic::Request<AliasEventsRequest>,
10✔
101
    ) -> Result<Response<Self::StreamAliasEventsStream>, Status> {
10✔
102
        let (alias_redirected_sender, alias_redirected_receiver) = tokio::sync::mpsc::channel(1000);
10✔
103
        let mut receiver = self.alias_redirected_receiver.resubscribe();
10✔
104
        tokio::spawn(async move {
10✔
105
            loop {
106
                let result = receiver.try_recv();
3,460✔
107
                match result {
3,460✔
108
                    Ok(alias_redirected) => {
×
109
                        let result = alias_redirected_sender.send(Ok(alias_redirected)).await;
×
110
                        if let Err(e) = result {
×
111
                            warn!("Error sending message to mpsc channel: {:?}", e);
×
112
                            break;
×
113
                        }
×
114
                    }
115
                    Err(err) => {
3,460✔
116
                        if err == broadcast::error::TryRecvError::Closed {
3,460✔
117
                            break;
10✔
118
                        }
3,450✔
119
                    }
120
                }
121
                tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
3,450✔
122
            }
123
        });
10✔
124
        let output_stream = ReceiverStream::new(alias_redirected_receiver);
10✔
125
        Ok(Response::new(Box::pin(output_stream)))
10✔
126
    }
10✔
127
}
128

129
pub fn start_internal_pipeline_server(
10✔
130
    app_config: Config,
10✔
131
    receivers: PipelineEventReceivers,
10✔
132
) -> Result<(), tonic::transport::Error> {
10✔
133
    let rt = Runtime::new().unwrap();
10✔
134
    rt.block_on(async { _start_internal_pipeline_server(app_config, receivers).await })
10✔
135
}
10✔
136
async fn _start_internal_pipeline_server(
10✔
137
    app_config: Config,
10✔
138
    receivers: PipelineEventReceivers,
10✔
139
) -> Result<(), tonic::transport::Error> {
10✔
140
    let server = InternalPipelineServer::new(receivers);
10✔
141

10✔
142
    let internal_config = app_config
10✔
143
        .api
10✔
144
        .unwrap_or_default()
10✔
145
        .app_grpc
10✔
146
        .unwrap_or_default();
10✔
147

148
    info!(
149
        "Starting Internal Server on http://{}:{}",
10✔
150
        internal_config.host, internal_config.port,
151
    );
152
    let mut addr = format!("{}:{}", internal_config.host, internal_config.port)
10✔
153
        .to_socket_addrs()
10✔
154
        .unwrap();
10✔
155
    Server::builder()
10✔
156
        .add_service(internal_pipeline_service_server::InternalPipelineServiceServer::new(server))
10✔
157
        .serve(addr.next().unwrap())
10✔
158
        .await
10✔
159
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc