• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4020902227

pending completion
4020902227

Pull #743

github

GitHub
Merge 57279c6b6 into a12da35a5
Pull Request #743: Chore clippy fix

165 of 165 new or added lines in 60 files covered. (100.0%)

23638 of 35485 relevant lines covered (66.61%)

38417.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-admin/src/server.rs
1
use crate::{
2
    cli::{utils::get_db_path, AdminCliConfig},
3
    db::pool::establish_connection,
4
    services::{
5
        api_config_service::ApiConfigService, application_service::AppService,
6
        connection_service::ConnectionService, endpoint_service::EndpointService,
7
        source_service::SourceService,
8
    },
9
};
10
use dotenvy::dotenv;
11
use tonic::{transport::Server, Request, Response, Status};
12
/// Bindings for the `dozer_admin_grpc` protobuf package, pulled in through
/// tonic's `include_proto!` macro, plus the encoded file descriptor set.
pub mod dozer_admin_grpc {
13
    // NOTE(review): these lints are presumably silenced because the included
    // generated code trips them and cannot be edited by hand — confirm.
    #![allow(clippy::derive_partial_eq_without_eq, clippy::large_enum_variant)]
14
    tonic::include_proto!("dozer_admin_grpc");
15
    // Encoded descriptor set; registered with the reflection service in
    // `start_admin_server`.
    pub(crate) const FILE_DESCRIPTOR_SET: &[u8] =
16
        tonic::include_file_descriptor_set!("dozer_admin_grpc_descriptor");
17
}
18
use dozer_admin_grpc::{
19
    dozer_admin_server::{DozerAdmin, DozerAdminServer},
20
    CreateApiConfigRequest, CreateApiConfigResponse, CreateAppRequest, CreateAppResponse,
21
    CreateConnectionRequest, CreateConnectionResponse, CreateEndpointRequest,
22
    CreateEndpointResponse, CreateSourceRequest, CreateSourceResponse, DeleteEndpointRequest,
23
    DeleteEndpointResponse, GetAllConnectionRequest, GetAllConnectionResponse, GetApiConfigRequest,
24
    GetApiConfigResponse, GetAppRequest, GetAppResponse, GetConnectionDetailsRequest,
25
    GetConnectionDetailsResponse, GetEndpointRequest, GetEndpointResponse, GetSchemaRequest,
26
    GetSchemaResponse, GetSourceRequest, GetSourceResponse, StartPipelineRequest,
27
    StartPipelineResponse, UpdateApiConfigRequest, UpdateApiConfigResponse,
28
    UpdateConnectionRequest, UpdateConnectionResponse, UpdateEndpointRequest,
29
    UpdateEndpointResponse, UpdateSourceRequest, UpdateSourceResponse,
30
};
31

32
use self::dozer_admin_grpc::{
33
    GetAllEndpointRequest, GetAllEndpointResponse, GetAllSourceRequest, GetAllSourceResponse,
34
    ListAppRequest, ListAppResponse, UpdateAppRequest, UpdateAppResponse,
35
    ValidateConnectionRequest, ValidateConnectionResponse,
36
};
37

38
/// gRPC handler for the admin API.
///
/// Holds one service object per resource kind; the `DozerAdmin` implementation
/// forwards each RPC to the matching service.
pub struct GrpcService {
39
    app_service: AppService,
40
    connection_service: ConnectionService,
41
    source_service: SourceService,
42
    endpoint_service: EndpointService,
43
    api_config_service: ApiConfigService,
44
}
45

46
#[tonic::async_trait]
47
impl DozerAdmin for GrpcService {
48
    async fn create_application(
×
49
        &self,
×
50
        request: tonic::Request<CreateAppRequest>,
×
51
    ) -> Result<tonic::Response<CreateAppResponse>, tonic::Status> {
×
52
        let result = self.app_service.create(request.into_inner());
×
53
        match result {
×
54
            Ok(response) => Ok(Response::new(response)),
×
55
            Err(e) => Err(Status::new(tonic::Code::Internal, e.message)),
×
56
        }
57
    }
×
58
    async fn list_applications(
×
59
        &self,
×
60
        request: tonic::Request<ListAppRequest>,
×
61
    ) -> Result<tonic::Response<ListAppResponse>, tonic::Status> {
×
62
        let result = self.app_service.list(request.into_inner());
×
63
        match result {
×
64
            Ok(response) => Ok(Response::new(response)),
×
65
            Err(e) => Err(Status::new(tonic::Code::Internal, e.message)),
×
66
        }
67
    }
×
68
    async fn update_application(
×
69
        &self,
×
70
        request: tonic::Request<UpdateAppRequest>,
×
71
    ) -> Result<tonic::Response<UpdateAppResponse>, tonic::Status> {
×
72
        let result = self.app_service.update_app(request.into_inner());
×
73
        match result {
×
74
            Ok(response) => Ok(Response::new(response)),
×
75
            Err(e) => Err(Status::new(tonic::Code::Internal, e.message)),
×
76
        }
77
    }
×
78

79
    async fn get_application(
×
80
        &self,
×
81
        request: tonic::Request<GetAppRequest>,
×
82
    ) -> Result<tonic::Response<GetAppResponse>, tonic::Status> {
×
83
        let result = self.app_service.get_app(request.into_inner());
×
84
        match result {
×
85
            Ok(response) => Ok(Response::new(response)),
×
86
            Err(e) => Err(Status::new(tonic::Code::Internal, e.message)),
×
87
        }
88
    }
×
89

90
    async fn validate_connection(
×
91
        &self,
×
92
        request: tonic::Request<ValidateConnectionRequest>,
×
93
    ) -> Result<tonic::Response<ValidateConnectionResponse>, tonic::Status> {
×
94
        let result = self
×
95
            .connection_service
×
96
            .validate_connection(request.into_inner());
×
97
        match result.await {
×
98
            Ok(response) => Ok(Response::new(response)),
×
99
            Err(e) => Err(Status::new(tonic::Code::Internal, e.message)),
×
100
        }
101
    }
×
102

103
    async fn create_connection(
×
104
        &self,
×
105
        request: Request<CreateConnectionRequest>,
×
106
    ) -> Result<Response<CreateConnectionResponse>, Status> {
×
107
        let result = self
×
108
            .connection_service
×
109
            .create_connection(request.into_inner());
×
110
        match result {
×
111
            Ok(response) => Ok(Response::new(response)),
×
112
            Err(e) => Err(Status::new(tonic::Code::Internal, e.message)),
×
113
        }
114
    }
×
115
    async fn get_connection_details(
×
116
        &self,
×
117
        request: Request<GetConnectionDetailsRequest>,
×
118
    ) -> Result<Response<GetConnectionDetailsResponse>, Status> {
×
119
        let result = self
×
120
            .connection_service
×
121
            .get_connection_details(request.into_inner())
×
122
            .await;
×
123
        match result {
×
124
            Ok(response) => Ok(Response::new(response)),
×
125
            Err(e) => Err(Status::new(tonic::Code::Internal, e.message)),
×
126
        }
127
    }
×
128

129
    async fn list_connections(
×
130
        &self,
×
131
        request: Request<GetAllConnectionRequest>,
×
132
    ) -> Result<Response<GetAllConnectionResponse>, Status> {
×
133
        let result = self.connection_service.list(request.into_inner());
×
134
        match result {
×
135
            Ok(response) => Ok(Response::new(response)),
×
136
            Err(e) => Err(Status::new(tonic::Code::Internal, e.message)),
×
137
        }
138
    }
×
139

140
    async fn get_schema(
×
141
        &self,
×
142
        request: Request<GetSchemaRequest>,
×
143
    ) -> Result<Response<GetSchemaResponse>, Status> {
×
144
        let result = self
×
145
            .connection_service
×
146
            .get_schema(request.into_inner())
×
147
            .await;
×
148
        match result {
×
149
            Ok(response) => Ok(Response::new(response)),
×
150
            Err(e) => Err(Status::new(tonic::Code::Internal, e.message)),
×
151
        }
152
    }
×
153

154
    async fn update_connection(
×
155
        &self,
×
156
        request: Request<UpdateConnectionRequest>,
×
157
    ) -> Result<Response<UpdateConnectionResponse>, Status> {
×
158
        let result = self.connection_service.update(request.into_inner());
×
159
        match result {
×
160
            Ok(response) => Ok(Response::new(response)),
×
161
            Err(e) => Err(Status::new(tonic::Code::Internal, e.message)),
×
162
        }
163
    }
×
164

165
    async fn create_source(
×
166
        &self,
×
167
        request: Request<CreateSourceRequest>,
×
168
    ) -> Result<Response<CreateSourceResponse>, Status> {
×
169
        let result = self.source_service.create_source(request.into_inner());
×
170
        match result {
×
171
            Ok(response) => Ok(Response::new(response)),
×
172
            Err(e) => Err(Status::new(tonic::Code::Internal, e.message)),
×
173
        }
174
    }
×
175
    async fn get_source(
×
176
        &self,
×
177
        request: Request<GetSourceRequest>,
×
178
    ) -> Result<Response<GetSourceResponse>, Status> {
×
179
        let result = self.source_service.get_source(request.into_inner());
×
180
        match result {
×
181
            Ok(response) => Ok(Response::new(response)),
×
182
            Err(e) => Err(Status::new(tonic::Code::Internal, e.message)),
×
183
        }
184
    }
×
185
    async fn list_sources(
×
186
        &self,
×
187
        request: tonic::Request<GetAllSourceRequest>,
×
188
    ) -> Result<tonic::Response<GetAllSourceResponse>, tonic::Status> {
×
189
        let result = self.source_service.list(request.into_inner());
×
190
        match result {
×
191
            Ok(response) => Ok(Response::new(response)),
×
192
            Err(e) => Err(Status::new(tonic::Code::Internal, e.message)),
×
193
        }
194
    }
×
195

196
    async fn update_source(
×
197
        &self,
×
198
        request: Request<UpdateSourceRequest>,
×
199
    ) -> Result<Response<UpdateSourceResponse>, Status> {
×
200
        let result = self.source_service.update_source(request.into_inner());
×
201
        match result {
×
202
            Ok(response) => Ok(Response::new(response)),
×
203
            Err(e) => Err(Status::new(tonic::Code::Internal, e.message)),
×
204
        }
205
    }
×
206

207
    async fn create_endpoint(
×
208
        &self,
×
209
        request: tonic::Request<CreateEndpointRequest>,
×
210
    ) -> Result<tonic::Response<CreateEndpointResponse>, tonic::Status> {
×
211
        let result = self.endpoint_service.create_endpoint(request.into_inner());
×
212
        match result {
×
213
            Ok(response) => Ok(Response::new(response)),
×
214
            Err(e) => Err(Status::new(tonic::Code::Internal, e.message)),
×
215
        }
216
    }
×
217
    async fn get_endpoint(
×
218
        &self,
×
219
        request: tonic::Request<GetEndpointRequest>,
×
220
    ) -> Result<tonic::Response<GetEndpointResponse>, tonic::Status> {
×
221
        let result = self.endpoint_service.get_endpoint(request.into_inner());
×
222
        match result {
×
223
            Ok(response) => Ok(Response::new(response)),
×
224
            Err(e) => Err(Status::new(tonic::Code::Internal, e.message)),
×
225
        }
226
    }
×
227
    async fn list_endpoints(
×
228
        &self,
×
229
        request: tonic::Request<GetAllEndpointRequest>,
×
230
    ) -> Result<tonic::Response<GetAllEndpointResponse>, tonic::Status> {
×
231
        let result = self.endpoint_service.list(request.into_inner());
×
232
        match result {
×
233
            Ok(response) => Ok(Response::new(response)),
×
234
            Err(e) => Err(Status::new(tonic::Code::Internal, e.message)),
×
235
        }
236
    }
×
237
    async fn update_endpoint(
×
238
        &self,
×
239
        request: tonic::Request<UpdateEndpointRequest>,
×
240
    ) -> Result<tonic::Response<UpdateEndpointResponse>, tonic::Status> {
×
241
        let result = self.endpoint_service.update_endpoint(request.into_inner());
×
242
        match result {
×
243
            Ok(response) => Ok(Response::new(response)),
×
244
            Err(e) => Err(Status::new(tonic::Code::Internal, e.message)),
×
245
        }
246
    }
×
247
    async fn delete_endpoint(
×
248
        &self,
×
249
        request: tonic::Request<DeleteEndpointRequest>,
×
250
    ) -> Result<tonic::Response<DeleteEndpointResponse>, tonic::Status> {
×
251
        let result = self.endpoint_service.delete(request.into_inner());
×
252
        match result {
×
253
            Ok(response) => Ok(Response::new(response)),
×
254
            Err(e) => Err(Status::new(tonic::Code::Internal, e.message)),
×
255
        }
256
    }
×
257

258
    async fn start_pipeline(
×
259
        &self,
×
260
        request: tonic::Request<StartPipelineRequest>,
×
261
    ) -> Result<tonic::Response<StartPipelineResponse>, tonic::Status> {
×
262
        let result = self.app_service.start_pipeline(request.into_inner());
×
263
        match result {
×
264
            Ok(response) => Ok(Response::new(response)),
×
265
            Err(e) => Err(Status::new(tonic::Code::Internal, e.message)),
×
266
        }
267
    }
×
268

269
    async fn create_api_config(
×
270
        &self,
×
271
        request: tonic::Request<CreateApiConfigRequest>,
×
272
    ) -> Result<tonic::Response<CreateApiConfigResponse>, tonic::Status> {
×
273
        let result = self
×
274
            .api_config_service
×
275
            .create_api_config(request.into_inner());
×
276
        match result {
×
277
            Ok(response) => Ok(Response::new(response)),
×
278
            Err(e) => Err(Status::new(tonic::Code::Internal, e.message)),
×
279
        }
280
    }
×
281

282
    async fn update_api_config(
×
283
        &self,
×
284
        request: tonic::Request<UpdateApiConfigRequest>,
×
285
    ) -> Result<tonic::Response<UpdateApiConfigResponse>, tonic::Status> {
×
286
        let result = self.api_config_service.update(request.into_inner());
×
287
        match result {
×
288
            Ok(response) => Ok(Response::new(response)),
×
289
            Err(e) => Err(Status::new(tonic::Code::Internal, e.message)),
×
290
        }
291
    }
×
292
    async fn get_api_config(
×
293
        &self,
×
294
        request: tonic::Request<GetApiConfigRequest>,
×
295
    ) -> Result<tonic::Response<GetApiConfigResponse>, tonic::Status> {
×
296
        let result = self.api_config_service.get_api_config(request.into_inner());
×
297
        match result {
×
298
            Ok(response) => Ok(Response::new(response)),
×
299
            Err(e) => Err(Status::new(tonic::Code::Internal, e.message)),
×
300
        }
301
    }
×
302
}
303

304
pub async fn start_admin_server(config: AdminCliConfig) -> Result<(), tonic::transport::Error> {
×
305
    let host = config.host;
×
306
    let port = config.port;
×
307
    let dozer_path = config.dozer_path;
×
308
    let addr = format!("{host:}:{port:}").parse().unwrap();
×
309
    dotenv().ok();
×
310
    let database_url: String = get_db_path();
×
311
    let db_pool = establish_connection(database_url);
×
312
    let grpc_service = GrpcService {
×
313
        connection_service: ConnectionService::new(db_pool.to_owned()),
×
314
        source_service: SourceService::new(db_pool.to_owned()),
×
315
        endpoint_service: EndpointService::new(db_pool.to_owned()),
×
316
        app_service: AppService::new(db_pool.to_owned(), dozer_path),
×
317
        api_config_service: ApiConfigService::new(db_pool.to_owned()),
×
318
    };
×
319
    let server = DozerAdminServer::new(grpc_service);
×
320
    let server = tonic_web::config().allow_all_origins().enable(server);
×
321
    let reflection_service = tonic_reflection::server::Builder::configure()
×
322
        .register_encoded_file_descriptor_set(dozer_admin_grpc::FILE_DESCRIPTOR_SET)
×
323
        .build()
×
324
        .unwrap();
×
325
    Server::builder()
×
326
        .accept_http1(true)
×
327
        .add_service(reflection_service)
×
328
        .add_service(server)
×
329
        .serve(addr)
×
330
        .await
×
331
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc