• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5923086724

21 Aug 2023 07:05AM UTC coverage: 74.763% (-1.2%) from 75.988%
5923086724

push

github

web-flow
chore: Remove short form of `enable_progress` because it's conflicting with `dozer cloud` (#1876)

46105 of 61668 relevant lines covered (74.76%)

39792.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.0
/dozer-api/src/grpc/internal/internal_pipeline_server.rs
1
use dozer_cache::dozer_log::home_dir::BuildId;
2
use dozer_cache::dozer_log::replication::Log;
3
use dozer_types::bincode;
4
use dozer_types::grpc_types::internal::internal_pipeline_service_server::{
5
    InternalPipelineService, InternalPipelineServiceServer,
6
};
7
use dozer_types::grpc_types::internal::{
8
    BuildRequest, BuildResponse, EndpointResponse, EndpointsResponse, LogRequest, LogResponse,
9
    StorageRequest, StorageResponse,
10
};
11
use dozer_types::log::info;
12
use dozer_types::models::api_config::AppGrpcOptions;
13
use dozer_types::models::api_endpoint::ApiEndpoint;
14
use futures_util::future::Either;
15
use futures_util::stream::BoxStream;
16
use futures_util::{Future, StreamExt, TryStreamExt};
17
use std::collections::HashMap;
18
use std::sync::Arc;
19
use std::time::Duration;
20
use tokio::sync::Mutex;
21
use tonic::transport::server::TcpIncoming;
22
use tonic::transport::Server;
23
use tonic::{Request, Response, Status, Streaming};
24

25
use crate::errors::GrpcError;
26
use crate::grpc::run_server;
27

28
#[derive(Debug, Clone)]
15✔
29
pub struct LogEndpoint {
30
    pub build_id: BuildId,
31
    pub schema_string: String,
32
    pub descriptor_bytes: Vec<u8>,
33
    pub log: Arc<Mutex<Log>>,
34
}
35

36
#[derive(Debug)]
×
37
pub struct InternalPipelineServer {
38
    endpoints: HashMap<String, LogEndpoint>,
39
}
40

41
impl InternalPipelineServer {
42
    pub fn new(endpoints: HashMap<String, LogEndpoint>) -> Self {
9✔
43
        Self { endpoints }
9✔
44
    }
9✔
45
}
46

47
#[tonic::async_trait]
48
impl InternalPipelineService for InternalPipelineServer {
49
    async fn describe_storage(
9✔
50
        &self,
9✔
51
        request: Request<StorageRequest>,
9✔
52
    ) -> Result<Response<StorageResponse>, Status> {
9✔
53
        let endpoint = request.into_inner().endpoint;
9✔
54
        let log = &find_log_endpoint(&self.endpoints, &endpoint)?.log;
9✔
55
        let storage = log.lock().await.describe_storage();
9✔
56
        Ok(Response::new(StorageResponse {
9✔
57
            storage: Some(storage),
9✔
58
        }))
9✔
59
    }
18✔
60

61
    async fn list_endpoints(
×
62
        &self,
×
63
        _request: Request<()>,
×
64
    ) -> Result<Response<EndpointsResponse>, Status> {
×
65
        let endpoints = self
×
66
            .endpoints
×
67
            .iter()
×
68
            .map(|(endpoint, log)| EndpointResponse {
×
69
                endpoint: endpoint.clone(),
×
70
                build_name: log.build_id.name().to_string(),
×
71
            })
×
72
            .collect();
×
73
        Ok(Response::new(EndpointsResponse { endpoints }))
×
74
    }
×
75

76
    async fn describe_build(
9✔
77
        &self,
9✔
78
        request: Request<BuildRequest>,
9✔
79
    ) -> Result<Response<BuildResponse>, Status> {
9✔
80
        let endpoint = request.into_inner().endpoint;
9✔
81
        let endpoint = find_log_endpoint(&self.endpoints, &endpoint)?;
9✔
82
        Ok(Response::new(BuildResponse {
9✔
83
            name: endpoint.build_id.name().to_string(),
9✔
84
            schema_string: endpoint.schema_string.clone(),
9✔
85
            descriptor_bytes: endpoint.descriptor_bytes.clone(),
9✔
86
        }))
9✔
87
    }
18✔
88

89
    type GetLogStream = BoxStream<'static, Result<LogResponse, Status>>;
90

91
    async fn get_log(
9✔
92
        &self,
9✔
93
        requests: Request<Streaming<LogRequest>>,
9✔
94
    ) -> Result<Response<Self::GetLogStream>, Status> {
9✔
95
        let endpoints = self.endpoints.clone();
9✔
96
        Ok(Response::new(
9✔
97
            requests
9✔
98
                .into_inner()
9✔
99
                .and_then(move |request| {
18✔
100
                    let log = &match find_log_endpoint(&endpoints, &request.endpoint) {
18✔
101
                        Ok(log) => log,
18✔
102
                        Err(e) => return Either::Left(std::future::ready(Err(e))),
×
103
                    }
104
                    .log;
105
                    Either::Right(get_log(log.clone(), request))
18✔
106
                })
18✔
107
                .boxed(),
9✔
108
        ))
9✔
109
    }
9✔
110
}
×
111

×
112
fn find_log_endpoint<'a>(
36✔
113
    endpoints: &'a HashMap<String, LogEndpoint>,
36✔
114
    endpoint: &str,
36✔
115
) -> Result<&'a LogEndpoint, Status> {
36✔
116
    endpoints.get(endpoint).ok_or_else(|| {
36✔
117
        Status::new(
×
118
            tonic::Code::NotFound,
×
119
            format!("Endpoint {} not found", endpoint),
×
120
        )
×
121
    })
36✔
122
}
36✔
123

×
124
async fn get_log(log: Arc<Mutex<Log>>, request: LogRequest) -> Result<LogResponse, Status> {
18✔
125
    let mut log_mut = log.lock().await;
18✔
126
    let response = log_mut.read(
18✔
127
        request.start as usize..request.end as usize,
18✔
128
        Duration::from_millis(request.timeout_in_millis as u64),
18✔
129
        log.clone(),
18✔
130
    );
18✔
131
    // Must drop log before awaiting response, otherwise we will deadlock.
18✔
132
    drop(log_mut);
18✔
133
    let response = response
18✔
134
        .await
9✔
135
        .map_err(|e| Status::new(tonic::Code::Internal, e.to_string()))?;
9✔
136
    let data = bincode::serialize(&response).map_err(|e| {
9✔
137
        Status::new(
×
138
            tonic::Code::Internal,
×
139
            format!("Failed to serialize response: {}", e),
×
140
        )
×
141
    })?;
9✔
142
    Ok(LogResponse { data })
9✔
143
}
9✔
144

145
/// TcpIncoming::new requires a tokio runtime, so we mark this function as async.
×
146
pub async fn start_internal_pipeline_server(
6✔
147
    endpoint_and_logs: Vec<(ApiEndpoint, LogEndpoint)>,
6✔
148
    options: &AppGrpcOptions,
6✔
149
    shutdown: impl Future<Output = ()> + Send + 'static,
6✔
150
) -> Result<impl Future<Output = Result<(), tonic::transport::Error>>, GrpcError> {
6✔
151
    let endpoints = endpoint_and_logs
6✔
152
        .into_iter()
6✔
153
        .map(|(endpoint, log)| (endpoint.name, log))
6✔
154
        .collect();
6✔
155
    let server = InternalPipelineServer::new(endpoints);
6✔
156

6✔
157
    // Start listening.
6✔
158
    let addr = format!("{}:{}", options.host, options.port);
6✔
159
    info!("Starting Internal Server on {addr}");
6✔
160
    let addr = addr
6✔
161
        .parse()
6✔
162
        .map_err(|e| GrpcError::AddrParse(addr.clone(), e))?;
6✔
163
    let incoming = TcpIncoming::new(addr, true, None).map_err(|e| GrpcError::Listen(addr, e))?;
6✔
164

165
    // Run server.
×
166
    let server = Server::builder().add_service(InternalPipelineServiceServer::new(server));
6✔
167
    Ok(run_server(server, incoming, shutdown))
6✔
168
}
6✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc