• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5954012408

23 Aug 2023 04:32PM UTC coverage: 75.86% (-0.2%) from 76.088%
5954012408

push

github

web-flow
chore: Move ContractService implementation to Contract (#1899)

* chore: Split `build.rs` to several files

* chore: Remove `serde` from `dozer-cli/Cargo.toml`

* chore: Move `ContractService` implementation to `Contract`

461 of 461 new or added lines in 8 files covered. (100.0%)

46996 of 61951 relevant lines covered (75.86%)

73804.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.05
/dozer-api/src/cache_builder/mod.rs
1
use std::collections::HashSet;
2
use std::time::Duration;
3

4
use crate::grpc::types_helper;
5
use dozer_cache::dozer_log::reader::{LogReader, LogReaderBuilder};
6
use dozer_cache::dozer_log::replication::LogOperation;
7
use dozer_cache::{
8
    cache::{CacheRecord, CacheWriteOptions, RwCache, RwCacheManager, UpsertResult},
9
    errors::CacheError,
10
};
11
use dozer_types::indicatif::MultiProgress;
12
use dozer_types::labels::Labels;
13
use dozer_types::log::debug;
14
use dozer_types::types::SchemaWithIndex;
15
use dozer_types::{
16
    grpc_types::types::Operation as GrpcOperation,
17
    log::error,
18
    types::{Field, Operation, Record, Schema},
19
};
20
use futures_util::stream::FuturesUnordered;
21
use futures_util::{
22
    future::{select, Either},
23
    Future,
24
};
25
use metrics::{describe_counter, describe_histogram, histogram, increment_counter};
26
use tokio::sync::broadcast::Sender;
27
use tokio::sync::mpsc;
28
use tokio_stream::StreamExt;
29

30
pub async fn build_cache(
18✔
31
    cache: Box<dyn RwCache>,
18✔
32
    cancel: impl Future<Output = ()> + Unpin + Send + 'static,
18✔
33
    log_reader_builder: LogReaderBuilder,
18✔
34
    operations_sender: Option<(String, Sender<GrpcOperation>)>,
18✔
35
    multi_pb: Option<MultiProgress>,
18✔
36
) -> Result<(), CacheError> {
18✔
37
    // Create log reader.
38
    let pos = cache.get_metadata()?.unwrap_or(0);
18✔
39
    debug!(
40
        "Starting log reader {} from position {pos}",
×
41
        log_reader_builder.options.endpoint
42
    );
43
    let log_reader = log_reader_builder.build(pos, multi_pb);
18✔
44

18✔
45
    // Spawn tasks
18✔
46
    let mut futures = FuturesUnordered::new();
18✔
47
    let (sender, receiver) = mpsc::channel(1);
18✔
48
    futures.push(tokio::spawn(async move {
18✔
49
        read_log_task(cancel, log_reader, sender).await;
87✔
50
        Ok(())
18✔
51
    }));
18✔
52
    futures.push({
18✔
53
        tokio::task::spawn_blocking(|| build_cache_task(cache, receiver, operations_sender))
18✔
54
    });
18✔
55

56
    while let Some(result) = futures.next().await {
54✔
57
        match result {
36✔
58
            Ok(Ok(())) => (),
36✔
59
            Ok(Err(e)) => return Err(e),
×
60
            Err(e) => return Err(CacheError::InternalThreadPanic(e)),
×
61
        }
62
    }
63

64
    Ok(())
18✔
65
}
18✔
66

67
pub fn open_or_create_cache(
72✔
68
    cache_manager: &dyn RwCacheManager,
72✔
69
    labels: Labels,
72✔
70
    schema: SchemaWithIndex,
72✔
71
    connections: &HashSet<String>,
72✔
72
    write_options: CacheWriteOptions,
72✔
73
) -> Result<Box<dyn RwCache>, CacheError> {
72✔
74
    match cache_manager.open_rw_cache(labels.clone(), write_options)? {
72✔
75
        Some(cache) => {
×
76
            debug_assert!(cache.get_schema() == &schema);
×
77
            Ok(cache)
×
78
        }
79
        None => {
80
            let cache = cache_manager.create_cache(
72✔
81
                labels,
72✔
82
                schema.0,
72✔
83
                schema.1,
72✔
84
                connections,
72✔
85
                write_options,
72✔
86
            )?;
72✔
87
            Ok(cache)
72✔
88
        }
89
    }
90
}
72✔
91

92
/// Delay before retrying after a log-read error in `read_log_task`.
const READ_LOG_RETRY_INTERVAL: Duration = Duration::from_secs(1);
93

94
/// Pumps operations from `log_reader` into `sender` until `cancel` resolves
/// or the channel receiver is dropped.
///
/// Read errors are retried indefinitely after `READ_LOG_RETRY_INTERVAL`.
/// NOTE(review): cancellation is not observed during the retry sleep — the
/// task only re-checks `cancel` on the next `select`; confirm this is intended.
async fn read_log_task(
    mut cancel: impl Future<Output = ()> + Unpin + Send + 'static,
    mut log_reader: LogReader,
    sender: mpsc::Sender<(LogOperation, u64)>,
) {
    loop {
        let next_op = std::pin::pin!(log_reader.next_op());
        // `select` consumes `cancel`; when the read side wins, the still-
        // pending cancel future is handed back in the `Either::Right` arm and
        // must be re-assigned before the next iteration.
        match select(cancel, next_op).await {
            // Cancelled: stop reading.
            Either::Left(_) => break,
            Either::Right((op, c)) => {
                let op = match op {
                    Ok(op) => op,
                    Err(e) => {
                        error!(
                            "Failed to read log: {e}, retrying after {READ_LOG_RETRY_INTERVAL:?}"
                        );
                        tokio::time::sleep(READ_LOG_RETRY_INTERVAL).await;
                        // Restore the cancel future before retrying.
                        cancel = c;
                        continue;
                    }
                };

                cancel = c;
                // The channel is bounded (capacity 1 in `build_cache`), so this
                // send also applies back-pressure from the cache-writer task.
                if sender.send(op).await.is_err() {
                    debug!("Stop reading log because receiver is dropped");
                    break;
                }
            }
        }
    }
}
125

126
fn build_cache_task(
72✔
127
    mut cache: Box<dyn RwCache>,
72✔
128
    mut receiver: mpsc::Receiver<(LogOperation, u64)>,
72✔
129
    operations_sender: Option<(String, Sender<GrpcOperation>)>,
72✔
130
) -> Result<(), CacheError> {
72✔
131
    let schema = cache.get_schema().0.clone();
72✔
132

72✔
133
    const CACHE_OPERATION_COUNTER_NAME: &str = "cache_operation";
72✔
134
    describe_counter!(
72✔
135
        CACHE_OPERATION_COUNTER_NAME,
×
136
        "Number of message processed by cache builder"
×
137
    );
138

139
    const DATA_LATENCY_HISTOGRAM_NAME: &str = "data_latency";
140
    describe_histogram!(
72✔
141
        DATA_LATENCY_HISTOGRAM_NAME,
×
142
        "End-to-end data latency in seconds"
×
143
    );
144

145
    const OPERATION_TYPE_LABEL: &str = "operation_type";
146
    const SNAPSHOTTING_LABEL: &str = "snapshotting";
147

148
    let mut snapshotting = !cache.is_snapshotting_done()?;
72✔
149

150
    while let Some((op, pos)) = receiver.blocking_recv() {
372✔
151
        match op {
300✔
152
            LogOperation::Op { op } => match op {
132✔
153
                Operation::Delete { old } => {
12✔
154
                    if let Some(meta) = cache.delete(&old)? {
12✔
155
                        if let Some((endpoint_name, operations_sender)) = operations_sender.as_ref()
12✔
156
                        {
12✔
157
                            let operation = types_helper::map_delete_operation(
12✔
158
                                endpoint_name.clone(),
12✔
159
                                CacheRecord::new(meta.id, meta.version, old),
12✔
160
                            );
12✔
161
                            send_and_log_error(operations_sender, operation);
12✔
162
                        }
12✔
163
                    }
×
164
                    let mut labels = cache.labels().clone();
12✔
165
                    labels.push(OPERATION_TYPE_LABEL, "delete");
12✔
166
                    labels.push(SNAPSHOTTING_LABEL, snapshotting_str(snapshotting));
12✔
167
                    increment_counter!(CACHE_OPERATION_COUNTER_NAME, labels);
12✔
168
                }
169
                Operation::Insert { new } => {
120✔
170
                    let result = cache.insert(&new)?;
120✔
171
                    let mut labels = cache.labels().clone();
120✔
172
                    labels.push(OPERATION_TYPE_LABEL, "insert");
120✔
173
                    labels.push(SNAPSHOTTING_LABEL, snapshotting_str(snapshotting));
120✔
174
                    increment_counter!(CACHE_OPERATION_COUNTER_NAME, labels);
120✔
175

176
                    if let Some((endpoint_name, operations_sender)) = operations_sender.as_ref() {
120✔
177
                        send_upsert_result(
120✔
178
                            endpoint_name,
120✔
179
                            operations_sender,
120✔
180
                            result,
120✔
181
                            &schema,
120✔
182
                            None,
120✔
183
                            new,
120✔
184
                        );
120✔
185
                    }
120✔
186
                }
187
                Operation::Update { old, new } => {
×
188
                    let upsert_result = cache.update(&old, &new)?;
×
189
                    let mut labels = cache.labels().clone();
×
190
                    labels.push(OPERATION_TYPE_LABEL, "update");
×
191
                    labels.push(SNAPSHOTTING_LABEL, snapshotting_str(snapshotting));
×
192
                    increment_counter!(CACHE_OPERATION_COUNTER_NAME, labels);
×
193

194
                    if let Some((endpoint_name, operations_sender)) = operations_sender.as_ref() {
×
195
                        send_upsert_result(
×
196
                            endpoint_name,
×
197
                            operations_sender,
×
198
                            upsert_result,
×
199
                            &schema,
×
200
                            Some(old),
×
201
                            new,
×
202
                        );
×
203
                    }
×
204
                }
205
            },
206
            LogOperation::Commit { decision_instant } => {
96✔
207
                cache.set_metadata(pos)?;
96✔
208
                cache.commit()?;
96✔
209
                if let Ok(duration) = decision_instant.elapsed() {
96✔
210
                    histogram!(
96✔
211
                        DATA_LATENCY_HISTOGRAM_NAME,
212
                        duration,
×
213
                        cache.labels().clone()
×
214
                    );
215
                }
×
216
            }
217
            LogOperation::SnapshottingDone { connection_name } => {
72✔
218
                cache.set_metadata(pos)?;
72✔
219
                cache.set_connection_snapshotting_done(&connection_name)?;
72✔
220
                cache.commit()?;
72✔
221
                snapshotting = !cache.is_snapshotting_done()?;
72✔
222
            }
223
            LogOperation::Terminate => {
224
                break;
×
225
            }
226
        }
227
    }
228

229
    Ok(())
72✔
230
}
72✔
231

232
fn send_upsert_result(
120✔
233
    endpoint_name: &str,
120✔
234
    operations_sender: &Sender<GrpcOperation>,
120✔
235
    upsert_result: UpsertResult,
120✔
236
    schema: &Schema,
120✔
237
    old: Option<Record>,
120✔
238
    new: Record,
120✔
239
) {
120✔
240
    match upsert_result {
120✔
241
        UpsertResult::Inserted { meta } => {
120✔
242
            let op = types_helper::map_insert_operation(
120✔
243
                endpoint_name.to_string(),
120✔
244
                CacheRecord::new(meta.id, meta.version, new),
120✔
245
            );
120✔
246
            send_and_log_error(operations_sender, op);
120✔
247
        }
120✔
248
        UpsertResult::Updated { old_meta, new_meta } => {
×
249
            // If `old` is `None`, it means `Updated` comes from `Insert` operation.
×
250
            // In this case, we can't get the full old record, but the fields in the primary index must be the same with the new record.
×
251
            // So we create the old record with only the fields in the primary index, cloned from `new`.
×
252
            let old = old.unwrap_or_else(|| {
×
253
                let mut record = Record::new(vec![Field::Null; new.values.len()]);
×
254
                for index in schema.primary_index.iter() {
×
255
                    record.values[*index] = new.values[*index].clone();
×
256
                }
×
257
                record
×
258
            });
×
259
            let op = types_helper::map_update_operation(
×
260
                endpoint_name.to_string(),
×
261
                CacheRecord::new(old_meta.id, old_meta.version, old),
×
262
                CacheRecord::new(new_meta.id, new_meta.version, new),
×
263
            );
×
264
            send_and_log_error(operations_sender, op);
×
265
        }
×
266
        UpsertResult::Ignored => {}
×
267
    }
268
}
120✔
269

270
fn send_and_log_error<T: Send + Sync + 'static>(sender: &Sender<T>, msg: T) {
271
    if let Err(e) = sender.send(msg) {
132✔
272
        error!("Failed to send broadcast message: {}", e);
×
273
    }
132✔
274
}
132✔
275

276
/// Renders a snapshotting flag as the metric label value `"true"`/`"false"`.
fn snapshotting_str(snapshotting: bool) -> &'static str {
    match snapshotting {
        true => "true",
        false => "false",
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc