getdozer / dozer · build 4763381855 (pending completion)

GitHub Pull Request #1461: feat: Make secondary index configurable
Merge 50bf72be2 into c58df4a0b

135 of 135 new or added lines in 6 files covered (100.0%)

34877 of 44466 relevant lines covered (78.44%)

11367.99 hits per line

Source File: /dozer-api/src/cache_builder/mod.rs (65.9% covered)
use std::{path::Path, sync::Arc};

use crate::grpc::types_helper;
use dozer_cache::dozer_log::reader::LogReader;
use dozer_cache::errors::IndexError;
use dozer_cache::{
    cache::{CacheRecord, CacheWriteOptions, RwCache, RwCacheManager, UpsertResult},
    errors::CacheError,
};
use dozer_core::executor::ExecutorOperation;
use dozer_types::indicatif::MultiProgress;
use dozer_types::models::api_endpoint::{
    FullText, SecondaryIndex, SecondaryIndexConfig, SortedInverted,
};
use dozer_types::{
    grpc_types::types::Operation as GrpcOperation,
    log::error,
    types::{Field, FieldDefinition, FieldType, IndexDefinition, Operation, Record, Schema},
};
use futures_util::{
    future::{select, Either},
    Future,
};
use tokio::{runtime::Runtime, sync::broadcast::Sender};

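/// Create the cache for an endpoint: derive its secondary indexes from the
/// configuration, open a reader over the endpoint's log, and return the cache
/// name together with a closure that tails the log into the cache.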
#[allow(clippy::too_many_arguments)]
pub async fn create_cache(
    cache_manager: &dyn RwCacheManager,
    schema: Schema,
    secondary_index_config: &SecondaryIndexConfig,
    runtime: Arc<Runtime>,
    cancel: impl Future<Output = ()> + Unpin,
    log_path: &Path,
    write_options: CacheWriteOptions,
    operations_sender: Option<(String, Sender<GrpcOperation>)>,
    multi_pb: Option<MultiProgress>,
) -> Result<(String, impl FnOnce() -> Result<(), CacheError>), CacheError> {
    // Create secondary indexes.
    let secondary_indexes = generate_secondary_indexes(&schema.fields, secondary_index_config)?;
    // Create the cache.
    let cache = cache_manager.create_cache(schema.clone(), secondary_indexes, write_options)?;
    let name = cache.name().to_string();

    // Create log reader.
    let log_reader = LogReader::new(log_path, &name, 0, multi_pb).await?;

    // Create a task that writes to the cache; the caller decides where to run it.
    let task = move || {
        build_cache(
            cache,
            runtime,
            cancel,
            log_reader,
            schema,
            operations_sender,
        )
    };
    Ok((name, task))
}

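/// Consume operations from the log and apply them to the cache until the log
/// terminates or `cancel` resolves, forwarding each change to the broadcast
/// channel when an `operations_sender` is configured.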
fn build_cache(
    mut cache: Box<dyn RwCache>,
    runtime: Arc<Runtime>,
    mut cancel: impl Future<Output = ()> + Unpin,
    mut log_reader: LogReader,
    schema: Schema,
    operations_sender: Option<(String, Sender<GrpcOperation>)>,
) -> Result<(), CacheError> {
    while let Some(op) = runtime.block_on(next_op_with_cancel(&mut log_reader, cancel)) {
        cancel = op.1;
        match op.0 {
            ExecutorOperation::Op { op } => match op {
                Operation::Delete { mut old } => {
                    old.schema_id = schema.identifier;
                    if let Some(meta) = cache.delete(&old)? {
                        if let Some((endpoint_name, operations_sender)) = operations_sender.as_ref()
                        {
                            let operation = types_helper::map_delete_operation(
                                endpoint_name.clone(),
                                CacheRecord::new(meta.id, meta.version, old),
                            );
                            send_and_log_error(operations_sender, operation);
                        }
                    }
                }
                Operation::Insert { mut new } => {
                    new.schema_id = schema.identifier;
                    let result = cache.insert(&new)?;

                    if let Some((endpoint_name, operations_sender)) = operations_sender.as_ref() {
                        send_upsert_result(
                            endpoint_name,
                            operations_sender,
                            result,
                            &schema,
                            None,
                            new,
                        );
                    }
                }
                Operation::Update { mut old, mut new } => {
                    old.schema_id = schema.identifier;
                    new.schema_id = schema.identifier;
                    let upsert_result = cache.update(&old, &new)?;

                    if let Some((endpoint_name, operations_sender)) = operations_sender.as_ref() {
                        send_upsert_result(
                            endpoint_name,
                            operations_sender,
                            upsert_result,
                            &schema,
                            Some(old),
                            new,
                        );
                    }
                }
            },
            ExecutorOperation::Commit { .. } => {
                cache.commit()?;
            }
            ExecutorOperation::SnapshottingDone {} => {
                cache.commit()?;
            }
            ExecutorOperation::Terminate => {
                break;
            }
        }
    }

    Ok(())
}

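/// Generate the secondary index definitions for a schema: one default
/// single-field index per field (unless the field is listed in
/// `config.skip_default`), followed by the indexes explicitly requested in
/// `config.create`.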
fn generate_secondary_indexes(
    field_definitions: &[FieldDefinition],
    config: &SecondaryIndexConfig,
) -> Result<Vec<IndexDefinition>, CacheError> {
    let mut result = vec![];

    // Create default indexes unless skipped.
    for (index, field) in field_definitions.iter().enumerate() {
        if config.skip_default.contains(&field.name) {
            continue;
        }

        match field.typ {
            // Create sorted inverted indexes for these fields.
            FieldType::UInt
            | FieldType::U128
            | FieldType::Int
            | FieldType::I128
            | FieldType::Float
            | FieldType::Boolean
            | FieldType::Decimal
            | FieldType::Timestamp
            | FieldType::Date
            | FieldType::Point
            | FieldType::Duration => result.push(IndexDefinition::SortedInverted(vec![index])),

            // Create sorted inverted and full text indexes for string fields.
            FieldType::String => {
                result.push(IndexDefinition::SortedInverted(vec![index]));
                result.push(IndexDefinition::FullText(index));
            }

            // Skip creating indexes.
            FieldType::Text | FieldType::Binary | FieldType::Bson => (),
        }
    }

    // Create requested indexes.
    fn field_index_from_field_name(
        fields: &[FieldDefinition],
        field_name: &str,
    ) -> Result<usize, IndexError> {
        fields
            .iter()
            .position(|field| field.name == field_name)
            .ok_or(IndexError::UnknownFieldName(field_name.to_string()))
    }
    for create in &config.create {
        if let Some(index) = &create.index {
            match index {
                SecondaryIndex::SortedInverted(SortedInverted { fields }) => {
                    let fields = fields
                        .iter()
                        .map(|field| field_index_from_field_name(field_definitions, field))
                        .collect::<Result<Vec<_>, _>>()?;
                    result.push(IndexDefinition::SortedInverted(fields));
                }
                SecondaryIndex::FullText(FullText { field }) => {
                    let field = field_index_from_field_name(field_definitions, field)?;
                    result.push(IndexDefinition::FullText(field));
                }
            }
        }
    }

    Ok(result)
}

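/// Wait for the next operation from the log, returning `None` if `cancel`
/// resolves first. On success the cancel future is handed back so the caller
/// can reuse it on the next iteration.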
async fn next_op_with_cancel<F: Future<Output = ()> + Unpin>(
    log_reader: &mut LogReader,
    cancel: F,
) -> Option<(ExecutorOperation, F)> {
    let next_op = Box::pin(log_reader.next_op());
    match select(next_op, cancel).await {
        Either::Left((op, cancel)) => Some((op, cancel)),
        Either::Right(_) => None,
    }
}

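/// Map a cache upsert result to the corresponding broadcast operation:
/// `Inserted` becomes an insert event, `Updated` an update event, and
/// `Ignored` is dropped.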
fn send_upsert_result(
    endpoint_name: &str,
    operations_sender: &Sender<GrpcOperation>,
    upsert_result: UpsertResult,
    schema: &Schema,
    old: Option<Record>,
    new: Record,
) {
    match upsert_result {
        UpsertResult::Inserted { meta } => {
            let op = types_helper::map_insert_operation(
                endpoint_name.to_string(),
                CacheRecord::new(meta.id, meta.version, new),
            );
            send_and_log_error(operations_sender, op);
        }
        UpsertResult::Updated { old_meta, new_meta } => {
            // If `old` is `None`, the `Updated` result comes from an `Insert` operation.
            // In that case we can't recover the full old record, but its primary index
            // fields must match those of the new record, so we construct the old record
            // with only the primary index fields, cloned from `new`.
            let old = old.unwrap_or_else(|| {
                let mut record = Record::new(new.schema_id, vec![Field::Null; new.values.len()]);
                for index in schema.primary_index.iter() {
                    record.values[*index] = new.values[*index].clone();
                }
                record
            });
            let op = types_helper::map_update_operation(
                endpoint_name.to_string(),
                CacheRecord::new(old_meta.id, old_meta.version, old),
                CacheRecord::new(new_meta.id, new_meta.version, new),
            );
            send_and_log_error(operations_sender, op);
        }
        UpsertResult::Ignored => {}
    }
}

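/// Send a message on the broadcast channel, logging (rather than propagating)
/// a send error, which occurs when there are no active receivers.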
fn send_and_log_error<T: Send + Sync + 'static>(sender: &Sender<T>, msg: T) {
    if let Err(e) = sender.send(msg) {
        error!("Failed to send broadcast message: {}", e);
    }
}