• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6299724219

25 Sep 2023 12:58PM UTC coverage: 77.81% (+0.5%) from 77.275%
6299724219

push

github

chubei
fix: Add `BINDGEN_EXTRA_CLANG_ARGS` to cross compile rocksdb

50223 of 64546 relevant lines covered (77.81%)

148909.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.14
/dozer-api/src/grpc/client_server.rs
1
use super::metric_middleware::MetricMiddlewareLayer;
2
use super::{auth_middleware::AuthMiddlewareLayer, common::CommonService, typed::TypedService};
3
use crate::api_helper::get_api_security;
4
use crate::errors::ApiInitError;
5
use crate::grpc::auth::AuthService;
6
use crate::grpc::grpc_web_middleware::enable_grpc_web;
7
use crate::grpc::health::HealthService;
8
use crate::grpc::{common, run_server, typed};
9
use crate::{errors::GrpcError, CacheEndpoint};
10
use dozer_tracing::LabelsAndProgress;
11
use dozer_types::grpc_types::health::health_check_response::ServingStatus;
12
use dozer_types::grpc_types::types::Operation;
13
use dozer_types::grpc_types::{
14
    auth::auth_grpc_service_server::AuthGrpcServiceServer,
15
    common::common_grpc_service_server::CommonGrpcServiceServer,
16
    health::health_grpc_service_server::HealthGrpcServiceServer,
17
};
18
use dozer_types::models::api_config::{default_grpc_port, default_host};
19
use dozer_types::models::flags::default_dynamic;
20
use dozer_types::tonic::transport::server::TcpIncoming;
21
use dozer_types::tonic::transport::Server;
22
use dozer_types::tracing::Level;
23
use dozer_types::{
24
    log::info,
25
    models::{api_config::GrpcApiOptions, api_security::ApiSecurity, flags::Flags},
26
};
27
use futures_util::Future;
28
use std::{collections::HashMap, sync::Arc};
29
use tokio::sync::broadcast::{self, Receiver};
30
use tonic_reflection::server::{ServerReflection, ServerReflectionServer};
31
use tower::Layer;
32
use tower_http::trace::{self, TraceLayer};
33

34
pub struct ApiServer {
35
    port: u16,
36
    host: String,
37
    security: Option<ApiSecurity>,
38
    flags: Flags,
39
}
40

41
impl ApiServer {
42
    fn get_dynamic_service(
36✔
43
        &self,
36✔
44
        cache_endpoints: Vec<Arc<CacheEndpoint>>,
36✔
45
        operations_receiver: Option<broadcast::Receiver<Operation>>,
36✔
46
        default_max_num_records: usize,
36✔
47
    ) -> Result<
36✔
48
        (
36✔
49
            Option<TypedService>,
36✔
50
            ServerReflectionServer<impl ServerReflection>,
36✔
51
        ),
36✔
52
        ApiInitError,
36✔
53
    > {
36✔
54
        let mut all_descriptor_bytes = vec![];
36✔
55
        for cache_endpoint in &cache_endpoints {
72✔
56
            all_descriptor_bytes.push(cache_endpoint.descriptor().to_vec());
36✔
57
        }
36✔
58

59
        let mut builder = tonic_reflection::server::Builder::configure();
36✔
60
        for descriptor_bytes in &all_descriptor_bytes {
72✔
61
            builder = builder.register_encoded_file_descriptor_set(descriptor_bytes);
36✔
62
        }
36✔
63
        let inflection_service = builder.build().map_err(GrpcError::ServerReflectionError)?;
36✔
64
        let security = get_api_security(self.security.to_owned());
36✔
65

66
        // Service handling dynamic gRPC requests.
67
        let typed_service = if self.flags.dynamic.unwrap_or_else(default_dynamic) {
36✔
68
            Some(TypedService::new(
36✔
69
                cache_endpoints,
36✔
70
                operations_receiver,
36✔
71
                security,
36✔
72
                default_max_num_records,
36✔
73
            )?)
36✔
74
        } else {
75
            None
×
76
        };
77

78
        Ok((typed_service, inflection_service))
36✔
79
    }
36✔
80

81
    pub fn new(grpc_config: GrpcApiOptions, security: Option<ApiSecurity>, flags: Flags) -> Self {
36✔
82
        Self {
36✔
83
            port: grpc_config.port.unwrap_or_else(default_grpc_port),
36✔
84
            host: grpc_config.host.unwrap_or_else(default_host),
36✔
85
            security,
36✔
86
            flags,
36✔
87
        }
36✔
88
    }
36✔
89

90
    /// TcpIncoming::new requires a tokio runtime, so we mark this function as async.
91
    pub async fn run(
30✔
92
        &self,
30✔
93
        cache_endpoints: Vec<Arc<CacheEndpoint>>,
30✔
94
        shutdown: impl Future<Output = ()> + Send + 'static,
30✔
95
        operations_receiver: Option<Receiver<Operation>>,
30✔
96
        labels: LabelsAndProgress,
30✔
97
        default_max_num_records: usize,
30✔
98
    ) -> Result<impl Future<Output = Result<(), dozer_types::tonic::transport::Error>>, ApiInitError>
30✔
99
    {
30✔
100
        let grpc_web = self.flags.grpc_web.unwrap_or(true);
30✔
101
        // Create our services.
30✔
102
        let common_service = CommonGrpcServiceServer::new(CommonService::new(
30✔
103
            cache_endpoints.clone(),
30✔
104
            operations_receiver.as_ref().map(|r| r.resubscribe()),
30✔
105
            default_max_num_records,
30✔
106
        ));
30✔
107
        let common_service = enable_grpc_web(common_service, grpc_web);
30✔
108

109
        let (typed_service, reflection_service) = self.get_dynamic_service(
30✔
110
            cache_endpoints,
30✔
111
            operations_receiver,
30✔
112
            default_max_num_records,
30✔
113
        )?;
30✔
114
        let typed_service =
30✔
115
            typed_service.map(|typed_service| enable_grpc_web(typed_service, grpc_web));
30✔
116
        let reflection_service = enable_grpc_web(reflection_service, grpc_web);
30✔
117

30✔
118
        let mut service_map: HashMap<String, ServingStatus> = HashMap::new();
30✔
119
        service_map.insert("".to_string(), ServingStatus::Serving);
30✔
120
        service_map.insert(common::SERVICE_NAME.to_string(), ServingStatus::Serving);
30✔
121
        if typed_service.is_some() {
30✔
122
            service_map.insert(typed::SERVICE_NAME.to_string(), ServingStatus::Serving);
30✔
123
        } else {
30✔
124
            service_map.insert(typed::SERVICE_NAME.to_string(), ServingStatus::NotServing);
×
125
        }
×
126
        let health_service = HealthGrpcServiceServer::new(HealthService {
30✔
127
            serving_status: service_map,
30✔
128
        });
30✔
129
        let health_service = enable_grpc_web(health_service, grpc_web);
30✔
130

30✔
131
        // Auth middleware.
30✔
132
        let security = get_api_security(self.security.to_owned());
30✔
133
        let auth_middleware = AuthMiddlewareLayer::new(security.clone());
30✔
134

30✔
135
        // Authenticated services.
30✔
136
        let common_service = auth_middleware.layer(common_service);
30✔
137
        let typed_service = typed_service.map(|typed_service| auth_middleware.layer(typed_service));
30✔
138
        let mut authenticated_reflection_service = None;
30✔
139
        let mut unauthenticated_reflection_service = None;
30✔
140
        if self.flags.authenticate_server_reflection.unwrap_or(false) {
30✔
141
            authenticated_reflection_service = Some(auth_middleware.layer(reflection_service))
×
142
        } else {
30✔
143
            unauthenticated_reflection_service = Some(reflection_service);
30✔
144
        };
30✔
145
        let health_service = health_service;
30✔
146

30✔
147
        let mut auth_service = None;
30✔
148
        let security = get_api_security(self.security.to_owned());
30✔
149
        if security.is_some() {
30✔
150
            let service = enable_grpc_web(
×
151
                AuthGrpcServiceServer::new(AuthService::new(security.to_owned())),
×
152
                grpc_web,
×
153
            );
×
154
            auth_service = Some(auth_middleware.layer(service));
×
155
        }
30✔
156
        let metric_middleware = MetricMiddlewareLayer::new(labels);
30✔
157
        // Add services to server.
30✔
158
        let mut grpc_router = Server::builder()
30✔
159
            .layer(
30✔
160
                TraceLayer::new_for_http()
30✔
161
                    .make_span_with(trace::DefaultMakeSpan::new().level(Level::INFO))
30✔
162
                    .on_response(trace::DefaultOnResponse::new().level(Level::INFO))
30✔
163
                    .on_failure(trace::DefaultOnFailure::new().level(Level::ERROR)),
30✔
164
            )
30✔
165
            .layer(metric_middleware)
30✔
166
            .accept_http1(true)
30✔
167
            .concurrency_limit_per_connection(32)
30✔
168
            .add_service(common_service)
30✔
169
            .add_optional_service(typed_service);
30✔
170

171
        if let Some(reflection_service) = authenticated_reflection_service {
30✔
172
            grpc_router = grpc_router.add_service(reflection_service);
×
173
        }
30✔
174
        if let Some(reflection_service) = unauthenticated_reflection_service {
30✔
175
            grpc_router = grpc_router.add_service(reflection_service);
30✔
176
        }
30✔
177

178
        grpc_router = grpc_router.add_service(health_service);
30✔
179
        grpc_router = grpc_router.add_optional_service(auth_service);
30✔
180

30✔
181
        // Start listening.
30✔
182
        let addr = format!("{}:{}", self.host, self.port);
30✔
183
        info!(
184
            "Starting gRPC server on {addr} with security: {}",
15✔
185
            security.as_ref().map_or("None".to_string(), |s| match s {
15✔
186
                ApiSecurity::Jwt(_) => "JWT".to_string(),
×
187
            })
15✔
188
        );
189
        let addr = addr
30✔
190
            .parse()
30✔
191
            .map_err(|e| GrpcError::AddrParse(addr.clone(), e))?;
30✔
192
        let incoming =
30✔
193
            TcpIncoming::new(addr, true, None).map_err(|e| GrpcError::Listen(addr, e))?;
30✔
194

195
        // Run server.
196
        Ok(run_server(grpc_router, incoming, shutdown))
30✔
197
    }
30✔
198
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc