• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6299724219

25 Sep 2023 12:58PM UTC coverage: 77.81% (+0.5%) from 77.275%
6299724219

push

github

chubei
fix: Add `BINDGEN_EXTRA_CLANG_ARGS` to cross compile rocksdb

50223 of 64546 relevant lines covered (77.81%)

148909.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.5
/dozer-api/src/cache_builder/mod.rs
1
use std::collections::HashSet;
2
use std::time::Duration;
3

4
use crate::grpc::types_helper;
5
use dozer_cache::dozer_log::reader::{LogReader, LogReaderBuilder, OpAndPos};
6
use dozer_cache::dozer_log::replication::LogOperation;
7
use dozer_cache::{
8
    cache::{CacheRecord, CacheWriteOptions, RwCache, RwCacheManager, UpsertResult},
9
    errors::CacheError,
10
};
11
use dozer_tracing::{Labels, LabelsAndProgress};
12
use dozer_types::indicatif::ProgressBar;
13
use dozer_types::log::debug;
14
use dozer_types::types::SchemaWithIndex;
15
use dozer_types::{
16
    grpc_types::types::Operation as GrpcOperation,
17
    log::error,
18
    types::{Field, Operation, Record, Schema},
19
};
20
use futures_util::stream::FuturesUnordered;
21
use futures_util::{
22
    future::{select, Either},
23
    Future,
24
};
25
use metrics::{describe_counter, describe_histogram, histogram, increment_counter};
26
use tokio::sync::broadcast::Sender;
27
use tokio::sync::mpsc;
28
use tokio_stream::StreamExt;
29

30
pub async fn build_cache(
30✔
31
    cache: Box<dyn RwCache>,
30✔
32
    cancel: impl Future<Output = ()> + Unpin + Send + 'static,
30✔
33
    log_reader_builder: LogReaderBuilder,
30✔
34
    operations_sender: Option<(String, Sender<GrpcOperation>)>,
30✔
35
    labels: LabelsAndProgress,
30✔
36
) -> Result<(), CacheError> {
30✔
37
    // Create log reader.
38
    let starting_pos = cache.get_metadata()?.map(|pos| pos + 1).unwrap_or(0);
30✔
39
    debug!(
40
        "Starting log reader {} from position {starting_pos}",
×
41
        log_reader_builder.options.endpoint
42
    );
43
    let pb = labels.create_progress_bar(format!("cache: {}", log_reader_builder.options.endpoint));
30✔
44
    pb.set_position(starting_pos);
30✔
45
    let log_reader = log_reader_builder.build(starting_pos);
30✔
46

30✔
47
    // Spawn tasks
30✔
48
    let mut futures = FuturesUnordered::new();
30✔
49
    let (sender, receiver) = mpsc::channel(1);
30✔
50
    futures.push(tokio::spawn(async move {
30✔
51
        read_log_task(cancel, log_reader, sender).await;
140✔
52
        Ok(())
30✔
53
    }));
30✔
54
    futures.push({
30✔
55
        tokio::task::spawn_blocking(|| build_cache_task(cache, receiver, operations_sender, pb))
30✔
56
    });
30✔
57

58
    while let Some(result) = futures.next().await {
90✔
59
        match result {
60✔
60
            Ok(Ok(())) => (),
60✔
61
            Ok(Err(e)) => return Err(e),
×
62
            Err(e) => return Err(CacheError::InternalThreadPanic(e)),
×
63
        }
64
    }
65

66
    Ok(())
30✔
67
}
30✔
68

69
pub fn open_or_create_cache(
36✔
70
    cache_manager: &dyn RwCacheManager,
36✔
71
    labels: Labels,
36✔
72
    schema: SchemaWithIndex,
36✔
73
    connections: &HashSet<String>,
36✔
74
    write_options: CacheWriteOptions,
36✔
75
) -> Result<Box<dyn RwCache>, CacheError> {
36✔
76
    match cache_manager.open_rw_cache(labels.clone(), write_options)? {
36✔
77
        Some(cache) => {
6✔
78
            debug_assert!(cache.get_schema() == &schema);
6✔
79
            Ok(cache)
6✔
80
        }
81
        None => {
82
            let cache = cache_manager.create_cache(
30✔
83
                labels,
30✔
84
                schema.0,
30✔
85
                schema.1,
30✔
86
                connections,
30✔
87
                write_options,
30✔
88
            )?;
30✔
89
            Ok(cache)
30✔
90
        }
91
    }
92
}
36✔
93

94
const READ_LOG_RETRY_INTERVAL: Duration = Duration::from_secs(1);
95

96
async fn read_log_task(
30✔
97
    mut cancel: impl Future<Output = ()> + Unpin + Send + 'static,
30✔
98
    mut log_reader: LogReader,
30✔
99
    sender: mpsc::Sender<OpAndPos>,
30✔
100
) {
30✔
101
    loop {
150✔
102
        let next_op = std::pin::pin!(log_reader.read_one());
150✔
103
        match select(cancel, next_op).await {
150✔
104
            Either::Left(_) => break,
30✔
105
            Either::Right((op, c)) => {
120✔
106
                let op = match op {
120✔
107
                    Ok(op) => op,
120✔
108
                    Err(e) => {
×
109
                        error!(
110
                            "Failed to read log: {e}, retrying after {READ_LOG_RETRY_INTERVAL:?}"
×
111
                        );
112
                        tokio::time::sleep(READ_LOG_RETRY_INTERVAL).await;
×
113
                        cancel = c;
×
114
                        continue;
×
115
                    }
116
                };
117

118
                cancel = c;
120✔
119
                if sender.send(op).await.is_err() {
120✔
120
                    debug!("Stop reading log because receiver is dropped");
×
121
                    break;
×
122
                }
120✔
123
            }
124
        }
125
    }
126
}
30✔
127

128
/// Blocking task: applies operations received from the log reader to the cache
/// and emits metrics and (optionally) gRPC broadcast notifications.
///
/// Runs until `receiver` is closed, i.e. until the reading side stops. The
/// persisted log position (`set_metadata`) and cache commits only advance on
/// `Commit` / `SnapshottingDone` operations, so a restart resumes from the
/// last committed position.
fn build_cache_task(
    mut cache: Box<dyn RwCache>,
    mut receiver: mpsc::Receiver<OpAndPos>,
    operations_sender: Option<(String, Sender<GrpcOperation>)>,
    progress_bar: ProgressBar,
) -> Result<(), CacheError> {
    let schema = cache.get_schema().0.clone();

    // Register metric descriptions once per task.
    const CACHE_OPERATION_COUNTER_NAME: &str = "cache_operation";
    describe_counter!(
        CACHE_OPERATION_COUNTER_NAME,
        "Number of message processed by cache builder"
    );

    const DATA_LATENCY_HISTOGRAM_NAME: &str = "data_latency";
    describe_histogram!(
        DATA_LATENCY_HISTOGRAM_NAME,
        "End-to-end data latency in seconds"
    );

    const OPERATION_TYPE_LABEL: &str = "operation_type";
    const SNAPSHOTTING_LABEL: &str = "snapshotting";

    // Whether snapshotting is still in progress; used only as a metric label
    // and refreshed when a `SnapshottingDone` operation arrives.
    let mut snapshotting = !cache.is_snapshotting_done()?;

    while let Some(op_and_pos) = receiver.blocking_recv() {
        progress_bar.set_position(op_and_pos.pos + 1);
        match op_and_pos.op {
            LogOperation::Op { op } => match op {
                Operation::Delete { old } => {
                    // `delete` returns the removed record's metadata, or `None`
                    // when nothing matched; only actual removals are broadcast.
                    if let Some(meta) = cache.delete(&old)? {
                        if let Some((endpoint_name, operations_sender)) = operations_sender.as_ref()
                        {
                            let operation = types_helper::map_delete_operation(
                                endpoint_name.clone(),
                                CacheRecord::new(meta.id, meta.version, old),
                            );
                            send_and_log_error(operations_sender, operation);
                        }
                    }
                    // Count the operation even if no record was deleted.
                    let mut labels = cache.labels().clone();
                    labels.push(OPERATION_TYPE_LABEL, "delete");
                    labels.push(SNAPSHOTTING_LABEL, snapshotting_str(snapshotting));
                    increment_counter!(CACHE_OPERATION_COUNTER_NAME, labels);
                }
                Operation::Insert { new } => {
                    let result = cache.insert(&new)?;
                    let mut labels = cache.labels().clone();
                    labels.push(OPERATION_TYPE_LABEL, "insert");
                    labels.push(SNAPSHOTTING_LABEL, snapshotting_str(snapshotting));
                    increment_counter!(CACHE_OPERATION_COUNTER_NAME, labels);

                    // No old record for an insert, hence `None`.
                    if let Some((endpoint_name, operations_sender)) = operations_sender.as_ref() {
                        send_upsert_result(
                            endpoint_name,
                            operations_sender,
                            result,
                            &schema,
                            None,
                            new,
                        );
                    }
                }
                Operation::Update { old, new } => {
                    let upsert_result = cache.update(&old, &new)?;
                    let mut labels = cache.labels().clone();
                    labels.push(OPERATION_TYPE_LABEL, "update");
                    labels.push(SNAPSHOTTING_LABEL, snapshotting_str(snapshotting));
                    increment_counter!(CACHE_OPERATION_COUNTER_NAME, labels);

                    if let Some((endpoint_name, operations_sender)) = operations_sender.as_ref() {
                        send_upsert_result(
                            endpoint_name,
                            operations_sender,
                            upsert_result,
                            &schema,
                            Some(old),
                            new,
                        );
                    }
                }
            },
            LogOperation::Commit { decision_instant } => {
                // Persist the position we've replayed up to, then commit the
                // batch of operations received since the previous commit.
                cache.set_metadata(op_and_pos.pos)?;
                cache.commit()?;
                // `elapsed` can fail (e.g. system clock moved backwards);
                // the latency sample is simply skipped in that case.
                if let Ok(duration) = decision_instant.elapsed() {
                    histogram!(
                        DATA_LATENCY_HISTOGRAM_NAME,
                        duration,
                        cache.labels().clone()
                    );
                }
            }
            LogOperation::SnapshottingDone { connection_name } => {
                cache.set_metadata(op_and_pos.pos)?;
                cache.set_connection_snapshotting_done(&connection_name)?;
                cache.commit()?;
                // Re-evaluate the label: other connections may still be snapshotting.
                snapshotting = !cache.is_snapshotting_done()?;
            }
        }
    }

    Ok(())
}
232

233
fn send_upsert_result(
54✔
234
    endpoint_name: &str,
54✔
235
    operations_sender: &Sender<GrpcOperation>,
54✔
236
    upsert_result: UpsertResult,
54✔
237
    schema: &Schema,
54✔
238
    old: Option<Record>,
54✔
239
    new: Record,
54✔
240
) {
54✔
241
    match upsert_result {
54✔
242
        UpsertResult::Inserted { meta } => {
54✔
243
            let op = types_helper::map_insert_operation(
54✔
244
                endpoint_name.to_string(),
54✔
245
                CacheRecord::new(meta.id, meta.version, new),
54✔
246
            );
54✔
247
            send_and_log_error(operations_sender, op);
54✔
248
        }
54✔
249
        UpsertResult::Updated { old_meta, new_meta } => {
×
250
            // If `old` is `None`, it means `Updated` comes from `Insert` operation.
×
251
            // In this case, we can't get the full old record, but the fields in the primary index must be the same with the new record.
×
252
            // So we create the old record with only the fields in the primary index, cloned from `new`.
×
253
            let old = old.unwrap_or_else(|| {
×
254
                let mut record = Record::new(vec![Field::Null; new.values.len()]);
×
255
                for index in schema.primary_index.iter() {
×
256
                    record.values[*index] = new.values[*index].clone();
×
257
                }
×
258
                record
×
259
            });
×
260
            let op = types_helper::map_update_operation(
×
261
                endpoint_name.to_string(),
×
262
                CacheRecord::new(old_meta.id, old_meta.version, old),
×
263
                CacheRecord::new(new_meta.id, new_meta.version, new),
×
264
            );
×
265
            send_and_log_error(operations_sender, op);
×
266
        }
×
267
        UpsertResult::Ignored => {}
×
268
    }
269
}
54✔
270

271
fn send_and_log_error<T: Send + Sync + 'static>(sender: &Sender<T>, msg: T) {
272
    if let Err(e) = sender.send(msg) {
60✔
273
        error!("Failed to send broadcast message: {}", e);
×
274
    }
60✔
275
}
60✔
276

277
/// Renders the snapshotting flag as the static string used for metric labels.
fn snapshotting_str(snapshotting: bool) -> &'static str {
    match snapshotting {
        true => "true",
        false => "false",
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc