• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4283045331

pending completion
4283045331

push

github

GitHub
feat: Support timestamp diff (#1074)

58 of 58 new or added lines in 2 files covered. (100.0%)

27146 of 37535 relevant lines covered (72.32%)

33460.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.17
/dozer-api/src/grpc/client_server.rs
1
use super::{auth_middleware::AuthMiddlewareLayer, common::CommonService, typed::TypedService};
2
use crate::grpc::health::HealthService;
3
use crate::grpc::{common, typed};
4
use crate::{errors::GRPCError, generator::protoc::generator::ProtoGenerator, RoCacheEndpoint};
5
use dozer_types::grpc_types::health::health_check_response::ServingStatus;
6
use dozer_types::grpc_types::{
7
    common::common_grpc_service_server::CommonGrpcServiceServer,
8
    health::health_grpc_service_server::HealthGrpcServiceServer,
9
    internal::{
10
        internal_pipeline_service_client::InternalPipelineServiceClient, PipelineRequest,
11
        PipelineResponse,
12
    },
13
};
14
use dozer_types::tracing::Level;
15
use dozer_types::{
16
    log::{info, warn},
17
    models::{api_config::GrpcApiOptions, api_security::ApiSecurity, flags::Flags},
18
};
19
use futures_util::{FutureExt, StreamExt};
20
use std::{collections::HashMap, path::PathBuf, sync::Arc};
21
use tokio::sync::broadcast::{self, Receiver, Sender};
22
use tonic::{transport::Server, Streaming};
23
use tonic_reflection::server::{ServerReflection, ServerReflectionServer};
24
use tower::Layer;
25
use tower_http::trace::{self, TraceLayer};
26

27
/// Configuration needed to build and serve the public gRPC API.
///
/// Instances are created with `ApiServer::new` and started with `ApiServer::run`.
pub struct ApiServer {
28
    /// TCP port the server binds to (also shown in the start-up log line).
    port: u16,
29
    /// Host part of the bind address; combined with `port` and parsed into the listen address.
    host: String,
30
    /// Directory handed to `ProtoGenerator::descriptor_path` to locate the descriptor file.
    api_dir: PathBuf,
31
    /// Optional security configuration, cloned into the auth middleware and the typed service.
    security: Option<ApiSecurity>,
32
    /// Feature switches read by this server: `dynamic`, `grpc_web` and
    /// `authenticate_server_reflection`.
    flags: Flags,
33
}
34

35
impl ApiServer {
36
    async fn connect_internal_client(
8✔
37
        app_grpc_config: GrpcApiOptions,
8✔
38
    ) -> Result<Streaming<PipelineResponse>, GRPCError> {
8✔
39
        let address = format!("http://{:}:{:}", app_grpc_config.host, app_grpc_config.port);
8✔
40
        let mut client = InternalPipelineServiceClient::connect(address)
8✔
41
            .await
16✔
42
            .map_err(|err| GRPCError::InternalError(Box::new(err)))?;
8✔
43
        let stream_response = client
8✔
44
            .stream_pipeline_request(PipelineRequest {})
8✔
45
            .await
16✔
46
            .map_err(|err| GRPCError::InternalError(Box::new(err)))?;
8✔
47
        let stream: Streaming<PipelineResponse> = stream_response.into_inner();
8✔
48
        Ok(stream)
8✔
49
    }
8✔
50
    fn get_dynamic_service(
5✔
51
        &self,
5✔
52
        cache_endpoints: Vec<Arc<RoCacheEndpoint>>,
5✔
53
        rx1: Option<broadcast::Receiver<PipelineResponse>>,
5✔
54
    ) -> Result<
5✔
55
        (
5✔
56
            Option<TypedService>,
5✔
57
            ServerReflectionServer<impl ServerReflection>,
5✔
58
        ),
5✔
59
        GRPCError,
5✔
60
    > {
5✔
61
        info!(
5✔
62
            "Starting gRPC server on http://{}:{} with security: {}",
5✔
63
            self.host,
5✔
64
            self.port,
5✔
65
            self.security
5✔
66
                .as_ref()
5✔
67
                .map_or("None".to_string(), |s| match s {
5✔
68
                    ApiSecurity::Jwt(_) => "JWT".to_string(),
×
69
                })
5✔
70
        );
71

72
        let descriptor_path = ProtoGenerator::descriptor_path(&self.api_dir);
5✔
73

74
        let descriptor_bytes = ProtoGenerator::read_descriptor_bytes(&descriptor_path)?;
5✔
75

76
        let inflection_service = tonic_reflection::server::Builder::configure()
5✔
77
            .register_encoded_file_descriptor_set(&descriptor_bytes)
5✔
78
            .build()?;
5✔
79

80
        // Service handling dynamic gRPC requests.
81
        let typed_service = if self.flags.dynamic {
5✔
82
            Some(TypedService::new(
5✔
83
                &descriptor_path,
5✔
84
                cache_endpoints,
5✔
85
                rx1.map(|r| r.resubscribe()),
5✔
86
                self.security.clone(),
5✔
87
            )?)
5✔
88
        } else {
89
            None
×
90
        };
91

92
        Ok((typed_service, inflection_service))
5✔
93
    }
5✔
94

95
    pub fn new(
5✔
96
        grpc_config: GrpcApiOptions,
5✔
97
        api_dir: PathBuf,
5✔
98
        security: Option<ApiSecurity>,
5✔
99
        flags: Flags,
5✔
100
    ) -> Self {
5✔
101
        Self {
5✔
102
            port: grpc_config.port as u16,
5✔
103
            host: grpc_config.host,
5✔
104
            api_dir,
5✔
105
            security,
5✔
106
            flags,
5✔
107
        }
5✔
108
    }
5✔
109

110
    pub async fn run(
5✔
111
        &self,
5✔
112
        cache_endpoints: Vec<Arc<RoCacheEndpoint>>,
5✔
113
        receiver_shutdown: tokio::sync::oneshot::Receiver<()>,
5✔
114
        rx1: Option<Receiver<PipelineResponse>>,
5✔
115
    ) -> Result<(), GRPCError> {
5✔
116
        // Create our services.
4✔
117
        let mut web_config = tonic_web::config();
4✔
118
        if self.flags.grpc_web {
4✔
119
            web_config = web_config.allow_all_origins();
4✔
120
        }
4✔
121

122
        let common_service = CommonGrpcServiceServer::new(CommonService::new(
4✔
123
            cache_endpoints.clone(),
4✔
124
            rx1.as_ref().map(|r| r.resubscribe()),
4✔
125
        ));
4✔
126
        let common_service = web_config.enable(common_service);
4✔
127

128
        let (typed_service, reflection_service) = self.get_dynamic_service(cache_endpoints, rx1)?;
4✔
129
        let typed_service = typed_service.map(|typed_service| web_config.enable(typed_service));
4✔
130
        let reflection_service = web_config.enable(reflection_service);
4✔
131

4✔
132
        let mut service_map: HashMap<String, ServingStatus> = HashMap::new();
4✔
133
        service_map.insert("".to_string(), ServingStatus::Serving);
4✔
134
        service_map.insert(common::SERVICE_NAME.to_string(), ServingStatus::Serving);
4✔
135
        if typed_service.is_some() {
4✔
136
            service_map.insert(typed::SERVICE_NAME.to_string(), ServingStatus::Serving);
4✔
137
        } else {
4✔
138
            service_map.insert(typed::SERVICE_NAME.to_string(), ServingStatus::NotServing);
×
139
        }
×
140
        let health_service = HealthGrpcServiceServer::new(HealthService {
4✔
141
            serving_status: service_map,
4✔
142
        });
4✔
143
        let health_service = web_config.enable(health_service);
4✔
144

4✔
145
        // Auth middleware.
4✔
146
        let auth_middleware = AuthMiddlewareLayer::new(self.security.clone());
4✔
147

4✔
148
        // Authenticated services.
4✔
149
        let common_service = auth_middleware.layer(common_service);
4✔
150
        let typed_service = typed_service.map(|typed_service| auth_middleware.layer(typed_service));
4✔
151
        let mut authenticated_reflection_service = None;
4✔
152
        let mut unauthenticated_reflection_service = None;
4✔
153
        if self.flags.authenticate_server_reflection {
4✔
154
            authenticated_reflection_service = Some(auth_middleware.layer(reflection_service))
×
155
        } else {
4✔
156
            unauthenticated_reflection_service = Some(reflection_service);
4✔
157
        };
4✔
158
        let health_service = auth_middleware.layer(health_service);
4✔
159

4✔
160
        // Add services to server.
4✔
161
        let mut grpc_router = Server::builder()
4✔
162
            .layer(
4✔
163
                TraceLayer::new_for_http()
4✔
164
                    .make_span_with(trace::DefaultMakeSpan::new().level(Level::INFO))
4✔
165
                    .on_response(trace::DefaultOnResponse::new().level(Level::INFO)),
4✔
166
            )
4✔
167
            .accept_http1(true)
4✔
168
            .concurrency_limit_per_connection(32)
4✔
169
            .add_service(common_service)
4✔
170
            .add_optional_service(typed_service);
4✔
171

172
        if let Some(reflection_service) = authenticated_reflection_service {
4✔
173
            grpc_router = grpc_router.add_service(reflection_service);
×
174
        }
4✔
175
        if let Some(reflection_service) = unauthenticated_reflection_service {
4✔
176
            grpc_router = grpc_router.add_service(reflection_service);
4✔
177
        }
4✔
178

179
        grpc_router = grpc_router.add_service(health_service);
4✔
180

4✔
181
        // Run server.
4✔
182
        let addr = format!("{:}:{:}", self.host, self.port).parse().unwrap();
4✔
183
        grpc_router
4✔
184
            .serve_with_shutdown(addr, receiver_shutdown.map(drop))
4✔
185
            .await
4✔
186
            .map_err(|e| {
×
187
                let inner_error: Box<dyn std::error::Error> = e.into();
×
188
                let detail = inner_error.source();
×
189
                if let Some(detail) = detail {
×
190
                    return GRPCError::TransportErrorDetail(detail.to_string());
×
191
                }
×
192
                GRPCError::TransportErrorDetail(inner_error.to_string())
×
193
            })
×
194
    }
×
195

196
    pub async fn setup_broad_cast_channel(
8✔
197
        tx: Sender<PipelineResponse>,
8✔
198
        app_grpc_config: GrpcApiOptions,
8✔
199
    ) -> Result<(), GRPCError> {
8✔
200
        info!(
201
            "Connecting to Internal service  on http://{}:{}",
×
202
            app_grpc_config.host, app_grpc_config.port
203
        );
204
        let mut stream = ApiServer::connect_internal_client(app_grpc_config.to_owned()).await?;
32✔
205
        while let Some(event_response) = stream.next().await {
41✔
206
            if let Ok(event) = event_response {
33✔
207
                let _ = tx.send(event);
31✔
208
            }
31✔
209
        }
210
        warn!("exiting internal grpc connection on api thread");
×
211
        Ok::<(), GRPCError>(())
2✔
212
    }
2✔
213
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc