• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6299724219

25 Sep 2023 12:58PM UTC coverage: 77.81% (+0.5%) from 77.275%
6299724219

push

github

chubei
fix: Add `BINDGEN_EXTRA_CLANG_ARGS` to cross compile rocksdb

50223 of 64546 relevant lines covered (77.81%)

148909.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.31
/dozer-log/src/reader.rs
1
use crate::errors::ReaderBuilderError;
2
use crate::replication::LogOperation;
3
use crate::schemas::EndpointSchema;
4
use crate::storage::{LocalStorage, S3Storage, Storage};
5

6
use super::errors::ReaderError;
7
use dozer_types::grpc_types::internal::internal_pipeline_service_client::InternalPipelineServiceClient;
8
use dozer_types::grpc_types::internal::{
9
    storage_response, BuildRequest, LogRequest, LogResponse, StorageRequest,
10
};
11
use dozer_types::log::{debug, error};
12
use dozer_types::models::api_endpoint::{
13
    default_log_reader_batch_size, default_log_reader_buffer_size,
14
    default_log_reader_timeout_in_millis,
15
};
16
use dozer_types::tonic::transport::Channel;
17
use dozer_types::tonic::Streaming;
18
use dozer_types::{bincode, serde_json};
19
use tokio::sync::mpsc::{Receiver, Sender};
20
use tokio::task::JoinHandle;
21
use tokio_stream::wrappers::ReceiverStream;
22
use tokio_stream::StreamExt;
23

24
#[derive(Debug)]
pub struct LogReaderOptions {
    /// Name of the endpoint whose log is read; sent in every build, storage and log request.
    pub endpoint: String,
    /// Maximum number of operations requested per `LogRequest` (`end = start + batch_size`).
    pub batch_size: u32,
    /// Timeout forwarded in each `LogRequest`, in milliseconds.
    pub timeout_in_millis: u32,
    /// Capacity of the channel buffering operations between the worker task and `LogReader`.
    pub buffer_size: u32,
}
31

32
impl LogReaderOptions {
33
    pub fn new(endpoint: String) -> Self {
×
34
        Self {
×
35
            endpoint,
×
36
            batch_size: default_log_reader_batch_size(),
×
37
            timeout_in_millis: default_log_reader_timeout_in_millis(),
×
38
            buffer_size: default_log_reader_buffer_size(),
×
39
        }
×
40
    }
×
41
}
42

43
#[derive(Debug)]
pub struct LogReaderBuilder {
    /// Log server runs on a specific build of the endpoint. This is the name of the build.
    pub build_name: String,
    /// Schema of this endpoint.
    pub schema: EndpointSchema,
    /// Protobuf descriptor of this endpoint's API.
    pub descriptor: Vec<u8>,
    /// Options used for the log requests issued by the worker spawned in `build`.
    pub options: LogReaderOptions,
    // Connected log client, handed to the background worker when `build` is called.
    client: LogClient,
}
54

55
pub struct LogReader {
    /// Log server runs on a specific build of the endpoint. This is the name of the build.
    pub build_name: String,
    /// Schema of this endpoint.
    pub schema: EndpointSchema,
    /// Protobuf descriptor of this endpoint's API.
    pub descriptor: Vec<u8>,
    // Receives operations (with their log positions) from the background worker task.
    op_receiver: Receiver<OpAndPos>,
    // Handle to the worker task; taken (set to `None`) once joined to retrieve its error.
    worker: Option<JoinHandle<Result<(), ReaderError>>>,
}
65

66
impl LogReaderBuilder {
67
    pub async fn new(
182✔
68
        server_addr: String,
182✔
69
        options: LogReaderOptions,
182✔
70
    ) -> Result<Self, ReaderBuilderError> {
182✔
71
        let mut client = Self::get_client(server_addr).await?;
264✔
72
        let build = client
84✔
73
            .describe_build(BuildRequest {
84✔
74
                endpoint: options.endpoint.clone(),
84✔
75
            })
84✔
76
            .await?
168✔
77
            .into_inner();
84✔
78
        let build_name = build.name;
84✔
79
        let schema = serde_json::from_str(&build.schema_string)?;
84✔
80

81
        let client = LogClient::new(client, options.endpoint.clone()).await?;
420✔
82

83
        Ok(Self {
84✔
84
            build_name,
84✔
85
            schema,
84✔
86
            descriptor: build.descriptor_bytes,
84✔
87
            client,
84✔
88
            options,
84✔
89
        })
84✔
90
    }
84✔
91

92
    pub async fn get_client(
182✔
93
        server_addr: String,
182✔
94
    ) -> Result<InternalPipelineServiceClient<Channel>, ReaderBuilderError> {
182✔
95
        let client = InternalPipelineServiceClient::connect(server_addr).await?;
264✔
96
        Ok(client)
84✔
97
    }
84✔
98

99
    pub fn build(self, start: u64) -> LogReader {
117✔
100
        let LogReaderBuilder {
117✔
101
            build_name,
117✔
102
            schema,
117✔
103
            descriptor,
117✔
104
            client,
117✔
105
            options,
117✔
106
        } = self;
117✔
107

117✔
108
        let (op_sender, op_receiver) = tokio::sync::mpsc::channel(options.buffer_size as usize);
117✔
109
        let worker = tokio::spawn(log_reader_worker(client, start, options, op_sender));
117✔
110
        LogReader {
117✔
111
            build_name,
117✔
112
            schema,
117✔
113
            descriptor,
117✔
114
            op_receiver,
117✔
115
            worker: Some(worker),
117✔
116
        }
117✔
117
    }
117✔
118
}
119

120
/// An `LogOperation` and its position in the log.
pub struct OpAndPos {
    /// The operation read from the log.
    pub op: LogOperation,
    /// Position of `op` in the endpoint's log.
    pub pos: u64,
}
125

126
impl LogReader {
127
    pub async fn read_one(&mut self) -> Result<OpAndPos, ReaderError> {
533✔
128
        if let Some(result) = self.op_receiver.recv().await {
246✔
129
            Ok(result)
192✔
130
        } else if let Some(worker) = self.worker.take() {
×
131
            match worker.await {
×
132
                Ok(Ok(())) => {
133
                    panic!("Worker never quit without an error because we're holding the receiver")
×
134
                }
135
                Ok(Err(e)) => Err(e),
×
136
                Err(e) => Err(ReaderError::ReaderThreadQuit(Some(e))),
×
137
            }
138
        } else {
139
            Err(ReaderError::ReaderThreadQuit(None))
×
140
        }
141
    }
192✔
142
}
143

144
#[derive(Debug)]
struct LogClient {
    // Underlying gRPC client, kept so the log stream can be re-created after an error.
    client: InternalPipelineServiceClient<Channel>,
    // Sends `LogRequest`s into the server's bidirectional `get_log` stream.
    request_sender: Sender<LogRequest>,
    // Receives the `LogResponse`s matching the requests sent above.
    response_stream: Streaming<LogResponse>,
    // Storage backend (S3 or local) used to download persisted log entries.
    storage: Box<dyn Storage>,
}
151

152
impl LogClient {
    /// Queries the server for the endpoint's storage configuration, constructs the
    /// matching storage backend, and opens the initial `get_log` request/response stream.
    async fn new(
        mut client: InternalPipelineServiceClient<Channel>,
        endpoint: String,
    ) -> Result<Self, ReaderBuilderError> {
        let storage = client
            .describe_storage(StorageRequest { endpoint })
            .await?
            .into_inner();
        // The storage oneof is expected to always be set by the server; a missing
        // value is treated as a protocol bug and panics.
        let storage: Box<dyn Storage> = match storage.storage.expect("Must not be None") {
            storage_response::Storage::S3(s3) => {
                Box::new(S3Storage::new(s3.region.as_str().into(), s3.bucket_name).await?)
            }
            storage_response::Storage::Local(local) => {
                Box::new(LocalStorage::new(local.root).await?)
            }
        };

        let (request_sender, response_stream) = create_get_log_stream(&mut client).await?;

        Ok(Self {
            client,
            request_sender,
            response_stream,
            storage,
        })
    }

    /// Fetches the operations covering `request.start..request.end`.
    ///
    /// On a stream error the `get_log` stream is re-created (retrying every 5s on
    /// connection failure) and the same request is re-sent; this loop only exits
    /// once a response is obtained. A `Persisted` response is downloaded from
    /// storage and trimmed to the requested range; an `Operations` response is
    /// returned as-is.
    async fn get_log(&mut self, request: LogRequest) -> Result<Vec<LogOperation>, ReaderError> {
        // Send the request.
        let response = loop {
            match call_get_log_once(
                &self.request_sender,
                request.clone(),
                &mut self.response_stream,
            )
            .await
            {
                Ok(response) => break response,
                Err(e) => {
                    error!("Error getting log: {:?}", e);
                    // Re-create the request/response pair on `self` before retrying;
                    // keep trying until the server accepts a new stream.
                    (self.request_sender, self.response_stream) = loop {
                        match create_get_log_stream(&mut self.client).await {
                            Ok((request_sender, response_stream)) => {
                                break (request_sender, response_stream)
                            }
                            Err(e) => {
                                const RETRY_INTERVAL: std::time::Duration =
                                    std::time::Duration::from_secs(5);
                                error!(
                                    "Error creating log stream: {e}, retrying after {RETRY_INTERVAL:?}..."
                                );
                                tokio::time::sleep(RETRY_INTERVAL).await;
                            }
                        }
                    }
                }
            }
        };
        // Shadow the gRPC `LogResponse` with the bincode-encoded replication payload
        // carried in `response.data`.
        use crate::replication::LogResponse;
        let response: LogResponse =
            bincode::deserialize(&response.data).map_err(ReaderError::DeserializeLogResponse)?;

        // Load response.
        let request_range = request.start..request.end;
        match response {
            LogResponse::Persisted(persisted) => {
                debug!(
                    "Loading persisted log entry {}, entry range {:?}, requested range {:?}",
                    persisted.key, persisted.range, request_range
                );
                // Load the persisted log entry.
                let data = self.storage.download_object(persisted.key).await?;
                let mut ops: Vec<LogOperation> =
                    bincode::deserialize(&data).map_err(ReaderError::DeserializeLogEntry)?;
                // Discard the ops that are before the requested range.
                // NOTE(review): assumes `persisted.range.start <= request.start`;
                // a smaller request start would make the subtraction underflow — confirm
                // the server guarantees this.
                ops.drain(..request_range.start as usize - persisted.range.start);
                Ok(ops)
            }
            LogResponse::Operations(ops) => {
                debug!(
                    "Got {} ops for request range {:?}",
                    ops.len(),
                    request_range
                );
                Ok(ops)
            }
        }
    }
}
242

243
async fn create_get_log_stream(
208✔
244
    client: &mut InternalPipelineServiceClient<Channel>,
208✔
245
) -> Result<(Sender<LogRequest>, Streaming<LogResponse>), dozer_types::tonic::Status> {
208✔
246
    let (request_sender, request_receiver) = tokio::sync::mpsc::channel::<LogRequest>(1);
208✔
247
    let request_stream = ReceiverStream::new(request_receiver);
208✔
248
    let response_stream = client.get_log(request_stream).await?.into_inner();
390✔
249
    Ok((request_sender, response_stream))
182✔
250
}
208✔
251

252
async fn call_get_log_once(
312✔
253
    request_sender: &Sender<LogRequest>,
312✔
254
    request: LogRequest,
312✔
255
    stream: &mut Streaming<LogResponse>,
312✔
256
) -> Result<LogResponse, Option<dozer_types::tonic::Status>> {
312✔
257
    if request_sender.send(request).await.is_err() {
312✔
258
        return Err(None);
×
259
    }
312✔
260
    match stream.next().await {
312✔
261
        Some(Ok(response)) => Ok(response),
195✔
262
        Some(Err(e)) => Err(Some(e)),
26✔
263
        None => Err(None),
×
264
    }
265
}
221✔
266

267
async fn log_reader_worker(
117✔
268
    mut log_client: LogClient,
117✔
269
    mut pos: u64,
117✔
270
    options: LogReaderOptions,
117✔
271
    op_sender: Sender<OpAndPos>,
117✔
272
) -> Result<(), ReaderError> {
117✔
273
    loop {
312✔
274
        // Request ops.
312✔
275
        let request = LogRequest {
312✔
276
            endpoint: options.endpoint.clone(),
312✔
277
            start: pos,
312✔
278
            end: pos + options.batch_size as u64,
312✔
279
            timeout_in_millis: options.timeout_in_millis,
312✔
280
        };
312✔
281
        let ops = log_client.get_log(request).await?;
312✔
282

283
        for op in ops {
611✔
284
            if op_sender.send(OpAndPos { op, pos }).await.is_err() {
416✔
285
                debug!("Log reader thread quit because LogReader was dropped");
×
286
                return Ok(());
×
287
            }
416✔
288
            pos += 1;
416✔
289
        }
290
    }
291
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc