• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5939715234

22 Aug 2023 01:47PM UTC coverage: 74.755% (-1.3%) from 76.052%
5939715234

push

github

web-flow
chore: Run e2e tests nightly (#1886)

* chore: Run e2e tests nightly

* chore: Run Dozer CI on default runners

46459 of 62148 relevant lines covered (74.76%)

40132.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.59
/dozer-api/src/grpc/internal/internal_pipeline_server.rs
1
use dozer_cache::dozer_log::home_dir::BuildId;
2
use dozer_cache::dozer_log::replication::{Log, LogResponseFuture};
3
use dozer_types::bincode;
4
use dozer_types::grpc_types::internal::internal_pipeline_service_server::{
5
    InternalPipelineService, InternalPipelineServiceServer,
6
};
7
use dozer_types::grpc_types::internal::{
8
    BuildRequest, BuildResponse, EndpointResponse, EndpointsResponse, LogRequest, LogResponse,
9
    StorageRequest, StorageResponse,
10
};
11
use dozer_types::log::info;
12
use dozer_types::models::api_config::AppGrpcOptions;
13
use dozer_types::models::api_endpoint::ApiEndpoint;
14
use dozer_types::parking_lot::Mutex;
15
use futures_util::future::Either;
16
use futures_util::stream::BoxStream;
17
use futures_util::{Future, StreamExt, TryStreamExt};
18
use std::collections::HashMap;
19
use std::sync::Arc;
20
use std::time::Duration;
21
use tonic::transport::server::TcpIncoming;
22
use tonic::transport::Server;
23
use tonic::{Request, Response, Status, Streaming};
24

25
use crate::errors::GrpcError;
26
use crate::grpc::run_server;
27

28
/// Everything the internal pipeline service serves for one endpoint: the build
/// identifier, the schema string, the descriptor bytes and a shared handle to
/// the endpoint's log.
#[derive(Debug, Clone)]
15✔
29
pub struct LogEndpoint {
30
    /// Build identifier; its `name()` is what `list_endpoints` and
    /// `describe_build` report to clients.
    pub build_id: BuildId,
31
    /// Schema string, returned as-is in `BuildResponse::schema_string`.
    pub schema_string: String,
32
    /// Descriptor bytes, returned as-is in `BuildResponse::descriptor_bytes`.
    pub descriptor_bytes: Vec<u8>,
33
    /// Shared, mutex-guarded log. It is locked to describe its storage and to
    /// start a read; a clone of this `Arc` is also handed to `Log::read`.
    pub log: Arc<Mutex<Log>>,
34
}
35

36
/// Implementation of the `InternalPipelineService` gRPC service, backed by a
/// map of log endpoints.
#[derive(Debug)]
×
37
pub struct InternalPipelineServer {
38
    /// Endpoints looked up by the `endpoint` string carried in each request.
    /// `start_internal_pipeline_server` fills this map using `ApiEndpoint::name`
    /// as the key.
    endpoints: HashMap<String, LogEndpoint>,
39
}
40

41
impl InternalPipelineServer {
42
    /// Creates a server that serves exactly the given endpoints.
    ///
    /// The map is stored as-is; its keys are the strings that requests must
    /// pass in their `endpoint` field to reach the corresponding entry.
    pub fn new(endpoints: HashMap<String, LogEndpoint>) -> Self {
9✔
43
        Self { endpoints }
9✔
44
    }
9✔
45
}
46

47
// gRPC handlers. Every handler that takes an endpoint name resolves it through
// `find_log_endpoint`, so an unknown name yields a `NotFound` status.
#[tonic::async_trait]
48
impl InternalPipelineService for InternalPipelineServer {
49
    /// Returns the storage description of the requested endpoint's log.
    ///
    /// Fails with `NotFound` if the endpoint is not registered.
    async fn describe_storage(
9✔
50
        &self,
9✔
51
        request: Request<StorageRequest>,
9✔
52
    ) -> Result<Response<StorageResponse>, Status> {
9✔
53
        let endpoint = request.into_inner().endpoint;
9✔
54
        let log = &find_log_endpoint(&self.endpoints, &endpoint)?.log;
9✔
55
        // The mutex guard is a temporary and is released at the end of this statement.
        let storage = log.lock().describe_storage();
9✔
56
        Ok(Response::new(StorageResponse {
9✔
57
            storage: Some(storage),
9✔
58
        }))
9✔
59
    }
18✔
60

61
    /// Lists every registered endpoint together with the name of its build.
    ///
    /// The order follows `HashMap` iteration and is therefore unspecified.
    async fn list_endpoints(
×
62
        &self,
×
63
        _request: Request<()>,
×
64
    ) -> Result<Response<EndpointsResponse>, Status> {
×
65
        let endpoints = self
×
66
            .endpoints
×
67
            .iter()
×
68
            .map(|(endpoint, log)| EndpointResponse {
×
69
                endpoint: endpoint.clone(),
×
70
                build_name: log.build_id.name().to_string(),
×
71
            })
×
72
            .collect();
×
73
        Ok(Response::new(EndpointsResponse { endpoints }))
×
74
    }
×
75

×
76
    /// Returns the build name, schema string and descriptor bytes of the
    /// requested endpoint.
    ///
    /// Fails with `NotFound` if the endpoint is not registered.
    async fn describe_build(
9✔
77
        &self,
9✔
78
        request: Request<BuildRequest>,
9✔
79
    ) -> Result<Response<BuildResponse>, Status> {
9✔
80
        let endpoint = request.into_inner().endpoint;
9✔
81
        let endpoint = find_log_endpoint(&self.endpoints, &endpoint)?;
9✔
82
        Ok(Response::new(BuildResponse {
9✔
83
            name: endpoint.build_id.name().to_string(),
9✔
84
            schema_string: endpoint.schema_string.clone(),
9✔
85
            descriptor_bytes: endpoint.descriptor_bytes.clone(),
9✔
86
        }))
9✔
87
    }
18✔
88

89
    /// Response stream of `get_log`: one item per request received on the
    /// incoming stream.
    type GetLogStream = BoxStream<'static, Result<LogResponse, Status>>;
×
90

×
91
    /// Serves log reads over a bidirectional stream.
    ///
    /// Each incoming `LogRequest` names an endpoint, a `start..end` range and a
    /// timeout in milliseconds. The matching log is asked to read that range
    /// and the result is sent back bincode-serialized in a `LogResponse`.
    /// A request for an unknown endpoint produces a `NotFound` error item.
    async fn get_log(
9✔
92
        &self,
9✔
93
        requests: Request<Streaming<LogRequest>>,
9✔
94
    ) -> Result<Response<Self::GetLogStream>, Status> {
9✔
95
        // The returned stream is `'static`, so it cannot borrow `self`; the
        // closure below takes ownership of its own copy of the endpoint map.
        let endpoints = self.endpoints.clone();
9✔
96
        Ok(Response::new(
9✔
97
            requests
9✔
98
                .into_inner()
9✔
99
                // `and_then` runs the closure on each successfully received request.
                .and_then(move |request| {
18✔
100
                    // Both branches must have the same future type, hence
                    // `Either`: `Left` is an already-resolved error, `Right` is
                    // the read-and-serialize future.
                    let log = &match find_log_endpoint(&endpoints, &request.endpoint) {
18✔
101
                        Ok(log) => log,
18✔
102
                        Err(e) => return Either::Left(std::future::ready(Err(e))),
×
103
                    }
×
104
                    .log;
×
105

×
106
                    // The lock is taken only to create the read future; the
                    // guard is a temporary dropped at the end of this statement,
                    // before the future is awaited.
                    // NOTE(review): the `as usize` / `as u64` casts would
                    // silently truncate or wrap if the request fields are wider
                    // or signed; field types are not visible here, confirm.
                    let response = log.lock().read(
18✔
107
                        request.start as usize..request.end as usize,
18✔
108
                        Duration::from_millis(request.timeout_in_millis as u64),
18✔
109
                        log.clone(),
18✔
110
                    );
18✔
111

18✔
112
                    Either::Right(serialize_log_response(response))
18✔
113
                })
18✔
114
                .boxed(),
9✔
115
        ))
9✔
116
    }
9✔
117
}
×
118

×
119
/// Looks up `endpoint` in `endpoints`.
///
/// Returns a reference to the matching `LogEndpoint`, or a `Status` with code
/// `NotFound` and the message `Endpoint <name> not found` when there is no
/// entry under that key.
fn find_log_endpoint<'a>(
36✔
120
    endpoints: &'a HashMap<String, LogEndpoint>,
36✔
121
    endpoint: &str,
36✔
122
) -> Result<&'a LogEndpoint, Status> {
36✔
123
    // `ok_or_else` builds the status (and formats its message) only on a miss.
    endpoints.get(endpoint).ok_or_else(|| {
36✔
124
        Status::new(
×
125
            tonic::Code::NotFound,
×
126
            format!("Endpoint {} not found", endpoint),
×
127
        )
×
128
    })
36✔
129
}
36✔
130

×
131
/// Awaits a pending log read and packs its result into a `LogResponse`.
///
/// The awaited value is serialized with bincode and placed in
/// `LogResponse::data`. Both failure modes map to a `Status` with code
/// `Internal`: a failed read carries the error's string form, and a failed
/// serialization carries `Failed to serialize response: <error>`.
async fn serialize_log_response(response: LogResponseFuture) -> Result<LogResponse, Status> {
18✔
132
    let response = response
18✔
133
        .await
9✔
134
        .map_err(|e| Status::new(tonic::Code::Internal, e.to_string()))?;
9✔
135
    let data = bincode::serialize(&response).map_err(|e| {
9✔
136
        Status::new(
×
137
            tonic::Code::Internal,
×
138
            format!("Failed to serialize response: {}", e),
×
139
        )
×
140
    })?;
9✔
141
    Ok(LogResponse { data })
9✔
142
}
9✔
143

×
144
/// TcpIncoming::new requires a tokio runtime, so we mark this function as async.
///
/// Builds an `InternalPipelineServer` from `endpoint_and_logs`, creates the TCP
/// listener on `options.host:options.port`, and returns the future produced by
/// `run_server`. That future is NOT awaited here; the caller must drive it for
/// the server to run. `shutdown` is passed through to `run_server`.
///
/// # Errors
///
/// * `GrpcError::AddrParse` if `host:port` does not parse as an address.
/// * `GrpcError::Listen` if `TcpIncoming::new` fails for that address.
×
145
pub async fn start_internal_pipeline_server(
6✔
146
    endpoint_and_logs: Vec<(ApiEndpoint, LogEndpoint)>,
6✔
147
    options: &AppGrpcOptions,
6✔
148
    shutdown: impl Future<Output = ()> + Send + 'static,
6✔
149
) -> Result<impl Future<Output = Result<(), tonic::transport::Error>>, GrpcError> {
6✔
150
    // Key each log endpoint by its API endpoint name. Collecting into a
    // `HashMap` means a later entry with the same name replaces an earlier one.
    let endpoints = endpoint_and_logs
6✔
151
        .into_iter()
6✔
152
        .map(|(endpoint, log)| (endpoint.name, log))
6✔
153
        .collect();
6✔
154
    let server = InternalPipelineServer::new(endpoints);
6✔
155

6✔
156
    // Start listening.
6✔
157
    let addr = format!("{}:{}", options.host, options.port);
6✔
158
    info!("Starting Internal Server on {addr}");
6✔
159
    // Shadow the string with the parsed address; the string form is kept only
    // for the error value.
    let addr = addr
6✔
160
        .parse()
6✔
161
        .map_err(|e| GrpcError::AddrParse(addr.clone(), e))?;
6✔
162
    // NOTE(review): the `true` / `None` arguments are presumably tonic's
    // `nodelay` and `keepalive` settings; confirm against the tonic version in use.
    let incoming = TcpIncoming::new(addr, true, None).map_err(|e| GrpcError::Listen(addr, e))?;
6✔
163

164
    // Run server.
×
165
    let server = Server::builder().add_service(InternalPipelineServiceServer::new(server));
6✔
166
    Ok(run_server(server, incoming, shutdown))
6✔
167
}
6✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc