• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5946477056

23 Aug 2023 02:57AM UTC coverage: 76.126% (-0.09%) from 76.212%
5946477056

push

github

web-flow
chore: Split ContractService from CodeService (#1890)

* chore: Remove `CodeService::GetSql`

* chore: Split `ContractService` from `CodeService`

* chore: Include deployment number for cloud contract service

66 of 66 new or added lines in 1 file covered. (100.0%)

46971 of 61702 relevant lines covered (76.13%)

57826.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-cli/src/live/server.rs
1
use std::sync::Arc;
2

3
use dozer_api::{tonic_reflection, tonic_web, tower_http};
4
use dozer_types::{
5
    grpc_types::{
6
        contract::{
7
            contract_service_server::{ContractService, ContractServiceServer},
8
            CommonRequest, DotResponse, SchemasResponse, SourcesRequest,
9
        },
10
        live::{
11
            code_service_server::{CodeService, CodeServiceServer},
12
            CommonResponse, ConnectResponse, RunRequest,
13
        },
14
    },
15
    log::{error, info},
16
};
17
use futures::stream::BoxStream;
18
use tokio::sync::broadcast::Receiver;
19

20
use super::state::LiveState;
21
use dozer_types::tracing::Level;
22
use tokio_stream::wrappers::ReceiverStream;
23
use tonic::{Request, Response, Status};
24
use tower_http::trace::{self, TraceLayer};
25
/// TCP port used by `serve` when building the live server's listen address.
pub const LIVE_PORT: u16 = 4556;
26

27
/// Handler type for the `ContractService` gRPC trait; its RPCs delegate to `LiveState`.
struct ContractServer {
28
    /// Reference-counted handle to the live state queried by every RPC.
    state: Arc<LiveState>,
29
}
30

×
31
// `ContractService` implementation: each RPC forwards to one `LiveState` method,
// wraps a successful value in `Response`, and converts a failure into
// `Status::internal` carrying the error's `to_string()` text.
#[tonic::async_trait]
×
32
impl ContractService for ContractServer {
×
33
    /// Looks up source schemas for the connection named in the request.
    ///
    /// Passes `connection_name` from the `SourcesRequest` to
    /// `LiveState::get_source_schemas`.
    ///
    /// # Errors
    /// Returns `Status::internal` with the error message if the lookup fails.
    async fn sources(
×
34
        &self,
×
35
        request: Request<SourcesRequest>,
×
36
    ) -> Result<Response<SchemasResponse>, Status> {
×
37
        // Unwrap the tonic envelope to get at the protobuf message.
        let req = request.into_inner();
×
38
        let res = self.state.get_source_schemas(req.connection_name).await;
×
39
        match res {
×
40
            Ok(res) => Ok(Response::new(res)),
×
41
            Err(e) => Err(Status::internal(e.to_string())),
×
42
        }
×
43
    }
×
44

×
45
    /// Returns the result of `LiveState::get_endpoints_schemas`; the request body is unused.
    ///
    /// # Errors
    /// Returns `Status::internal` with the error message if the call fails.
    async fn endpoints(
×
46
        &self,
×
47
        _request: Request<CommonRequest>,
×
48
    ) -> Result<Response<SchemasResponse>, Status> {
×
49
        // Clones the `Arc` handle only; the underlying `LiveState` is shared.
        let state = self.state.clone();
×
50
        let res = state.get_endpoints_schemas().await;
×
51

×
52
        match res {
×
53
            Ok(res) => Ok(Response::new(res)),
×
54
            Err(e) => Err(Status::internal(e.to_string())),
×
55
        }
×
56
    }
×
57

×
58
    /// Returns the result of `LiveState::generate_dot` as a `DotResponse`; the request
    /// body is unused.
    ///
    /// # Errors
    /// Returns `Status::internal` with the error message if the call fails.
    async fn generate_dot(
×
59
        &self,
×
60
        _request: Request<CommonRequest>,
×
61
    ) -> Result<Response<DotResponse>, Status> {
×
62
        // Clones the `Arc` handle only; the underlying `LiveState` is shared.
        let state = self.state.clone();
×
63
        let res = state.generate_dot().await;
×
64

65
        match res {
×
66
            Ok(res) => Ok(Response::new(res)),
×
67
            Err(e) => Err(Status::internal(e.to_string())),
×
68
        }
69
    }
×
70

×
71
    /// Returns the result of `LiveState::get_graph_schemas`; the request body is unused.
    ///
    /// # Errors
    /// Returns `Status::internal` with the error message if the call fails.
    async fn get_graph_schemas(
×
72
        &self,
×
73
        _request: Request<CommonRequest>,
×
74
    ) -> Result<Response<SchemasResponse>, Status> {
×
75
        // Clones the `Arc` handle only; the underlying `LiveState` is shared.
        let state = self.state.clone();
×
76
        let res = state.get_graph_schemas().await;
×
77

×
78
        match res {
×
79
            Ok(res) => Ok(Response::new(res)),
×
80
            Err(e) => Err(Status::internal(e.to_string())),
×
81
        }
×
82
    }
×
83
}
×
84

85
/// Handler type for the `CodeService` gRPC trait (live connect stream, run, stop).
struct LiveServer {
×
86
    /// Broadcast receiver of `ConnectResponse` updates; `live_connect` resubscribes
    /// from it for every new stream.
    receiver: Receiver<ConnectResponse>,
87
    /// Reference-counted handle to the live state that `run`/`stop` act on.
    state: Arc<LiveState>,
×
88
}
×
89

×
90
// `CodeService` implementation: a server-streaming `live_connect` RPC plus
// `run`/`stop` RPCs that delegate to `LiveState`.
#[tonic::async_trait]
×
91
impl CodeService for LiveServer {
×
92
    /// Boxed stream of `ConnectResponse` items (or `Status` errors) returned by `live_connect`.
    type LiveConnectStream = BoxStream<'static, Result<ConnectResponse, Status>>;
×
93

94
    /// Opens a stream that first emits the current live state and then relays every
    /// `ConnectResponse` received on the broadcast channel.
    ///
    /// A background task feeds an mpsc channel of capacity 1; the receiving half is
    /// returned to the client as the response stream. The task ends when sending to
    /// the client fails or when the broadcast receiver returns an error.
    ///
    /// This method itself always returns `Ok`; failures only surface inside the task.
    async fn live_connect(
×
95
        &self,
×
96
        _request: Request<()>,
×
97
    ) -> Result<Response<Self::LiveConnectStream>, Status> {
×
98
        // Capacity 1: the forwarding task waits on `send` until the previous item is taken.
        let (tx, rx) = tokio::sync::mpsc::channel(1);
×
99
        // Fresh subscription per client, derived from the receiver held by this server.
        let mut receiver = self.receiver.resubscribe();
×
100

×
101
        let initial_state = self.state.clone();
×
102
        tokio::spawn(async move {
×
103
            // Shadows the `Arc<LiveState>` handle with the value returned by `get_current`.
            let initial_state = initial_state.get_current().await;
×
104
            // First message on the stream: the current state, with no progress payload.
            if let Err(e) = tx
×
105
                .send(Ok(ConnectResponse {
×
106
                    live: Some(initial_state),
×
107
                    progress: None,
×
108
                }))
×
109
                .await
×
110
            {
111
                // NOTE(review): this branch is a failed *send*, yet the message says
                // "getting" and is logged at info level; the loop below still runs
                // afterwards — confirm whether an early return was intended.
                info!("Error getting initial state");
×
112
                info!("{:?}", e);
×
113
            }
×
114
            // Relay loop: broadcast channel -> per-client mpsc channel.
            loop {
×
115
                let res = receiver.recv().await;
×
116
                match res {
×
117
                    Ok(res) => {
×
118
                        let res = tx.send(Ok(res)).await;
×
119
                        match res {
×
120
                            Ok(_) => {}
×
121
                            Err(e) => {
×
122
                                // Send failed (presumably the client stream was dropped); stop relaying.
                                error!("Error sending to channel");
×
123
                                error!("{:?}", e);
×
124
                                break;
×
125
                            }
×
126
                        }
×
127
                    }
×
128
                    // NOTE(review): every receive error ends the stream silently. Tokio's
                    // broadcast receiver can also report a lagged receiver as an error —
                    // confirm that terminating on lag (not only on close) is intended.
                    Err(_) => {
×
129
                        break;
×
130
                    }
×
131
                }
132
            }
×
133
        });
×
134
        // Adapt the mpsc receiver into a `Stream` for tonic.
        let stream = ReceiverStream::new(rx);
×
135

×
136
        Ok(Response::new(Box::pin(stream) as Self::LiveConnectStream))
×
137
    }
×
138

×
139
    /// Calls `LiveState::run` with the `RunRequest` payload and replies with an empty
    /// `CommonResponse` on success.
    ///
    /// # Errors
    /// Returns `Status::internal` with the error message if `run` fails.
    async fn run(&self, request: Request<RunRequest>) -> Result<Response<CommonResponse>, Status> {
×
140
        let req = request.into_inner();
×
141
        let state = self.state.clone();
×
142
        info!("Starting dozer");
×
143
        match state.run(req).await {
×
144
            Ok(_) => {
145
                // let _err = state.broadcast();
×
146
                Ok(Response::new(CommonResponse {}))
×
147
            }
×
148
            Err(e) => Err(Status::internal(e.to_string())),
×
149
        }
×
150
    }
×
151

×
152
    /// Calls `LiveState::stop` and replies with an empty `CommonResponse` on success.
    ///
    /// # Errors
    /// Returns `Status::internal` with the error message if `stop` fails.
    async fn stop(&self, _request: Request<()>) -> Result<Response<CommonResponse>, Status> {
×
153
        let state = self.state.clone();
×
154
        info!("Stopping dozer");
×
155
        match state.stop().await {
×
156
            Ok(_) => {
×
157
                // let _err = state.broadcast();
×
158
                Ok(Response::new(CommonResponse {}))
×
159
            }
160
            Err(e) => Err(Status::internal(e.to_string())),
×
161
        }
162
    }
×
163
}
164

×
165
/// Builds and runs the live gRPC server on `0.0.0.0:LIVE_PORT`.
///
/// Registers three services: the contract service, the code service (both wrapped
/// with `tonic_web` allowing all origins), and a reflection service built from the
/// `contract` and `live` file descriptor sets.
///
/// * `receiver` - broadcast receiver handed to `LiveServer` for `live_connect` streams.
/// * `state` - shared live state; one clone goes to each handler.
///
/// # Errors
/// Returns the `tonic::transport::Error` produced by the server's `serve` future.
///
/// # Panics
/// Panics if the listen address fails to parse or the reflection service fails to
/// build (both are `unwrap()`ed).
pub async fn serve(
×
166
    receiver: Receiver<ConnectResponse>,
×
167
    state: Arc<LiveState>,
×
168
) -> Result<(), tonic::transport::Error> {
×
169
    // Listens on all interfaces; the parse target type is inferred from `.serve(addr)` below.
    let addr = format!("0.0.0.0:{LIVE_PORT}").parse().unwrap();
×
170
    let contract_server = ContractServer {
×
171
        state: state.clone(),
×
172
    };
×
173
    let live_server = LiveServer { receiver, state };
×
174
    let contract_service = ContractServiceServer::new(contract_server);
×
175
    let code_service = CodeServiceServer::new(live_server);
×
176

×
177
    // Enable CORS for local development
    // NOTE(review): all origins are allowed while the server binds to 0.0.0.0 —
    // confirm this is only ever exposed in local development.
    let contract_service = tonic_web::config()
×
178
        .allow_all_origins()
×
179
        .enable(contract_service);
×
180
    let code_service = tonic_web::config().allow_all_origins().enable(code_service);
×
181

×
182
    // Reflection service covering both proto packages served here.
    let reflection_service = tonic_reflection::server::Builder::configure()
×
183
        .register_encoded_file_descriptor_set(
×
184
            dozer_types::grpc_types::contract::FILE_DESCRIPTOR_SET,
×
185
        )
×
186
        .register_encoded_file_descriptor_set(dozer_types::grpc_types::live::FILE_DESCRIPTOR_SET)
×
187
        .build()
×
188
        .unwrap();
×
189

×
190
    tonic::transport::Server::builder()
×
191
        // HTTP tracing: spans and responses at INFO, failures at ERROR.
        .layer(
×
192
            TraceLayer::new_for_http()
×
193
                .make_span_with(trace::DefaultMakeSpan::new().level(Level::INFO))
×
194
                .on_response(trace::DefaultOnResponse::new().level(Level::INFO))
×
195
                .on_failure(trace::DefaultOnFailure::new().level(Level::ERROR)),
×
196
        )
×
197
        // Presumably required so browser gRPC-web clients can connect over HTTP/1.1 — TODO confirm.
        .accept_http1(true)
×
198
        .concurrency_limit_per_connection(32)
×
199
        .add_service(contract_service)
×
200
        .add_service(code_service)
×
201
        .add_service(reflection_service)
×
202
        .serve(addr)
×
203
        .await
×
204
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc