• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5709656380

pending completion
5709656380

push

github

web-flow
Version bump (#1808)

45512 of 59772 relevant lines covered (76.14%)

39312.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.66
/dozer-api/src/grpc/client_server.rs
1
use super::metric_middleware::MetricMiddlewareLayer;
2
use super::{auth_middleware::AuthMiddlewareLayer, common::CommonService, typed::TypedService};
3
use crate::errors::ApiInitError;
4
use crate::grpc::auth::AuthService;
5
use crate::grpc::health::HealthService;
6
use crate::grpc::{common, typed};
7
use crate::{errors::GrpcError, CacheEndpoint};
8
use dozer_types::grpc_types::health::health_check_response::ServingStatus;
9
use dozer_types::grpc_types::types::Operation;
10
use dozer_types::grpc_types::{
11
    auth::auth_grpc_service_server::AuthGrpcServiceServer,
12
    common::common_grpc_service_server::CommonGrpcServiceServer,
13
    health::health_grpc_service_server::HealthGrpcServiceServer,
14
};
15
use dozer_types::tracing::Level;
16
use dozer_types::{
17
    log::info,
18
    models::{api_config::GrpcApiOptions, api_security::ApiSecurity, flags::Flags},
19
};
20
use futures_util::stream::{AbortHandle, Abortable, Aborted};
21
use futures_util::Future;
22
use std::{collections::HashMap, sync::Arc};
23
use tokio::sync::broadcast::{self, Receiver};
24
use tonic::transport::Server;
25
use tonic_reflection::server::{ServerReflection, ServerReflectionServer};
26
use tower::Layer;
27
use tower_http::trace::{self, TraceLayer};
28

29
pub struct ApiServer {
30
    port: u16,
31
    host: String,
32
    security: Option<ApiSecurity>,
33
    flags: Flags,
34
}
35

36
impl ApiServer {
×
37
    fn get_dynamic_service(
9✔
38
        &self,
9✔
39
        cache_endpoints: Vec<Arc<CacheEndpoint>>,
9✔
40
        operations_receiver: Option<broadcast::Receiver<Operation>>,
9✔
41
    ) -> Result<
9✔
42
        (
9✔
43
            Option<TypedService>,
9✔
44
            ServerReflectionServer<impl ServerReflection>,
9✔
45
        ),
9✔
46
        ApiInitError,
9✔
47
    > {
9✔
48
        let mut all_descriptor_bytes = vec![];
9✔
49
        for cache_endpoint in &cache_endpoints {
18✔
50
            all_descriptor_bytes.push(cache_endpoint.descriptor().to_vec());
9✔
51
        }
9✔
52

×
53
        let mut builder = tonic_reflection::server::Builder::configure();
9✔
54
        for descriptor_bytes in &all_descriptor_bytes {
18✔
55
            builder = builder.register_encoded_file_descriptor_set(descriptor_bytes);
9✔
56
        }
9✔
57
        let inflection_service = builder.build().map_err(GrpcError::ServerReflectionError)?;
9✔
58

59
        // Service handling dynamic gRPC requests.
×
60
        let typed_service = if self.flags.dynamic {
9✔
61
            Some(TypedService::new(
9✔
62
                cache_endpoints,
9✔
63
                operations_receiver,
9✔
64
                self.security.clone(),
9✔
65
            )?)
9✔
66
        } else {
×
67
            None
×
68
        };
69

×
70
        Ok((typed_service, inflection_service))
9✔
71
    }
9✔
72

×
73
    pub fn new(grpc_config: GrpcApiOptions, security: Option<ApiSecurity>, flags: Flags) -> Self {
9✔
74
        Self {
9✔
75
            port: grpc_config.port as u16,
9✔
76
            host: grpc_config.host,
9✔
77
            security,
9✔
78
            flags,
9✔
79
        }
9✔
80
    }
9✔
81

×
82
    pub async fn run(
6✔
83
        &self,
6✔
84
        cache_endpoints: Vec<Arc<CacheEndpoint>>,
6✔
85
        shutdown: impl Future<Output = ()> + Send + 'static,
6✔
86
        operations_receiver: Option<Receiver<Operation>>,
6✔
87
    ) -> Result<(), ApiInitError> {
6✔
88
        // Create our services.
6✔
89
        let mut web_config = tonic_web::config();
6✔
90
        if self.flags.grpc_web {
6✔
91
            web_config = web_config.allow_all_origins();
6✔
92
        }
6✔
93

×
94
        let common_service = CommonGrpcServiceServer::new(CommonService::new(
6✔
95
            cache_endpoints.clone(),
6✔
96
            operations_receiver.as_ref().map(|r| r.resubscribe()),
6✔
97
        ));
6✔
98
        let common_service = web_config.enable(common_service);
6✔
99

×
100
        let (typed_service, reflection_service) =
6✔
101
            self.get_dynamic_service(cache_endpoints, operations_receiver)?;
6✔
102
        let typed_service = typed_service.map(|typed_service| web_config.enable(typed_service));
6✔
103
        let reflection_service = web_config.enable(reflection_service);
6✔
104

6✔
105
        let mut service_map: HashMap<String, ServingStatus> = HashMap::new();
6✔
106
        service_map.insert("".to_string(), ServingStatus::Serving);
6✔
107
        service_map.insert(common::SERVICE_NAME.to_string(), ServingStatus::Serving);
6✔
108
        if typed_service.is_some() {
6✔
109
            service_map.insert(typed::SERVICE_NAME.to_string(), ServingStatus::Serving);
6✔
110
        } else {
6✔
111
            service_map.insert(typed::SERVICE_NAME.to_string(), ServingStatus::NotServing);
×
112
        }
×
113
        let health_service = HealthGrpcServiceServer::new(HealthService {
6✔
114
            serving_status: service_map,
6✔
115
        });
6✔
116
        let health_service = web_config.enable(health_service);
6✔
117

6✔
118
        // Auth middleware.
6✔
119
        let auth_middleware = AuthMiddlewareLayer::new(self.security.clone());
6✔
120

6✔
121
        // Authenticated services.
6✔
122
        let common_service = auth_middleware.layer(common_service);
6✔
123
        let typed_service = typed_service.map(|typed_service| auth_middleware.layer(typed_service));
6✔
124
        let mut authenticated_reflection_service = None;
6✔
125
        let mut unauthenticated_reflection_service = None;
6✔
126
        if self.flags.authenticate_server_reflection {
6✔
127
            authenticated_reflection_service = Some(auth_middleware.layer(reflection_service))
×
128
        } else {
6✔
129
            unauthenticated_reflection_service = Some(reflection_service);
6✔
130
        };
6✔
131
        let health_service = auth_middleware.layer(health_service);
6✔
132

6✔
133
        let mut auth_service = None;
6✔
134
        if self.security.is_some() {
6✔
135
            let service = web_config.enable(AuthGrpcServiceServer::new(AuthService::new(
×
136
                self.security.clone(),
×
137
            )));
×
138
            auth_service = Some(auth_middleware.layer(service));
×
139
        }
6✔
140
        let metric_middleware = MetricMiddlewareLayer::new();
6✔
141
        // Add services to server.
6✔
142
        let mut grpc_router = Server::builder()
6✔
143
            .layer(
6✔
144
                TraceLayer::new_for_http()
6✔
145
                    .make_span_with(trace::DefaultMakeSpan::new().level(Level::INFO))
6✔
146
                    .on_response(trace::DefaultOnResponse::new().level(Level::INFO))
6✔
147
                    .on_failure(trace::DefaultOnFailure::new().level(Level::ERROR)),
6✔
148
            )
6✔
149
            .layer(metric_middleware)
6✔
150
            .accept_http1(true)
6✔
151
            .concurrency_limit_per_connection(32)
6✔
152
            .add_service(common_service)
6✔
153
            .add_optional_service(typed_service);
6✔
154

×
155
        if let Some(reflection_service) = authenticated_reflection_service {
6✔
156
            grpc_router = grpc_router.add_service(reflection_service);
×
157
        }
6✔
158
        if let Some(reflection_service) = unauthenticated_reflection_service {
6✔
159
            grpc_router = grpc_router.add_service(reflection_service);
6✔
160
        }
6✔
161

×
162
        grpc_router = grpc_router.add_service(health_service);
6✔
163
        grpc_router = grpc_router.add_optional_service(auth_service);
6✔
164

6✔
165
        // Tonic graceful shutdown doesn't allow us to set a timeout, resulting in hanging if a client doesn't close the connection.
6✔
166
        // So we just abort the server when the shutdown signal is received.
6✔
167
        let (abort_handle, abort_registration) = AbortHandle::new_pair();
6✔
168
        tokio::spawn(async move {
6✔
169
            shutdown.await;
6✔
170
            abort_handle.abort();
6✔
171
        });
6✔
172

6✔
173
        // Run server.
6✔
174
        let addr = format!("{}:{}", self.host, self.port);
6✔
175
        info!(
×
176
            "Starting gRPC server on {addr} with security: {}",
6✔
177
            self.security
6✔
178
                .as_ref()
6✔
179
                .map_or("None".to_string(), |s| match s {
6✔
180
                    ApiSecurity::Jwt(_) => "JWT".to_string(),
×
181
                })
6✔
182
        );
183

×
184
        let addr = addr
6✔
185
            .parse()
6✔
186
            .map_err(|e| GrpcError::AddrParse(addr.clone(), e))?;
6✔
187
        match Abortable::new(grpc_router.serve(addr), abort_registration).await {
12✔
188
            Ok(result) => result.map_err(|e| ApiInitError::Grpc(GrpcError::Transport(e))),
×
189
            Err(Aborted) => Ok(()),
6✔
190
        }
×
191
    }
6✔
192
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc