• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5869050642

pending completion
5869050642

Pull #1858

github

supergi0
updated readme
Pull Request #1858: feat: Implement graph for dozer-live ui

419 of 419 new or added lines in 15 files covered. (100.0%)

46002 of 59761 relevant lines covered (76.98%)

52423.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-cli/src/live/server.rs
1
use std::sync::Arc;
2

3
use dozer_api::{tonic_reflection, tonic_web, tower_http};
4
use dozer_types::{
5
    grpc_types::live::{
6
        code_service_server::{CodeService, CodeServiceServer},
7
        CommonRequest, CommonResponse, ConnectResponse, DotResponse, RunRequest, SchemasResponse,
8
        SourcesRequest, SqlResponse,
9
    },
10
    log::{error, info},
11
};
12
use futures::stream::BoxStream;
13
use tokio::sync::broadcast::Receiver;
14

15
use super::state::LiveState;
16
use dozer_types::tracing::Level;
17
use tokio_stream::wrappers::ReceiverStream;
18
use tonic::{Request, Response, Status};
19
use tower_http::trace::{self, TraceLayer};
20
const LIVE_PORT: u16 = 4556;
21
pub struct LiveServer {
22
    pub receiver: Receiver<ConnectResponse>,
23
    pub state: Arc<LiveState>,
24
}
25

26
#[tonic::async_trait]
27
impl CodeService for LiveServer {
28
    type LiveConnectStream = BoxStream<'static, Result<ConnectResponse, Status>>;
29

30
    async fn live_connect(
×
31
        &self,
×
32
        _request: Request<CommonRequest>,
×
33
    ) -> Result<Response<Self::LiveConnectStream>, Status> {
×
34
        let (tx, rx) = tokio::sync::mpsc::channel(1);
×
35
        let mut receiver = self.receiver.resubscribe();
×
36

×
37
        let initial_state = self.state.clone();
×
38
        tokio::spawn(async move {
×
39
            let initial_state = initial_state.get_current();
×
40
            if let Err(e) = tx
×
41
                .send(Ok(ConnectResponse {
×
42
                    live: Some(initial_state),
×
43
                    progress: None,
×
44
                }))
×
45
                .await
×
46
            {
×
47
                info!("Error getting initial state");
×
48
                info!("{:?}", e);
×
49
            }
×
50
            loop {
×
51
                let res = receiver.recv().await;
×
52
                match res {
×
53
                    Ok(res) => {
×
54
                        let res = tx.send(Ok(res)).await;
×
55
                        match res {
×
56
                            Ok(_) => {}
×
57
                            Err(e) => {
×
58
                                error!("Error sending to channel");
×
59
                                error!("{:?}", e);
×
60
                                break;
×
61
                            }
×
62
                        }
×
63
                    }
×
64
                    Err(_) => {
×
65
                        break;
×
66
                    }
×
67
                }
68
            }
69
        });
×
70
        let stream = ReceiverStream::new(rx);
×
71

×
72
        Ok(Response::new(Box::pin(stream) as Self::LiveConnectStream))
×
73
    }
×
74

75
    async fn sources(
×
76
        &self,
×
77
        request: Request<SourcesRequest>,
×
78
    ) -> Result<Response<SchemasResponse>, Status> {
×
79
        let req = request.into_inner();
×
80
        let res = self.state.get_source_schemas(req.connection_name).await;
×
81
        match res {
×
82
            Ok(res) => Ok(Response::new(res)),
×
83
            Err(e) => Err(Status::internal(e.to_string())),
×
84
        }
×
85
    }
×
86

×
87
    async fn endpoints(
×
88
        &self,
×
89
        _request: Request<CommonRequest>,
×
90
    ) -> Result<Response<SchemasResponse>, Status> {
×
91
        let state = self.state.clone();
×
92
        let handle = std::thread::spawn(move || state.get_endpoints_schemas());
×
93
        let res = handle.join().unwrap();
×
94

×
95
        match res {
×
96
            Ok(res) => Ok(Response::new(res)),
×
97
            Err(e) => Err(Status::internal(e.to_string())),
×
98
        }
×
99
    }
×
100

×
101
    async fn generate_dot(
×
102
        &self,
×
103
        _request: Request<CommonRequest>,
×
104
    ) -> Result<Response<DotResponse>, Status> {
×
105
        let state = self.state.clone();
×
106
        let handle = std::thread::spawn(move || state.generate_dot());
×
107
        let res = handle.join().unwrap();
×
108

×
109
        match res {
×
110
            Ok(res) => Ok(Response::new(res)),
×
111
            Err(e) => Err(Status::internal(e.to_string())),
×
112
        }
×
113
    }
×
114

×
115
    async fn get_sql(
×
116
        &self,
×
117
        _request: Request<CommonRequest>,
×
118
    ) -> Result<Response<SqlResponse>, Status> {
×
119
        let res = self.state.get_sql();
×
120

×
121
        match res {
×
122
            Ok(res) => Ok(Response::new(res)),
×
123
            Err(e) => Err(Status::internal(e.to_string())),
×
124
        }
×
125
    }
×
126

×
127
    async fn get_graph_schemas(
×
128
        &self,
×
129
        _request: Request<CommonRequest>,
×
130
    ) -> Result<Response<SchemasResponse>, Status> {
×
131
        let state = self.state.clone();
×
132
        let handle = std::thread::spawn(move || state.get_graph_schemas());
×
133
        let res = handle.join().unwrap();
×
134

×
135
        match res {
×
136
            Ok(res) => Ok(Response::new(res)),
×
137
            Err(e) => Err(Status::internal(e.to_string())),
×
138
        }
×
139
    }
×
140

×
141
    async fn run(&self, request: Request<RunRequest>) -> Result<Response<CommonResponse>, Status> {
×
142
        let req = request.into_inner();
×
143
        let state = self.state.clone();
×
144
        info!("Starting dozer");
×
145
        match state.run(req) {
×
146
            Ok(_) => {
147
                // let _err = state.broadcast();
×
148
                Ok(Response::new(CommonResponse {}))
×
149
            }
×
150
            Err(e) => Err(Status::internal(e.to_string())),
×
151
        }
×
152
    }
×
153

×
154
    async fn stop(
×
155
        &self,
×
156
        _request: Request<CommonRequest>,
×
157
    ) -> Result<Response<CommonResponse>, Status> {
×
158
        let state = self.state.clone();
×
159
        info!("Stopping dozer");
×
160
        match state.stop() {
×
161
            Ok(_) => {
×
162
                // let _err = state.broadcast();
163
                Ok(Response::new(CommonResponse {}))
×
164
            }
×
165
            Err(e) => Err(Status::internal(e.to_string())),
×
166
        }
×
167
    }
×
168
}
×
169

×
170
pub async fn serve(
×
171
    receiver: Receiver<ConnectResponse>,
×
172
    state: Arc<LiveState>,
×
173
) -> Result<(), tonic::transport::Error> {
×
174
    let addr = format!("0.0.0.0:{LIVE_PORT}").parse().unwrap();
×
175
    let live_server = LiveServer { receiver, state };
×
176
    let svc = CodeServiceServer::new(live_server);
×
177

×
178
    // Enable CORS for local development
×
179
    let svc = tonic_web::config().allow_all_origins().enable(svc);
×
180

×
181
    let reflection_service = tonic_reflection::server::Builder::configure()
×
182
        .register_encoded_file_descriptor_set(dozer_types::grpc_types::live::FILE_DESCRIPTOR_SET)
×
183
        .build()
×
184
        .unwrap();
×
185

×
186
    tonic::transport::Server::builder()
×
187
        .layer(
×
188
            TraceLayer::new_for_http()
×
189
                .make_span_with(trace::DefaultMakeSpan::new().level(Level::INFO))
×
190
                .on_response(trace::DefaultOnResponse::new().level(Level::INFO))
×
191
                .on_failure(trace::DefaultOnFailure::new().level(Level::ERROR)),
×
192
        )
×
193
        .accept_http1(true)
×
194
        .concurrency_limit_per_connection(32)
×
195
        .add_service(svc)
×
196
        .add_service(reflection_service)
×
197
        .serve(addr)
×
198
        .await
×
199
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc