• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5829059745

pending completion
5829059745

Pull #1844

github

supergi01
added comments for downloader.rs
Pull Request #1844: feat/live-reload, download and start react server

735 of 735 new or added lines in 11 files covered. (100.0%)

45536 of 61287 relevant lines covered (74.3%)

51206.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-cli/src/live/server.rs
1
use std::sync::Arc;
2

3
use dozer_api::{tonic_reflection, tonic_web};
4
use dozer_types::{
5
    grpc_types::{
6
        live::{
7
            code_service_server::{CodeService, CodeServiceServer},
8
            CommonRequest, CommonResponse, DotResponse, LiveResponse, RunSqlRequest,
9
            SchemasResponse, SourcesRequest, SqlRequest, SqlResponse,
10
        },
11
        types::Operation,
12
    },
13
    log::{error, info},
14
};
15
use futures::stream::BoxStream;
16
use tokio::sync::broadcast::Receiver;
17

18
use tokio_stream::wrappers::ReceiverStream;
19
use tonic::{Request, Response, Status};
20

21
use super::state::LiveState;
22
/// TCP port the live gRPC server binds to (used by `serve` to build its listen address).
const LIVE_PORT: u16 = 4556;
23
pub struct LiveServer {
24
    pub receiver: Receiver<LiveResponse>,
25
    pub state: Arc<LiveState>,
26
}
27

28
#[tonic::async_trait]
29
impl CodeService for LiveServer {
30
    type LiveConnectStream = BoxStream<'static, Result<LiveResponse, Status>>;
31
    type RunSqlStream = BoxStream<'static, Result<Operation, Status>>;
32

×
33
    async fn live_connect(
×
34
        &self,
×
35
        _request: Request<CommonRequest>,
×
36
    ) -> Result<Response<Self::LiveConnectStream>, Status> {
×
37
        let (tx, rx) = tokio::sync::mpsc::channel(1);
×
38

×
39
        let mut receiver = self.receiver.resubscribe();
×
40
        let initial_state = self.state.clone();
×
41
        tokio::spawn(async move {
×
42
            let initial_state = initial_state.get_current();
×
43
            match initial_state {
×
44
                Ok(initial_state) => {
×
45
                    if let Err(e) = tx.send(Ok(initial_state)).await {
×
46
                        info!("Error getting initial state");
×
47
                        info!("{:?}", e);
×
48
                    }
×
49
                }
×
50
                Err(e) => {
×
51
                    info!("Error sending to channel");
×
52
                    info!("{:?}", e);
×
53
                }
54
            };
55

56
            loop {
×
57
                let res = receiver.recv().await;
×
58
                match res {
×
59
                    Ok(res) => {
×
60
                        let res = tx.send(Ok(res)).await;
×
61
                        match res {
×
62
                            Ok(_) => {}
×
63
                            Err(e) => {
×
64
                                error!("Error sending to channel");
×
65
                                error!("{:?}", e);
×
66
                                break;
×
67
                            }
68
                        }
69
                    }
70
                    Err(_) => {
×
71
                        break;
×
72
                    }
73
                }
74
            }
×
75
        });
×
76
        let stream = ReceiverStream::new(rx);
×
77

×
78
        Ok(Response::new(Box::pin(stream) as Self::LiveConnectStream))
×
79
    }
×
80

×
81
    async fn sources(
×
82
        &self,
×
83
        request: Request<SourcesRequest>,
×
84
    ) -> Result<Response<SchemasResponse>, Status> {
×
85
        let req = request.into_inner();
×
86
        let res = self.state.get_source_schemas(req.connection_name).await;
×
87
        match res {
×
88
            Ok(res) => Ok(Response::new(res)),
×
89
            Err(e) => Err(Status::internal(e.to_string())),
×
90
        }
×
91
    }
×
92

×
93
    async fn endpoints(
×
94
        &self,
×
95
        _request: Request<CommonRequest>,
×
96
    ) -> Result<Response<SchemasResponse>, Status> {
×
97
        let state = self.state.clone();
×
98
        let handle = std::thread::spawn(move || state.get_endpoints_schemas());
×
99
        let res = handle.join().unwrap();
×
100

×
101
        match res {
×
102
            Ok(res) => Ok(Response::new(res)),
×
103
            Err(e) => Err(Status::internal(e.to_string())),
×
104
        }
×
105
    }
×
106

×
107
    async fn generate_dot(
×
108
        &self,
×
109
        _request: Request<CommonRequest>,
×
110
    ) -> Result<Response<DotResponse>, Status> {
×
111
        let state = self.state.clone();
×
112
        let handle = std::thread::spawn(move || state.generate_dot());
×
113
        let res = handle.join().unwrap();
×
114

×
115
        match res {
×
116
            Ok(res) => Ok(Response::new(res)),
×
117
            Err(e) => Err(Status::internal(e.to_string())),
×
118
        }
×
119
    }
×
120

×
121
    async fn get_sql(
×
122
        &self,
×
123
        _request: Request<CommonRequest>,
×
124
    ) -> Result<Response<SqlResponse>, Status> {
×
125
        let res = self.state.get_sql();
×
126

×
127
        match res {
×
128
            Ok(res) => Ok(Response::new(res)),
×
129
            Err(e) => Err(Status::internal(e.to_string())),
×
130
        }
×
131
    }
×
132

×
133
    async fn build_sql(
×
134
        &self,
×
135
        request: Request<SqlRequest>,
×
136
    ) -> Result<Response<SchemasResponse>, Status> {
×
137
        let state = self.state.clone();
×
138
        let handle = std::thread::spawn(move || state.build_sql(request.into_inner().sql));
×
139
        let res = handle.join().unwrap();
×
140

×
141
        match res {
×
142
            Ok(res) => Ok(Response::new(res)),
×
143
            Err(e) => Err(Status::internal(e.to_string())),
×
144
        }
×
145
    }
×
146

×
147
    async fn run_sql(
×
148
        &self,
×
149
        request: Request<RunSqlRequest>,
×
150
    ) -> Result<Response<Self::RunSqlStream>, Status> {
×
151
        let (tx, rx) = tokio::sync::mpsc::channel(1);
×
152

×
153
        let req = request.into_inner();
×
154
        let state = self.state.clone();
×
155

×
156
        state.run_sql(req.sql, req.endpoints, tx).unwrap();
×
157

×
158
        let stream = ReceiverStream::new(rx);
×
159

×
160
        Ok(Response::new(Box::pin(stream) as Self::RunSqlStream))
×
161
    }
×
162

163
    async fn stop_sql(
×
164
        &self,
×
165
        _request: Request<CommonRequest>,
×
166
    ) -> Result<Response<CommonResponse>, Status> {
×
167
        self.state.stop_sql();
×
168
        Ok(Response::new(CommonResponse {}))
×
169
    }
×
170
}
×
171

×
172
pub async fn serve(
×
173
    receiver: Receiver<LiveResponse>,
×
174
    state: Arc<LiveState>,
×
175
) -> Result<(), tonic::transport::Error> {
×
176
    let addr = format!("0.0.0.0:{LIVE_PORT}").parse().unwrap();
×
177
    let live_server = LiveServer { receiver, state };
×
178
    let svc = CodeServiceServer::new(live_server);
×
179

×
180
    // Enable CORS for local development
×
181
    let svc = tonic_web::config().allow_all_origins().enable(svc);
×
182

×
183
    let reflection_service = tonic_reflection::server::Builder::configure()
×
184
        .register_encoded_file_descriptor_set(dozer_types::grpc_types::live::FILE_DESCRIPTOR_SET)
×
185
        .build()
×
186
        .unwrap();
×
187

×
188
    tonic::transport::Server::builder()
×
189
        .accept_http1(true)
×
190
        .add_service(svc)
×
191
        .add_service(reflection_service)
×
192
        .serve(addr)
×
193
        .await
×
194
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc