getdozer / dozer, build 5724602231 (push via GitHub, committer web-flow), pending completion

chore: LogClient automatically retries on network error (#1811)

* chore: `LogClient` automatically retries on network error

* chore: Split `ReaderBuilderError` out from `ReaderError`

55 of 55 new or added lines in 2 files covered. (100.0%)

45530 of 59847 relevant lines covered (76.08%)

38880.54 hits per line

Source File: /dozer-log/src/reader.rs (58.85% covered)

use crate::attach_progress;
use crate::errors::ReaderBuilderError;
use crate::replication::LogOperation;
use crate::schemas::BuildSchema;
use crate::storage::{LocalStorage, S3Storage, Storage};

use super::errors::ReaderError;
use dozer_types::grpc_types::internal::internal_pipeline_service_client::InternalPipelineServiceClient;
use dozer_types::grpc_types::internal::{
    storage_response, BuildRequest, LogRequest, LogResponse, StorageRequest,
};
use dozer_types::indicatif::{MultiProgress, ProgressBar};
use dozer_types::log::{debug, error};
use dozer_types::models::api_endpoint::{
    default_log_reader_batch_size, default_log_reader_buffer_size,
    default_log_reader_timeout_in_millis,
};
use dozer_types::tonic::transport::Channel;
use dozer_types::tonic::Streaming;
use dozer_types::{bincode, serde_json};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::task::JoinHandle;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;

#[derive(Debug)]
pub struct LogReaderOptions {
    pub endpoint: String,
    pub batch_size: u32,
    pub timeout_in_millis: u32,
    pub buffer_size: u32,
}

impl LogReaderOptions {
    pub fn new(endpoint: String) -> Self {
        Self {
            endpoint,
            batch_size: default_log_reader_batch_size(),
            timeout_in_millis: default_log_reader_timeout_in_millis(),
            buffer_size: default_log_reader_buffer_size(),
        }
    }
}

#[derive(Debug)]
pub struct LogReaderBuilder {
    /// Log server runs on a specific build of the endpoint. This is the name of the build.
    pub build_name: String,
    /// Schema of this endpoint.
    pub schema: BuildSchema,
    /// Protobuf descriptor of this endpoint's API.
    pub descriptor: Vec<u8>,
    pub options: LogReaderOptions,
    client: LogClient,
}

pub struct LogReader {
    /// Log server runs on a specific build of the endpoint. This is the name of the build.
    pub build_name: String,
    /// Schema of this endpoint.
    pub schema: BuildSchema,
    /// Protobuf descriptor of this endpoint's API.
    pub descriptor: Vec<u8>,
    op_receiver: Receiver<(LogOperation, u64)>,
    worker: Option<JoinHandle<Result<(), ReaderError>>>,
}

impl LogReaderBuilder {
    pub async fn new(
        server_addr: String,
        options: LogReaderOptions,
    ) -> Result<Self, ReaderBuilderError> {
        let mut client = InternalPipelineServiceClient::connect(server_addr).await?;
        let build = client
            .describe_build(BuildRequest {
                endpoint: options.endpoint.clone(),
            })
            .await?
            .into_inner();
        let build_name = build.name;
        let schema = serde_json::from_str(&build.schema_string)?;

        let client = LogClient::new(client, options.endpoint.clone()).await?;

        Ok(Self {
            build_name,
            schema,
            descriptor: build.descriptor_bytes,
            client,
            options,
        })
    }

    pub fn build(self, pos: u64, multi_pb: Option<MultiProgress>) -> LogReader {
        let LogReaderBuilder {
            build_name,
            schema,
            descriptor,
            client,
            options,
        } = self;
        let pb = attach_progress(multi_pb);
        pb.set_message(format!("reader: {}", options.endpoint));
        pb.set_position(pos);

        let (op_sender, op_receiver) =
            tokio::sync::mpsc::channel::<(LogOperation, u64)>(options.buffer_size as usize);
        let worker = tokio::spawn(log_reader_worker(client, pos, pb, options, op_sender));
        LogReader {
            build_name,
            schema,
            descriptor,
            op_receiver,
            worker: Some(worker),
        }
    }
}

impl LogReader {
    /// Returns an op and the position of next op.
    pub async fn next_op(&mut self) -> Result<(LogOperation, u64), ReaderError> {
        if let Some(result) = self.op_receiver.recv().await {
            Ok(result)
        } else if let Some(worker) = self.worker.take() {
            match worker.await {
                Ok(Ok(())) => {
                    panic!("Worker never quit without an error because we're holding the receiver")
                }
                Ok(Err(e)) => Err(e),
                Err(e) => Err(ReaderError::ReaderThreadQuit(Some(e))),
            }
        } else {
            Err(ReaderError::ReaderThreadQuit(None))
        }
    }
}

#[derive(Debug)]
struct LogClient {
    client: InternalPipelineServiceClient<Channel>,
    request_sender: Sender<LogRequest>,
    response_stream: Streaming<LogResponse>,
    storage: Box<dyn Storage>,
}

impl LogClient {
    async fn new(
        mut client: InternalPipelineServiceClient<Channel>,
        endpoint: String,
    ) -> Result<Self, ReaderBuilderError> {
        let storage = client
            .describe_storage(StorageRequest { endpoint })
            .await?
            .into_inner();
        let storage: Box<dyn Storage> = match storage.storage.expect("Must not be None") {
            storage_response::Storage::S3(s3) => Box::new(S3Storage::new(s3.bucket_name).await?),
            storage_response::Storage::Local(local) => {
                Box::new(LocalStorage::new(local.root).await?)
            }
        };

        let (request_sender, response_stream) = create_get_log_stream(&mut client).await?;

        Ok(Self {
            client,
            request_sender,
            response_stream,
            storage,
        })
    }

    async fn get_log(&mut self, request: LogRequest) -> Result<Vec<LogOperation>, ReaderError> {
        // Send the request.
        let response = loop {
            match call_get_log_once(
                &self.request_sender,
                request.clone(),
                &mut self.response_stream,
            )
            .await
            {
                Ok(response) => break response,
                Err(e) => {
                    error!("Error getting log: {:?}", e);
                    (self.request_sender, self.response_stream) = loop {
                        match create_get_log_stream(&mut self.client).await {
                            Ok((request_sender, response_stream)) => {
                                break (request_sender, response_stream)
                            }
                            Err(e) => {
                                const RETRY_INTERVAL: std::time::Duration =
                                    std::time::Duration::from_secs(5);
                                error!(
                                    "Error creating log stream: {e}, retrying after {RETRY_INTERVAL:?}..."
                                );
                                tokio::time::sleep(RETRY_INTERVAL).await;
                            }
                        }
                    }
                }
            }
        };
        use crate::replication::LogResponse;
        let response: LogResponse =
            bincode::deserialize(&response.data).map_err(ReaderError::DeserializeLogResponse)?;

        // Load response.
        let request_range = request.start..request.end;
        match response {
            LogResponse::Persisted(persisted) => {
                debug!(
                    "Loading persisted log entry {}, entry range {:?}, requested range {:?}",
                    persisted.key, persisted.range, request_range
                );
                // Load the persisted log entry.
                let data = self.storage.download_object(persisted.key).await?;
                let mut ops: Vec<LogOperation> =
                    bincode::deserialize(&data).map_err(ReaderError::DeserializeLogEntry)?;
                // Discard the ops that are before the requested range.
                ops.drain(..request_range.start as usize - persisted.range.start);
                Ok(ops)
            }
            LogResponse::Operations(ops) => {
                debug!(
                    "Got {} ops for request range {:?}",
                    ops.len(),
                    request_range
                );
                Ok(ops)
            }
        }
    }
}

async fn create_get_log_stream(
    client: &mut InternalPipelineServiceClient<Channel>,
) -> Result<(Sender<LogRequest>, Streaming<LogResponse>), dozer_types::tonic::Status> {
    let (request_sender, request_receiver) = tokio::sync::mpsc::channel::<LogRequest>(1);
    let request_stream = ReceiverStream::new(request_receiver);
    let response_stream = client.get_log(request_stream).await?.into_inner();
    Ok((request_sender, response_stream))
}

async fn call_get_log_once(
    request_sender: &Sender<LogRequest>,
    request: LogRequest,
    stream: &mut Streaming<LogResponse>,
) -> Result<LogResponse, Option<dozer_types::tonic::Status>> {
    if request_sender.send(request).await.is_err() {
        return Err(None);
    }
    match stream.next().await {
        Some(Ok(response)) => Ok(response),
        Some(Err(e)) => Err(Some(e)),
        None => Err(None),
    }
}

async fn log_reader_worker(
    mut log_client: LogClient,
    mut pos: u64,
    pb: ProgressBar,
    options: LogReaderOptions,
    op_sender: Sender<(LogOperation, u64)>,
) -> Result<(), ReaderError> {
    loop {
        // Request ops.
        let request = LogRequest {
            endpoint: options.endpoint.clone(),
            start: pos,
            end: pos + options.batch_size as u64,
            timeout_in_millis: options.timeout_in_millis,
        };
        let ops = log_client.get_log(request).await?;

        for op in ops {
            pos += 1;
            pb.set_position(pos);
            if op_sender.send((op, pos)).await.is_err() {
                debug!("Log reader thread quit because LogReader was dropped");
                return Ok(());
            }
        }
    }
}
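
To show how these pieces fit together, here is a minimal sketch of driving the reader from application code. The server address, the "trips" endpoint name, and the `dozer_log::reader` module path are assumptions (the path is inferred from the file's location), and the sketch relies on `ReaderBuilderError` and `ReaderError` implementing `std::error::Error` so they can be propagated with `?`.

use dozer_log::reader::{LogReaderBuilder, LogReaderOptions};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical endpoint name; batch size, timeout and buffer size fall back
    // to the defaults filled in by `LogReaderOptions::new`.
    let options = LogReaderOptions::new("trips".to_string());

    // Hypothetical log server address; this describes the build and storage,
    // then opens the `get_log` streaming RPC.
    let builder = LogReaderBuilder::new("http://localhost:50053".to_string(), options).await?;

    // Start from position 0 with no shared progress bar; this spawns
    // `log_reader_worker` in the background.
    let mut reader = builder.build(0, None);

    // Each call yields one operation plus the position of the next op.
    for _ in 0..10 {
        let (_op, next_pos) = reader.next_op().await?;
        println!("read one op; next position is {next_pos}");
    }
    Ok(())
}

Because the worker is spawned on the Tokio runtime, `next_op` only awaits the in-memory channel; the reconnect-and-retry behaviour this change introduces is confined to `LogClient::get_log`, so callers see network hiccups as added latency rather than errors.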