• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5957916370

24 Aug 2023 12:27AM UTC coverage: 75.783% (-0.08%) from 75.86%
5957916370

push

github

web-flow
fix: introduce build states and fix inconsistencies (#1902)

* chore: return if state can't be sent

* chore: remove broadcasting state when stopped

* chore: send build message

* chore: fix build

84 of 84 new or added lines in 7 files covered. (100.0%)

46996 of 62014 relevant lines covered (75.78%)

73508.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-cli/src/live/server.rs
1
use std::sync::Arc;
2

3
use dozer_api::{tonic_reflection, tonic_web, tower_http};
4
use dozer_types::{
5
    grpc_types::{
6
        contract::{
7
            contract_service_server::{ContractService, ContractServiceServer},
8
            CommonRequest, DotResponse, SchemasResponse, SourcesRequest,
9
        },
10
        live::{
11
            code_service_server::{CodeService, CodeServiceServer},
12
            CommonResponse, ConnectResponse, RunRequest,
13
        },
14
    },
15
    log::{error, info},
16
};
17
use futures::stream::BoxStream;
18
use tokio::sync::broadcast::Receiver;
19

20
use super::state::LiveState;
21
use dozer_types::tracing::Level;
22
use tokio_stream::wrappers::ReceiverStream;
23
use tonic::{Request, Response, Status};
24
use tower_http::trace::{self, TraceLayer};
25
/// TCP port used by the live server; `serve` listens on `0.0.0.0:{LIVE_PORT}`.
pub const LIVE_PORT: u16 = 4556;
26

27
/// Handler type on which `ContractService` is implemented in this file.
///
/// Its only field, `state`, is a shared (`Arc`) handle to the `LiveState`
/// that every RPC of the service delegates to.
struct ContractServer {
28
    state: Arc<LiveState>,
29
}
30

×
31
#[tonic::async_trait]
×
32
impl ContractService for ContractServer {
×
33
    async fn sources(
×
34
        &self,
×
35
        request: Request<SourcesRequest>,
×
36
    ) -> Result<Response<SchemasResponse>, Status> {
×
37
        let req = request.into_inner();
×
38
        let res = self.state.get_source_schemas(req.connection_name).await;
×
39
        match res {
×
40
            Ok(res) => Ok(Response::new(res)),
×
41
            Err(e) => Err(Status::internal(e.to_string())),
×
42
        }
×
43
    }
×
44

×
45
    async fn endpoints(
×
46
        &self,
×
47
        _request: Request<CommonRequest>,
×
48
    ) -> Result<Response<SchemasResponse>, Status> {
×
49
        let state = self.state.clone();
×
50
        let res = state.get_endpoints_schemas().await;
×
51

×
52
        match res {
×
53
            Ok(res) => Ok(Response::new(res)),
×
54
            Err(e) => Err(Status::internal(e.to_string())),
×
55
        }
×
56
    }
×
57

×
58
    async fn generate_dot(
×
59
        &self,
×
60
        _request: Request<CommonRequest>,
×
61
    ) -> Result<Response<DotResponse>, Status> {
×
62
        let state = self.state.clone();
×
63
        let res = state.generate_dot().await;
×
64

65
        match res {
×
66
            Ok(res) => Ok(Response::new(res)),
×
67
            Err(e) => Err(Status::internal(e.to_string())),
×
68
        }
69
    }
×
70

×
71
    async fn get_graph_schemas(
×
72
        &self,
×
73
        _request: Request<CommonRequest>,
×
74
    ) -> Result<Response<SchemasResponse>, Status> {
×
75
        let state = self.state.clone();
×
76
        let res = state.get_graph_schemas().await;
×
77

×
78
        match res {
×
79
            Ok(res) => Ok(Response::new(res)),
×
80
            Err(e) => Err(Status::internal(e.to_string())),
×
81
        }
×
82
    }
×
83
}
×
84

85
/// Handler type on which `CodeService` is implemented in this file.
///
/// `receiver` is a broadcast receiver of `ConnectResponse` messages that
/// `live_connect` resubscribes to for each new stream; `state` is a shared
/// (`Arc`) handle to the `LiveState` that the RPCs delegate to.
struct LiveServer {
×
86
    receiver: Receiver<ConnectResponse>,
87
    state: Arc<LiveState>,
×
88
}
×
89

×
90
#[tonic::async_trait]
×
91
impl CodeService for LiveServer {
×
92
    type LiveConnectStream = BoxStream<'static, Result<ConnectResponse, Status>>;
×
93

94
    async fn live_connect(
×
95
        &self,
×
96
        _request: Request<()>,
×
97
    ) -> Result<Response<Self::LiveConnectStream>, Status> {
×
98
        let (tx, rx) = tokio::sync::mpsc::channel(1);
×
99
        let mut receiver = self.receiver.resubscribe();
×
100

×
101
        let initial_state = self.state.clone();
×
102
        tokio::spawn(async move {
×
103
            let initial_state = initial_state.get_current().await;
×
104
            if let Err(e) = tx
×
105
                .send(Ok(ConnectResponse {
×
106
                    live: Some(initial_state),
×
107
                    progress: None,
×
108
                    build: None,
×
109
                }))
×
110
                .await
×
111
            {
×
112
                info!("Error getting initial state");
×
113
                info!("{}", e.to_string());
×
114
                return {};
×
115
            }
×
116
            loop {
×
117
                let res = receiver.recv().await;
×
118
                match res {
×
119
                    Ok(res) => {
×
120
                        let res = tx.send(Ok(res)).await;
×
121
                        match res {
×
122
                            Ok(_) => {}
×
123
                            Err(e) => {
×
124
                                error!("Error sending to channel");
×
125
                                error!("{:?}", e);
×
126
                                break;
×
127
                            }
×
128
                        }
×
129
                    }
×
130
                    Err(_) => {
×
131
                        break;
×
132
                    }
×
133
                }
×
134
            }
×
135
        });
×
136
        let stream = ReceiverStream::new(rx);
×
137

×
138
        Ok(Response::new(Box::pin(stream) as Self::LiveConnectStream))
×
139
    }
×
140

×
141
    async fn run(&self, request: Request<RunRequest>) -> Result<Response<CommonResponse>, Status> {
×
142
        let req = request.into_inner();
×
143
        let state = self.state.clone();
×
144
        info!("Starting dozer");
×
145
        match state.run(req).await {
×
146
            Ok(_) => Ok(Response::new(CommonResponse {})),
×
147
            Err(e) => Err(Status::internal(e.to_string())),
×
148
        }
149
    }
×
150

151
    async fn stop(&self, _request: Request<()>) -> Result<Response<CommonResponse>, Status> {
×
152
        let state = self.state.clone();
×
153
        info!("Stopping dozer");
×
154
        match state.stop().await {
×
155
            Ok(_) => Ok(Response::new(CommonResponse {})),
×
156
            Err(e) => Err(Status::internal(e.to_string())),
×
157
        }
×
158
    }
×
159
}
160

×
161
pub async fn serve(
×
162
    receiver: Receiver<ConnectResponse>,
×
163
    state: Arc<LiveState>,
×
164
) -> Result<(), tonic::transport::Error> {
×
165
    let addr = format!("0.0.0.0:{LIVE_PORT}").parse().unwrap();
×
166
    let contract_server = ContractServer {
×
167
        state: state.clone(),
×
168
    };
×
169
    let live_server = LiveServer { receiver, state };
×
170
    let contract_service = ContractServiceServer::new(contract_server);
×
171
    let code_service = CodeServiceServer::new(live_server);
×
172

×
173
    // Enable CORS for local development
×
174
    let contract_service = tonic_web::config()
×
175
        .allow_all_origins()
×
176
        .enable(contract_service);
×
177
    let code_service = tonic_web::config().allow_all_origins().enable(code_service);
×
178

×
179
    let reflection_service = tonic_reflection::server::Builder::configure()
×
180
        .register_encoded_file_descriptor_set(
×
181
            dozer_types::grpc_types::contract::FILE_DESCRIPTOR_SET,
×
182
        )
×
183
        .register_encoded_file_descriptor_set(dozer_types::grpc_types::live::FILE_DESCRIPTOR_SET)
×
184
        .build()
×
185
        .unwrap();
×
186

×
187
    tonic::transport::Server::builder()
×
188
        .layer(
×
189
            TraceLayer::new_for_http()
×
190
                .make_span_with(trace::DefaultMakeSpan::new().level(Level::INFO))
×
191
                .on_response(trace::DefaultOnResponse::new().level(Level::INFO))
×
192
                .on_failure(trace::DefaultOnFailure::new().level(Level::ERROR)),
×
193
        )
×
194
        .accept_http1(true)
×
195
        .concurrency_limit_per_connection(32)
×
196
        .add_service(contract_service)
×
197
        .add_service(code_service)
×
198
        .add_service(reflection_service)
×
199
        .serve(addr)
×
200
        .await
×
201
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc