• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4354627675

pending completion
4354627675

push

github

GitHub
chore: Use `LmdbMap` and `LmdbMultimap` instead of raw database in cache (#1156)

754 of 754 new or added lines in 15 files covered. (100.0%)

29895 of 39630 relevant lines covered (75.44%)

38604.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.09
/dozer-cache/src/cache/lmdb/cache/query/handler.rs
1
use std::cmp::Ordering;
2
use std::ops::Bound;
3

4
use super::intersection::intersection;
5
use crate::cache::expression::Skip;
6
use crate::cache::lmdb::cache::helper::lmdb_cmp;
7
use crate::cache::lmdb::cache::LmdbCacheCommon;
8
use crate::cache::RecordWithId;
9
use crate::cache::{
10
    expression::{Operator, QueryExpression, SortDirection},
11
    index,
12
    plan::{IndexScan, IndexScanKind, Plan, QueryPlanner, SortedInvertedRangeQuery},
13
};
14
use crate::errors::{CacheError, IndexError};
15
use dozer_storage::lmdb::Transaction;
16
use dozer_types::types::{Field, IndexDefinition, Schema};
17
use itertools::Either;
18

19
pub struct LmdbQueryHandler<'a, T: Transaction> {
20
    common: &'a LmdbCacheCommon,
21
    txn: &'a T,
22
    schema: &'a Schema,
23
    secondary_indexes: &'a [IndexDefinition],
24
    query: &'a QueryExpression,
25
}
26
impl<'a, T: Transaction> LmdbQueryHandler<'a, T> {
×
27
    pub fn new(
4,461✔
28
        common: &'a LmdbCacheCommon,
4,461✔
29
        txn: &'a T,
4,461✔
30
        schema: &'a Schema,
4,461✔
31
        secondary_indexes: &'a [IndexDefinition],
4,461✔
32
        query: &'a QueryExpression,
4,461✔
33
    ) -> Self {
4,461✔
34
        Self {
4,461✔
35
            common,
4,461✔
36
            txn,
4,461✔
37
            schema,
4,461✔
38
            secondary_indexes,
4,461✔
39
            query,
4,461✔
40
        }
4,461✔
41
    }
4,461✔
42

×
43
    pub fn count(&self) -> Result<usize, CacheError> {
2,223✔
44
        let planner = QueryPlanner::new(self.schema, self.secondary_indexes, self.query);
2,223✔
45
        let execution = planner.plan()?;
2,223✔
46
        match execution {
2,222✔
47
            Plan::IndexScans(index_scans) => Ok(self.build_index_scan(index_scans)?.count()),
2,085✔
48
            Plan::SeqScan(_) => Ok(match self.query.skip {
136✔
49
                Skip::Skip(skip) => self
130✔
50
                    .common
130✔
51
                    .record_id_to_record
130✔
52
                    .count(self.txn)?
130✔
53
                    .saturating_sub(skip)
130✔
54
                    .min(self.query.limit.unwrap_or(usize::MAX)),
130✔
55
                Skip::After(_) => self.all_ids()?.count(),
6✔
56
            }),
×
57
            Plan::ReturnEmpty => Ok(0),
1✔
58
        }
×
59
    }
2,223✔
60

×
61
    pub fn query(&self) -> Result<Vec<RecordWithId>, CacheError> {
2,238✔
62
        let planner = QueryPlanner::new(self.schema, self.secondary_indexes, self.query);
2,238✔
63
        let execution = planner.plan()?;
2,238✔
64
        match execution {
2,237✔
65
            Plan::IndexScans(index_scans) => {
2,088✔
66
                self.collect_records(self.build_index_scan(index_scans)?)
2,088✔
67
            }
×
68
            Plan::SeqScan(_seq_scan) => self.collect_records(self.all_ids()?),
148✔
69
            Plan::ReturnEmpty => Ok(vec![]),
1✔
70
        }
×
71
    }
2,238✔
72

×
73
    pub fn all_ids(
154✔
74
        &self,
154✔
75
    ) -> Result<impl Iterator<Item = Result<u64, CacheError>> + '_, CacheError> {
154✔
76
        let all_ids = self
154✔
77
            .common
154✔
78
            .record_id_to_record
154✔
79
            .keys(self.txn)?
154✔
80
            .map(|result| {
58,198✔
81
                result
58,198✔
82
                    .map(|id| id.into_owned())
58,198✔
83
                    .map_err(CacheError::Storage)
58,198✔
84
            });
58,198✔
85
        Ok(skip(all_ids, self.query.skip).take(self.query.limit.unwrap_or(usize::MAX)))
154✔
86
    }
154✔
87

×
88
    fn build_index_scan(
4,173✔
89
        &self,
4,173✔
90
        index_scans: Vec<IndexScan>,
4,173✔
91
    ) -> Result<impl Iterator<Item = Result<u64, CacheError>> + '_, CacheError> {
4,173✔
92
        debug_assert!(
×
93
            !index_scans.is_empty(),
4,173✔
94
            "Planner should not generate empty index scan"
×
95
        );
×
96
        let full_scan = if index_scans.len() == 1 {
4,173✔
97
            // The fast path, without intersection calculation.
×
98
            Either::Left(self.query_with_secondary_index(&index_scans[0])?)
4,171✔
99
        } else {
×
100
            // Intersection of multiple index scans.
×
101
            let iterators = index_scans
2✔
102
                .iter()
2✔
103
                .map(|index_scan| self.query_with_secondary_index(index_scan))
4✔
104
                .collect::<Result<Vec<_>, CacheError>>()?;
2✔
105
            Either::Right(intersection(
2✔
106
                iterators,
2✔
107
                self.common.cache_options.intersection_chunk_size,
2✔
108
            ))
2✔
109
        };
×
110
        Ok(skip(full_scan, self.query.skip).take(self.query.limit.unwrap_or(usize::MAX)))
4,173✔
111
    }
4,173✔
112

×
113
    fn query_with_secondary_index(
4,175✔
114
        &'a self,
4,175✔
115
        index_scan: &IndexScan,
4,175✔
116
    ) -> Result<impl Iterator<Item = Result<u64, CacheError>> + 'a, CacheError> {
4,175✔
117
        let schema_id = self
4,175✔
118
            .schema
4,175✔
119
            .identifier
4,175✔
120
            .ok_or(CacheError::SchemaHasNoIdentifier)?;
4,175✔
121
        let index_db = *self
4,175✔
122
            .common
4,175✔
123
            .secondary_indexes
4,175✔
124
            .get(&(schema_id, index_scan.index_id))
4,175✔
125
            .ok_or(CacheError::SecondaryIndexDatabaseNotFound)?;
4,175✔
126

127
        let RangeSpec {
×
128
            start,
4,175✔
129
            end,
4,175✔
130
            direction,
4,175✔
131
        } = get_range_spec(&index_scan.kind, index_scan.is_single_field_sorted_inverted)?;
4,175✔
132
        let start = match &start {
4,175✔
133
            Some(KeyEndpoint::Including(key)) => Bound::Included(key.as_slice()),
1,657✔
134
            Some(KeyEndpoint::Excluding(key)) => Bound::Excluded(key.as_slice()),
1,908✔
135
            None => Bound::Unbounded,
610✔
136
        };
137

138
        Ok(index_db
4,175✔
139
            .range(self.txn, start, direction == SortDirection::Ascending)?
4,175✔
140
            .take_while(move |result| match result {
2,319,352✔
141
                Ok((key, _)) => {
2,319,352✔
142
                    if let Some(end_key) = &end {
2,319,352✔
143
                        match lmdb_cmp(self.txn, index_db.database(), key, end_key.key()) {
1,873,836✔
144
                            Ordering::Less => matches!(direction, SortDirection::Ascending),
839,684✔
145
                            Ordering::Equal => matches!(end_key, KeyEndpoint::Including(_)),
359,209✔
146
                            Ordering::Greater => matches!(direction, SortDirection::Descending),
674,943✔
147
                        }
×
148
                    } else {
×
149
                        true
445,516✔
150
                    }
×
151
                }
×
152
                Err(_) => true,
×
153
            })
2,319,352✔
154
            .map(|result| {
2,318,033✔
155
                result
2,318,033✔
156
                    .map(|(_, id)| id.into_owned())
2,318,033✔
157
                    .map_err(CacheError::Storage)
2,318,033✔
158
            }))
2,318,033✔
159
    }
4,175✔
160

161
    fn collect_records(
2,236✔
162
        &self,
2,236✔
163
        ids: impl Iterator<Item = Result<u64, CacheError>>,
2,236✔
164
    ) -> Result<Vec<RecordWithId>, CacheError> {
2,236✔
165
        ids.filter_map(|id| match id {
1,126,597✔
166
            Ok(id) => self
1,126,597✔
167
                .common
1,126,597✔
168
                .record_id_to_record
1,126,597✔
169
                .get(self.txn, &id)
1,126,597✔
170
                .transpose()
1,126,597✔
171
                .map(|record| {
1,126,597✔
172
                    record
1,126,597✔
173
                        .map(|record| RecordWithId::new(id, record.into_owned()))
1,126,597✔
174
                        .map_err(CacheError::Storage)
1,126,597✔
175
                }),
1,126,597✔
176
            Err(err) => Some(Err(err)),
×
177
        })
1,126,597✔
178
        .collect()
2,236✔
179
    }
2,236✔
180
}
181

182
/// One end of a key interval: the encoded key plus whether the key itself is in the interval.
#[derive(Debug, Clone)]
pub enum KeyEndpoint {
    /// The endpoint key belongs to the interval.
    Including(Vec<u8>),
    /// The endpoint key lies just outside the interval.
    Excluding(Vec<u8>),
}

impl KeyEndpoint {
    /// Returns the encoded key, whichever kind of endpoint this is.
    pub fn key(&self) -> &[u8] {
        let (KeyEndpoint::Including(bytes) | KeyEndpoint::Excluding(bytes)) = self;
        bytes
    }
}
×
196

×
197
#[derive(Debug)]
×
198
struct RangeSpec {
×
199
    start: Option<KeyEndpoint>,
×
200
    end: Option<KeyEndpoint>,
×
201
    direction: SortDirection,
×
202
}
×
203

×
204
fn get_range_spec(
4,175✔
205
    index_scan_kind: &IndexScanKind,
4,175✔
206
    is_single_field_sorted_inverted: bool,
4,175✔
207
) -> Result<RangeSpec, CacheError> {
4,175✔
208
    match &index_scan_kind {
4,175✔
209
        IndexScanKind::SortedInverted {
210
            eq_filters,
4,145✔
211
            range_query,
4,145✔
212
        } => {
4,145✔
213
            let comparison_key = build_sorted_inverted_comparision_key(
4,145✔
214
                eq_filters,
4,145✔
215
                range_query.as_ref(),
4,145✔
216
                is_single_field_sorted_inverted,
4,145✔
217
            );
4,145✔
218
            // There're 3 cases:
×
219
            // 1. Range query with operator.
×
220
            // 2. Range query without operator (only order by).
×
221
            // 3. No range query.
×
222
            Ok(if let Some(range_query) = range_query {
4,145✔
223
                match range_query.operator_and_value {
3,624✔
224
                    Some((operator, _)) => {
2,228✔
225
                        // Here we respond to case 1, examples are `a = 1 && b > 2` or `b < 2`.
2,228✔
226
                        let comparison_key = comparison_key.expect("here's at least a range query");
2,228✔
227
                        let null_key = build_sorted_inverted_comparision_key(
2,228✔
228
                            eq_filters,
2,228✔
229
                            Some(&SortedInvertedRangeQuery {
2,228✔
230
                                field_index: range_query.field_index,
2,228✔
231
                                operator_and_value: Some((operator, Field::Null)),
2,228✔
232
                                sort_direction: range_query.sort_direction,
2,228✔
233
                            }),
2,228✔
234
                            is_single_field_sorted_inverted,
2,228✔
235
                        )
2,228✔
236
                        .expect("we provided a range query");
2,228✔
237
                        get_key_interval_from_range_query(
2,228✔
238
                            comparison_key,
2,228✔
239
                            null_key,
2,228✔
240
                            operator,
2,228✔
241
                            range_query.sort_direction,
2,228✔
242
                        )
2,228✔
243
                    }
244
                    None => {
×
245
                        // Here we respond to case 2, examples are `a = 1 && b asc` or `b desc`.
×
246
                        if let Some(comparison_key) = comparison_key {
1,396✔
247
                            // This is the case like `a = 1 && b asc`. The comparison key is only built from `a = 1`.
×
248
                            // We use `a = 1 && b = null` as a sentinel, using the invariant that `null` is greater than anything.
×
249
                            let null_key = build_sorted_inverted_comparision_key(
962✔
250
                                eq_filters,
962✔
251
                                Some(&SortedInvertedRangeQuery {
962✔
252
                                    field_index: range_query.field_index,
962✔
253
                                    operator_and_value: Some((Operator::LT, Field::Null)),
962✔
254
                                    sort_direction: range_query.sort_direction,
962✔
255
                                }),
962✔
256
                                is_single_field_sorted_inverted,
962✔
257
                            )
962✔
258
                            .expect("we provided a range query");
962✔
259
                            match range_query.sort_direction {
962✔
260
                                SortDirection::Ascending => RangeSpec {
578✔
261
                                    start: Some(KeyEndpoint::Excluding(comparison_key)),
578✔
262
                                    end: Some(KeyEndpoint::Including(null_key)),
578✔
263
                                    direction: SortDirection::Ascending,
578✔
264
                                },
578✔
265
                                SortDirection::Descending => RangeSpec {
384✔
266
                                    start: Some(KeyEndpoint::Including(null_key)),
384✔
267
                                    end: Some(KeyEndpoint::Excluding(comparison_key)),
384✔
268
                                    direction: SortDirection::Descending,
384✔
269
                                },
384✔
270
                            }
×
271
                        } else {
272
                            // Just all of them.
273
                            RangeSpec {
434✔
274
                                start: None,
434✔
275
                                end: None,
434✔
276
                                direction: range_query.sort_direction,
434✔
277
                            }
434✔
278
                        }
×
279
                    }
×
280
                }
×
281
            } else {
×
282
                // Here we respond to case 3, examples are `a = 1` or `a = 1 && b = 2`.
×
283
                let comparison_key = comparison_key
521✔
284
                    .expect("here's at least a eq filter because there's no range query");
521✔
285
                RangeSpec {
521✔
286
                    start: Some(KeyEndpoint::Including(comparison_key.clone())),
521✔
287
                    end: Some(KeyEndpoint::Including(comparison_key)),
521✔
288
                    direction: SortDirection::Ascending, // doesn't matter
521✔
289
                }
521✔
290
            })
×
291
        }
292
        IndexScanKind::FullText { filter } => match filter.op {
30✔
293
            Operator::Contains => {
294
                let token = match &filter.val {
30✔
295
                    Field::String(token) => token,
28✔
296
                    Field::Text(token) => token,
2✔
297
                    _ => return Err(CacheError::Index(IndexError::ExpectedStringFullText)),
×
298
                };
×
299
                let key = index::get_full_text_secondary_index(token);
30✔
300
                Ok(RangeSpec {
30✔
301
                    start: Some(KeyEndpoint::Including(key.clone())),
30✔
302
                    end: Some(KeyEndpoint::Including(key)),
30✔
303
                    direction: SortDirection::Ascending, // doesn't matter
30✔
304
                })
30✔
305
            }
×
306
            Operator::MatchesAll | Operator::MatchesAny => {
×
307
                unimplemented!("matches all and matches any are not implemented")
×
308
            }
×
309
            other => panic!("operator {other:?} is not supported by full text index"),
×
310
        },
×
311
    }
×
312
}
4,175✔
313

×
314
fn build_sorted_inverted_comparision_key(
7,335✔
315
    eq_filters: &[(usize, Field)],
7,335✔
316
    range_query: Option<&SortedInvertedRangeQuery>,
7,335✔
317
    is_single_field_index: bool,
7,335✔
318
) -> Option<Vec<u8>> {
7,335✔
319
    let mut fields = vec![];
7,335✔
320
    eq_filters.iter().for_each(|filter| {
7,335✔
321
        fields.push(&filter.1);
5,761✔
322
    });
7,335✔
323
    if let Some(range_query) = range_query {
7,335✔
324
        if let Some((_, val)) = &range_query.operator_and_value {
6,814✔
325
            fields.push(val);
5,418✔
326
        }
5,418✔
327
    }
521✔
328
    if fields.is_empty() {
7,335✔
329
        None
434✔
330
    } else {
×
331
        Some(index::get_secondary_index(&fields, is_single_field_index))
6,901✔
332
    }
×
333
}
7,335✔
334

×
335
/// Here we use the invariant that `null` is greater than anything.
×
336
fn get_key_interval_from_range_query(
2,228✔
337
    comparison_key: Vec<u8>,
2,228✔
338
    null_key: Vec<u8>,
2,228✔
339
    operator: Operator,
2,228✔
340
    sort_direction: SortDirection,
2,228✔
341
) -> RangeSpec {
2,228✔
342
    match (operator, sort_direction) {
2,228✔
343
        (Operator::LT, SortDirection::Ascending) => RangeSpec {
148✔
344
            start: None,
148✔
345
            end: Some(KeyEndpoint::Excluding(comparison_key)),
148✔
346
            direction: SortDirection::Ascending,
148✔
347
        },
148✔
348
        (Operator::LT, SortDirection::Descending) => RangeSpec {
384✔
349
            start: Some(KeyEndpoint::Excluding(comparison_key)),
384✔
350
            end: None,
384✔
351
            direction: SortDirection::Descending,
384✔
352
        },
384✔
353
        (Operator::LTE, SortDirection::Ascending) => RangeSpec {
28✔
354
            start: None,
28✔
355
            end: Some(KeyEndpoint::Including(comparison_key)),
28✔
356
            direction: SortDirection::Ascending,
28✔
357
        },
28✔
358
        (Operator::LTE, SortDirection::Descending) => RangeSpec {
×
359
            start: Some(KeyEndpoint::Including(comparison_key)),
×
360
            end: None,
×
361
            direction: SortDirection::Descending,
×
362
        },
×
363
        (Operator::GT, SortDirection::Ascending) => RangeSpec {
176✔
364
            start: Some(KeyEndpoint::Excluding(comparison_key)),
176✔
365
            end: Some(KeyEndpoint::Excluding(null_key)),
176✔
366
            direction: SortDirection::Ascending,
176✔
367
        },
176✔
368
        (Operator::GT, SortDirection::Descending) => RangeSpec {
386✔
369
            start: Some(KeyEndpoint::Excluding(null_key)),
386✔
370
            end: Some(KeyEndpoint::Excluding(comparison_key)),
386✔
371
            direction: SortDirection::Descending,
386✔
372
        },
386✔
373
        (Operator::GTE, SortDirection::Ascending) => RangeSpec {
722✔
374
            start: Some(KeyEndpoint::Including(comparison_key)),
722✔
375
            end: Some(KeyEndpoint::Excluding(null_key)),
722✔
376
            direction: SortDirection::Ascending,
722✔
377
        },
722✔
378
        (Operator::GTE, SortDirection::Descending) => RangeSpec {
384✔
379
            start: Some(KeyEndpoint::Excluding(null_key)),
384✔
380
            end: Some(KeyEndpoint::Including(comparison_key)),
384✔
381
            direction: SortDirection::Descending,
384✔
382
        },
384✔
383
        (other, _) => {
×
384
            panic!("operator {other:?} is not supported by sorted inverted index range query")
×
385
        }
386
    }
387
}
2,228✔
388

389
fn skip(
4,327✔
390
    iter: impl Iterator<Item = Result<u64, CacheError>>,
4,327✔
391
    skip: Skip,
4,327✔
392
) -> impl Iterator<Item = Result<u64, CacheError>> {
4,327✔
393
    match skip {
4,327✔
394
        Skip::Skip(n) => Either::Left(iter.skip(n)),
3,775✔
395
        Skip::After(after) => Either::Right(skip_after(iter, after)),
552✔
396
    }
397
}
4,327✔
398

399
struct SkipAfter<T: Iterator<Item = Result<u64, CacheError>>> {
400
    inner: T,
401
    after: Option<u64>,
402
}
403

404
impl<T: Iterator<Item = Result<u64, CacheError>>> Iterator for SkipAfter<T> {
    type Item = Result<u64, CacheError>;

    // While the target id is still pending, consume ids silently. Errors met on the way are
    // yielded at once, and skipping resumes on the next call. Once the target has been
    // consumed, items are forwarded unchanged. If the inner stream ends before the target
    // shows up, nothing is yielded.
    fn next(&mut self) -> Option<Self::Item> {
        while let Some(target) = self.after {
            match self.inner.next()? {
                Ok(id) if id == target => self.after = None,
                Ok(_) => {}
                Err(err) => return Some(Err(err)),
            }
        }
        self.inner.next()
    }
}
425

426
fn skip_after<T: Iterator<Item = Result<u64, CacheError>>>(iter: T, after: u64) -> SkipAfter<T> {
552✔
427
    SkipAfter {
552✔
428
        inner: iter,
552✔
429
        after: Some(after),
552✔
430
    }
552✔
431
}
552✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc