• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4116183752

pending completion
4116183752

push

github

GitHub
refactor: Make `LmdbRoCache` and `LmdbRwCache` `Send` and `Sync` (#821)

790 of 790 new or added lines in 44 files covered. (100.0%)

23005 of 33842 relevant lines covered (67.98%)

56312.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

14.88
/dozer-api/src/grpc/client_server.rs
1
use super::{
2
    auth_middleware::AuthMiddlewareLayer,
3
    common::CommonService,
4
    common_grpc::common_grpc_service_server::CommonGrpcServiceServer,
5
    health_grpc::health_grpc_service_server::HealthGrpcServiceServer,
6
    internal_grpc::{
7
        internal_pipeline_service_client::InternalPipelineServiceClient, PipelineRequest,
8
        PipelineResponse,
9
    },
10
    typed::TypedService,
11
};
12
use crate::grpc::health::HealthService;
13
use crate::grpc::health_grpc::health_check_response::ServingStatus;
14
use crate::grpc::{common, typed};
15
use crate::{
16
    errors::GRPCError, generator::protoc::generator::ProtoGenerator, PipelineDetails,
17
    RoCacheEndpoint,
18
};
19
use dozer_types::{
20
    log::{info, warn},
21
    models::{
22
        api_config::{ApiGrpc, ApiPipelineInternal},
23
        api_security::ApiSecurity,
24
        flags::Flags,
25
    },
26
    types::Schema,
27
};
28
use futures_util::{FutureExt, StreamExt};
29
use std::{collections::HashMap, path::PathBuf};
30
use tokio::sync::broadcast::{self, Receiver, Sender};
31
use tonic::{transport::Server, Streaming};
32
use tonic_reflection::server::{ServerReflection, ServerReflectionServer};
33
use tower::Layer;
34

35
pub struct ApiServer {
36
    port: u16,
37
    host: String,
38
    api_dir: PathBuf,
39
    security: Option<ApiSecurity>,
40
    flags: Flags,
41
}
42

43
impl ApiServer {
44
    async fn connect_internal_client(
8✔
45
        pipeline_config: ApiPipelineInternal,
8✔
46
    ) -> Result<Streaming<PipelineResponse>, GRPCError> {
8✔
47
        let address = format!("http://{:}:{:}", pipeline_config.host, pipeline_config.port);
8✔
48
        let mut client = InternalPipelineServiceClient::connect(address)
8✔
49
            .await
16✔
50
            .map_err(|err| GRPCError::InternalError(Box::new(err)))?;
8✔
51
        let stream_response = client
8✔
52
            .stream_pipeline_request(PipelineRequest {})
8✔
53
            .await
16✔
54
            .map_err(|err| GRPCError::InternalError(Box::new(err)))?;
8✔
55
        let stream: Streaming<PipelineResponse> = stream_response.into_inner();
8✔
56
        Ok(stream)
8✔
57
    }
8✔
58
    fn get_dynamic_service(
×
59
        &self,
×
60
        pipeline_map: HashMap<String, PipelineDetails>,
×
61
        rx1: Option<broadcast::Receiver<PipelineResponse>>,
×
62
    ) -> Result<
×
63
        (
×
64
            Option<TypedService>,
×
65
            ServerReflectionServer<impl ServerReflection>,
×
66
        ),
×
67
        GRPCError,
×
68
    > {
×
69
        let mut schema_map: HashMap<String, Schema> = HashMap::new();
×
70

71
        for (endpoint_name, details) in &pipeline_map {
×
72
            let cache = details.cache_endpoint.cache.clone();
×
73

74
            let (schema, _) = cache
×
75
                .get_schema_and_indexes_by_name(endpoint_name)
×
76
                .map_err(|e| GRPCError::SchemaNotInitialized(endpoint_name.clone(), e))?;
×
77
            schema_map.insert(endpoint_name.clone(), schema);
×
78
        }
79
        info!(
×
80
            "Starting gRPC server on http://{}:{} with security: {}",
×
81
            self.host,
×
82
            self.port,
×
83
            self.security
×
84
                .as_ref()
×
85
                .map_or("None".to_string(), |s| match s {
×
86
                    ApiSecurity::Jwt(_) => "JWT".to_string(),
×
87
                })
×
88
        );
89

90
        let generated_path = self.api_dir.join("generated");
×
91

92
        let proto_res = ProtoGenerator::read(&generated_path)?;
×
93

94
        let inflection_service = tonic_reflection::server::Builder::configure()
×
95
            .register_encoded_file_descriptor_set(proto_res.descriptor_bytes.as_slice())
×
96
            .build()?;
×
97

98
        // Service handling dynamic gRPC requests.
99
        let typed_service = if self.flags.dynamic {
×
100
            Some(TypedService::new(
×
101
                proto_res.descriptor,
×
102
                pipeline_map,
×
103
                schema_map,
×
104
                rx1.map(|r| r.resubscribe()),
×
105
                self.security.to_owned(),
×
106
            ))
×
107
        } else {
108
            None
×
109
        };
110

111
        Ok((typed_service, inflection_service))
×
112
    }
×
113

114
    pub fn new(
×
115
        grpc_config: ApiGrpc,
×
116
        api_dir: PathBuf,
×
117
        security: Option<ApiSecurity>,
×
118
        flags: Flags,
×
119
    ) -> Self {
×
120
        Self {
×
121
            port: grpc_config.port as u16,
×
122
            host: grpc_config.host,
×
123
            api_dir,
×
124
            security,
×
125
            flags,
×
126
        }
×
127
    }
×
128

129
    pub async fn run(
×
130
        &self,
×
131
        cache_endpoints: Vec<RoCacheEndpoint>,
×
132
        receiver_shutdown: tokio::sync::oneshot::Receiver<()>,
×
133
        rx1: Option<Receiver<PipelineResponse>>,
×
134
    ) -> Result<(), GRPCError> {
×
135
        let mut pipeline_map: HashMap<String, PipelineDetails> = HashMap::new();
×
136
        for ce in cache_endpoints {
×
137
            pipeline_map.insert(
×
138
                ce.endpoint.name.to_owned(),
×
139
                PipelineDetails {
×
140
                    schema_name: ce.endpoint.name.to_owned(),
×
141
                    cache_endpoint: ce.to_owned(),
×
142
                },
×
143
            );
×
144
        }
×
145

146
        // Create our services.
147
        let mut web_config = tonic_web::config();
×
148
        if self.flags.grpc_web {
×
149
            web_config = web_config.allow_all_origins();
×
150
        }
×
151

152
        let common_service = CommonGrpcServiceServer::new(CommonService {
×
153
            pipeline_map: pipeline_map.to_owned(),
×
154
            event_notifier: rx1.as_ref().map(|r| r.resubscribe()),
×
155
        });
×
156
        let common_service = web_config.enable(common_service);
×
157

158
        let (typed_service, reflection_service) = self.get_dynamic_service(pipeline_map, rx1)?;
×
159
        let typed_service = typed_service.map(|typed_service| web_config.enable(typed_service));
×
160
        let reflection_service = web_config.enable(reflection_service);
×
161

×
162
        let mut service_map: HashMap<String, ServingStatus> = HashMap::new();
×
163
        service_map.insert(common::SERVICE_NAME.to_string(), ServingStatus::Serving);
×
164
        if typed_service.is_some() {
×
165
            service_map.insert(typed::SERVICE_NAME.to_string(), ServingStatus::Serving);
×
166
        } else {
×
167
            service_map.insert(typed::SERVICE_NAME.to_string(), ServingStatus::NotServing);
×
168
        }
×
169
        let health_service = HealthGrpcServiceServer::new(HealthService {
×
170
            serving_status: service_map,
×
171
        });
×
172
        let health_service = web_config.enable(health_service);
×
173

×
174
        // Auth middleware.
×
175
        let auth_middleware = AuthMiddlewareLayer::new(self.security.clone());
×
176

×
177
        // Authenticated services.
×
178
        let common_service = auth_middleware.layer(common_service);
×
179
        let typed_service = typed_service.map(|typed_service| auth_middleware.layer(typed_service));
×
180
        let mut authenticated_reflection_service = None;
×
181
        let mut unauthenticated_reflection_service = None;
×
182
        if self.flags.authenticate_server_reflection {
×
183
            authenticated_reflection_service = Some(auth_middleware.layer(reflection_service))
×
184
        } else {
×
185
            unauthenticated_reflection_service = Some(reflection_service);
×
186
        };
×
187
        let health_service = auth_middleware.layer(health_service);
×
188

×
189
        // Add services to server.
×
190
        let mut grpc_router = Server::builder()
×
191
            .accept_http1(true)
×
192
            .concurrency_limit_per_connection(32)
×
193
            .add_service(common_service)
×
194
            .add_optional_service(typed_service);
×
195

196
        if let Some(reflection_service) = authenticated_reflection_service {
×
197
            grpc_router = grpc_router.add_service(reflection_service);
×
198
        }
×
199
        if let Some(reflection_service) = unauthenticated_reflection_service {
×
200
            grpc_router = grpc_router.add_service(reflection_service);
×
201
        }
×
202

203
        grpc_router = grpc_router.add_service(health_service);
×
204

×
205
        // Run server.
×
206
        let addr = format!("{:}:{:}", self.host, self.port).parse().unwrap();
×
207
        grpc_router
×
208
            .serve_with_shutdown(addr, receiver_shutdown.map(drop))
×
209
            .await
×
210
            .map_err(|e| {
×
211
                let inner_error: Box<dyn std::error::Error> = e.into();
×
212
                let detail = inner_error.source();
×
213
                if let Some(detail) = detail {
×
214
                    return GRPCError::TransportErrorDetail(detail.to_string());
×
215
                }
×
216
                GRPCError::TransportErrorDetail(inner_error.to_string())
×
217
            })
×
218
    }
×
219

220
    pub async fn setup_broad_cast_channel(
8✔
221
        tx: Sender<PipelineResponse>,
8✔
222
        pipeline_config: ApiPipelineInternal,
8✔
223
    ) -> Result<(), GRPCError> {
8✔
224
        info!(
225
            "Connecting to Internal service  on http://{}:{}",
×
226
            pipeline_config.host, pipeline_config.port
227
        );
228
        let mut stream = ApiServer::connect_internal_client(pipeline_config.to_owned()).await?;
32✔
229
        while let Some(event_response) = stream.next().await {
42✔
230
            if let Ok(event) = event_response {
34✔
231
                let _ = tx.send(event);
31✔
232
            }
31✔
233
        }
234
        warn!("exiting internal grpc connection on api thread");
×
235
        Ok::<(), GRPCError>(())
3✔
236
    }
3✔
237
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc