• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5888798292

17 Aug 2023 08:51AM UTC coverage: 76.025% (-1.4%) from 77.415%
5888798292

push

github

web-flow
feat: implement graph on live ui (#1847)

* feat: implement progress

* feat: implement enable progress flag

* feat: implement progress in live

* chore: fix clippy

* chore: always use telemetry metrics

* fix: Only run build once

---------

Co-authored-by: sagar <sagar@getdozer.io>
Co-authored-by: chubei <914745487@qq.com>

536 of 536 new or added lines in 21 files covered. (100.0%)

46101 of 60639 relevant lines covered (76.03%)

40410.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-cli/src/live/server.rs
1
use std::sync::Arc;
2

3
use dozer_api::{tonic_reflection, tonic_web, tower_http};
4
use dozer_types::{
5
    grpc_types::live::{
6
        code_service_server::{CodeService, CodeServiceServer},
7
        CommonRequest, CommonResponse, ConnectResponse, DotResponse, RunRequest, SchemasResponse,
8
        SourcesRequest, SqlResponse,
9
    },
10
    log::{error, info},
11
};
12
use futures::stream::BoxStream;
13
use tokio::sync::broadcast::Receiver;
14

15
use super::state::LiveState;
16
use dozer_types::tracing::Level;
17
use tokio_stream::wrappers::ReceiverStream;
18
use tonic::{Request, Response, Status};
19
use tower_http::trace::{self, TraceLayer};
20
/// TCP port the live gRPC server binds to (see `serve`).
const LIVE_PORT: u16 = 4556;
21
pub struct LiveServer {
22
    pub receiver: Receiver<ConnectResponse>,
23
    pub state: Arc<LiveState>,
24
}
25

26
#[tonic::async_trait]
27
impl CodeService for LiveServer {
28
    type LiveConnectStream = BoxStream<'static, Result<ConnectResponse, Status>>;
29

30
    async fn live_connect(
×
31
        &self,
×
32
        _request: Request<CommonRequest>,
×
33
    ) -> Result<Response<Self::LiveConnectStream>, Status> {
×
34
        let (tx, rx) = tokio::sync::mpsc::channel(1);
×
35
        let mut receiver = self.receiver.resubscribe();
×
36

×
37
        let initial_state = self.state.clone();
×
38
        tokio::spawn(async move {
×
39
            let initial_state = initial_state.get_current().await;
×
40
            if let Err(e) = tx
×
41
                .send(Ok(ConnectResponse {
×
42
                    live: Some(initial_state),
×
43
                    progress: None,
×
44
                }))
×
45
                .await
×
46
            {
×
47
                info!("Error getting initial state");
×
48
                info!("{:?}", e);
×
49
            }
×
50
            loop {
×
51
                let res = receiver.recv().await;
×
52
                match res {
×
53
                    Ok(res) => {
×
54
                        let res = tx.send(Ok(res)).await;
×
55
                        match res {
×
56
                            Ok(_) => {}
×
57
                            Err(e) => {
×
58
                                error!("Error sending to channel");
×
59
                                error!("{:?}", e);
×
60
                                break;
×
61
                            }
×
62
                        }
×
63
                    }
×
64
                    Err(_) => {
×
65
                        break;
×
66
                    }
×
67
                }
68
            }
69
        });
×
70
        let stream = ReceiverStream::new(rx);
×
71

×
72
        Ok(Response::new(Box::pin(stream) as Self::LiveConnectStream))
×
73
    }
×
74

75
    async fn sources(
×
76
        &self,
×
77
        request: Request<SourcesRequest>,
×
78
    ) -> Result<Response<SchemasResponse>, Status> {
×
79
        let req = request.into_inner();
×
80
        let res = self.state.get_source_schemas(req.connection_name).await;
×
81
        match res {
×
82
            Ok(res) => Ok(Response::new(res)),
×
83
            Err(e) => Err(Status::internal(e.to_string())),
×
84
        }
×
85
    }
×
86

×
87
    async fn endpoints(
×
88
        &self,
×
89
        _request: Request<CommonRequest>,
×
90
    ) -> Result<Response<SchemasResponse>, Status> {
×
91
        let state = self.state.clone();
×
92
        let res = state.get_endpoints_schemas().await;
×
93

×
94
        match res {
×
95
            Ok(res) => Ok(Response::new(res)),
×
96
            Err(e) => Err(Status::internal(e.to_string())),
×
97
        }
×
98
    }
×
99

×
100
    async fn generate_dot(
×
101
        &self,
×
102
        _request: Request<CommonRequest>,
×
103
    ) -> Result<Response<DotResponse>, Status> {
×
104
        let state = self.state.clone();
×
105
        let res = state.generate_dot().await;
×
106

107
        match res {
×
108
            Ok(res) => Ok(Response::new(res)),
×
109
            Err(e) => Err(Status::internal(e.to_string())),
×
110
        }
×
111
    }
×
112

×
113
    async fn get_sql(
×
114
        &self,
×
115
        _request: Request<CommonRequest>,
×
116
    ) -> Result<Response<SqlResponse>, Status> {
×
117
        let res = self.state.get_sql().await;
×
118

119
        match res {
×
120
            Ok(res) => Ok(Response::new(res)),
×
121
            Err(e) => Err(Status::internal(e.to_string())),
×
122
        }
×
123
    }
×
124

×
125
    async fn get_graph_schemas(
×
126
        &self,
×
127
        _request: Request<CommonRequest>,
×
128
    ) -> Result<Response<SchemasResponse>, Status> {
×
129
        let state = self.state.clone();
×
130
        let res = state.get_graph_schemas().await;
×
131

×
132
        match res {
×
133
            Ok(res) => Ok(Response::new(res)),
×
134
            Err(e) => Err(Status::internal(e.to_string())),
×
135
        }
×
136
    }
×
137

×
138
    async fn run(&self, request: Request<RunRequest>) -> Result<Response<CommonResponse>, Status> {
×
139
        let req = request.into_inner();
×
140
        let state = self.state.clone();
×
141
        info!("Starting dozer");
×
142
        match state.run(req).await {
×
143
            Ok(_) => {
×
144
                // let _err = state.broadcast();
145
                Ok(Response::new(CommonResponse {}))
×
146
            }
147
            Err(e) => Err(Status::internal(e.to_string())),
×
148
        }
×
149
    }
×
150

×
151
    async fn stop(
×
152
        &self,
×
153
        _request: Request<CommonRequest>,
×
154
    ) -> Result<Response<CommonResponse>, Status> {
×
155
        let state = self.state.clone();
×
156
        info!("Stopping dozer");
×
157
        match state.stop().await {
×
158
            Ok(_) => {
×
159
                // let _err = state.broadcast();
×
160
                Ok(Response::new(CommonResponse {}))
×
161
            }
×
162
            Err(e) => Err(Status::internal(e.to_string())),
×
163
        }
×
164
    }
×
165
}
×
166

×
167
pub async fn serve(
×
168
    receiver: Receiver<ConnectResponse>,
×
169
    state: Arc<LiveState>,
×
170
) -> Result<(), tonic::transport::Error> {
×
171
    let addr = format!("0.0.0.0:{LIVE_PORT}").parse().unwrap();
×
172
    let live_server = LiveServer { receiver, state };
×
173
    let svc = CodeServiceServer::new(live_server);
×
174

×
175
    // Enable CORS for local development
×
176
    let svc = tonic_web::config().allow_all_origins().enable(svc);
×
177

×
178
    let reflection_service = tonic_reflection::server::Builder::configure()
×
179
        .register_encoded_file_descriptor_set(dozer_types::grpc_types::live::FILE_DESCRIPTOR_SET)
×
180
        .build()
×
181
        .unwrap();
×
182

×
183
    tonic::transport::Server::builder()
×
184
        .layer(
×
185
            TraceLayer::new_for_http()
×
186
                .make_span_with(trace::DefaultMakeSpan::new().level(Level::INFO))
×
187
                .on_response(trace::DefaultOnResponse::new().level(Level::INFO))
×
188
                .on_failure(trace::DefaultOnFailure::new().level(Level::ERROR)),
×
189
        )
×
190
        .accept_http1(true)
×
191
        .concurrency_limit_per_connection(32)
×
192
        .add_service(svc)
×
193
        .add_service(reflection_service)
×
194
        .serve(addr)
×
195
        .await
×
196
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc