• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4283961027

pending completion
4283961027

push

github

GitHub
feat: Blue green cache (#1061)

645 of 645 new or added lines in 45 files covered. (100.0%)

27779 of 39307 relevant lines covered (70.67%)

52489.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.05
/dozer-api/src/grpc/client_server.rs
1
use super::{auth_middleware::AuthMiddlewareLayer, common::CommonService, typed::TypedService};
2
use crate::grpc::health::HealthService;
3
use crate::grpc::{common, typed};
4
use crate::{errors::GrpcError, generator::protoc::generator::ProtoGenerator, RoCacheEndpoint};
5
use dozer_types::grpc_types::health::health_check_response::ServingStatus;
6
use dozer_types::grpc_types::types::Operation;
7
use dozer_types::grpc_types::{
8
    common::common_grpc_service_server::CommonGrpcServiceServer,
9
    health::health_grpc_service_server::HealthGrpcServiceServer,
10
};
11
use dozer_types::tracing::Level;
12
use dozer_types::{
13
    log::info,
14
    models::{api_config::GrpcApiOptions, api_security::ApiSecurity, flags::Flags},
15
};
16
use futures_util::FutureExt;
17
use std::{collections::HashMap, path::PathBuf, sync::Arc};
18
use tokio::sync::broadcast::{self, Receiver};
19
use tonic::transport::Server;
20
use tonic_reflection::server::{ServerReflection, ServerReflectionServer};
21
use tower::Layer;
22
use tower_http::trace::{self, TraceLayer};
23

24
/// Configuration holder for the gRPC API server.
///
/// Fields, as used by the methods in this file:
/// - `port` / `host`: logged at start-up and joined as `host:port` to form the listen address.
/// - `api_dir`: handed to `ProtoGenerator::descriptor_path` to locate the descriptor file.
/// - `security`: optional API security settings; cloned into the typed service and the
///   auth middleware layer.
/// - `flags`: feature switches; this file reads `dynamic`, `grpc_web` and
///   `authenticate_server_reflection`.
pub struct ApiServer {
25
    port: u16,
26
    host: String,
27
    api_dir: PathBuf,
28
    security: Option<ApiSecurity>,
29
    flags: Flags,
30
}
31

32
impl ApiServer {
33
    fn get_dynamic_service(
5✔
34
        &self,
5✔
35
        cache_endpoints: Vec<Arc<RoCacheEndpoint>>,
5✔
36
        operations_receiver: Option<broadcast::Receiver<Operation>>,
5✔
37
    ) -> Result<
5✔
38
        (
5✔
39
            Option<TypedService>,
5✔
40
            ServerReflectionServer<impl ServerReflection>,
5✔
41
        ),
5✔
42
        GrpcError,
5✔
43
    > {
5✔
44
        info!(
5✔
45
            "Starting gRPC server on http://{}:{} with security: {}",
5✔
46
            self.host,
5✔
47
            self.port,
5✔
48
            self.security
5✔
49
                .as_ref()
5✔
50
                .map_or("None".to_string(), |s| match s {
5✔
51
                    ApiSecurity::Jwt(_) => "JWT".to_string(),
×
52
                })
5✔
53
        );
×
54

×
55
        let descriptor_path = ProtoGenerator::descriptor_path(&self.api_dir);
5✔
56

×
57
        let descriptor_bytes = ProtoGenerator::read_descriptor_bytes(&descriptor_path)?;
5✔
58

×
59
        let inflection_service = tonic_reflection::server::Builder::configure()
5✔
60
            .register_encoded_file_descriptor_set(&descriptor_bytes)
5✔
61
            .build()?;
5✔
62

×
63
        // Service handling dynamic gRPC requests.
×
64
        let typed_service = if self.flags.dynamic {
5✔
65
            Some(TypedService::new(
5✔
66
                &descriptor_path,
5✔
67
                cache_endpoints,
5✔
68
                operations_receiver,
5✔
69
                self.security.clone(),
5✔
70
            )?)
5✔
71
        } else {
72
            None
×
73
        };
74

×
75
        Ok((typed_service, inflection_service))
5✔
76
    }
5✔
77

×
78
    pub fn new(
5✔
79
        grpc_config: GrpcApiOptions,
5✔
80
        api_dir: PathBuf,
5✔
81
        security: Option<ApiSecurity>,
5✔
82
        flags: Flags,
5✔
83
    ) -> Self {
5✔
84
        Self {
5✔
85
            port: grpc_config.port as u16,
5✔
86
            host: grpc_config.host,
5✔
87
            api_dir,
5✔
88
            security,
5✔
89
            flags,
5✔
90
        }
5✔
91
    }
5✔
92

×
93
    pub async fn run(
5✔
94
        &self,
5✔
95
        cache_endpoints: Vec<Arc<RoCacheEndpoint>>,
5✔
96
        receiver_shutdown: tokio::sync::oneshot::Receiver<()>,
5✔
97
        operations_receiver: Option<Receiver<Operation>>,
5✔
98
    ) -> Result<(), GrpcError> {
5✔
99
        // Create our services.
4✔
100
        let mut web_config = tonic_web::config();
4✔
101
        if self.flags.grpc_web {
4✔
102
            web_config = web_config.allow_all_origins();
4✔
103
        }
4✔
104

×
105
        let common_service = CommonGrpcServiceServer::new(CommonService::new(
4✔
106
            cache_endpoints.clone(),
4✔
107
            operations_receiver.as_ref().map(|r| r.resubscribe()),
4✔
108
        ));
4✔
109
        let common_service = web_config.enable(common_service);
4✔
110

×
111
        let (typed_service, reflection_service) =
4✔
112
            self.get_dynamic_service(cache_endpoints, operations_receiver)?;
4✔
113
        let typed_service = typed_service.map(|typed_service| web_config.enable(typed_service));
4✔
114
        let reflection_service = web_config.enable(reflection_service);
4✔
115

4✔
116
        let mut service_map: HashMap<String, ServingStatus> = HashMap::new();
4✔
117
        service_map.insert("".to_string(), ServingStatus::Serving);
4✔
118
        service_map.insert(common::SERVICE_NAME.to_string(), ServingStatus::Serving);
4✔
119
        if typed_service.is_some() {
4✔
120
            service_map.insert(typed::SERVICE_NAME.to_string(), ServingStatus::Serving);
4✔
121
        } else {
4✔
122
            service_map.insert(typed::SERVICE_NAME.to_string(), ServingStatus::NotServing);
×
123
        }
×
124
        let health_service = HealthGrpcServiceServer::new(HealthService {
4✔
125
            serving_status: service_map,
4✔
126
        });
4✔
127
        let health_service = web_config.enable(health_service);
4✔
128

4✔
129
        // Auth middleware.
4✔
130
        let auth_middleware = AuthMiddlewareLayer::new(self.security.clone());
4✔
131

4✔
132
        // Authenticated services.
4✔
133
        let common_service = auth_middleware.layer(common_service);
4✔
134
        let typed_service = typed_service.map(|typed_service| auth_middleware.layer(typed_service));
4✔
135
        let mut authenticated_reflection_service = None;
4✔
136
        let mut unauthenticated_reflection_service = None;
4✔
137
        if self.flags.authenticate_server_reflection {
4✔
138
            authenticated_reflection_service = Some(auth_middleware.layer(reflection_service))
×
139
        } else {
4✔
140
            unauthenticated_reflection_service = Some(reflection_service);
4✔
141
        };
4✔
142
        let health_service = auth_middleware.layer(health_service);
4✔
143

4✔
144
        // Add services to server.
4✔
145
        let mut grpc_router = Server::builder()
4✔
146
            .layer(
4✔
147
                TraceLayer::new_for_http()
4✔
148
                    .make_span_with(trace::DefaultMakeSpan::new().level(Level::INFO))
4✔
149
                    .on_response(trace::DefaultOnResponse::new().level(Level::INFO)),
4✔
150
            )
4✔
151
            .accept_http1(true)
4✔
152
            .concurrency_limit_per_connection(32)
4✔
153
            .add_service(common_service)
4✔
154
            .add_optional_service(typed_service);
4✔
155

×
156
        if let Some(reflection_service) = authenticated_reflection_service {
4✔
157
            grpc_router = grpc_router.add_service(reflection_service);
×
158
        }
4✔
159
        if let Some(reflection_service) = unauthenticated_reflection_service {
4✔
160
            grpc_router = grpc_router.add_service(reflection_service);
4✔
161
        }
4✔
162

×
163
        grpc_router = grpc_router.add_service(health_service);
4✔
164

4✔
165
        // Run server.
4✔
166
        let addr = format!("{:}:{:}", self.host, self.port).parse().unwrap();
4✔
167
        grpc_router
4✔
168
            .serve_with_shutdown(addr, receiver_shutdown.map(drop))
4✔
169
            .await
4✔
170
            .map_err(|e| {
×
171
                let inner_error: Box<dyn std::error::Error> = e.into();
×
172
                let detail = inner_error.source();
×
173
                if let Some(detail) = detail {
×
174
                    return GrpcError::TransportErrorDetail(detail.to_string());
×
175
                }
×
176
                GrpcError::TransportErrorDetail(inner_error.to_string())
×
177
            })
×
178
    }
×
179
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc