• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4370280743

pending completion
4370280743

push

github

GitHub
Bump async-trait from 0.1.65 to 0.1.66 (#1179)

27808 of 38702 relevant lines covered (71.85%)

25323.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.15
/dozer-cache/src/cache/lmdb/cache/query/handler.rs
1
use std::cmp::Ordering;
2
use std::ops::Bound;
3

4
use super::intersection::intersection;
5
use crate::cache::expression::Skip;
6
use crate::cache::lmdb::cache::helper::lmdb_cmp;
7
use crate::cache::lmdb::cache::LmdbCacheCommon;
8
use crate::cache::RecordWithId;
9
use crate::cache::{
10
    expression::{Operator, QueryExpression, SortDirection},
11
    index,
12
    plan::{IndexScan, IndexScanKind, Plan, QueryPlanner, SortedInvertedRangeQuery},
13
};
14
use crate::errors::{CacheError, IndexError};
15
use dozer_storage::lmdb::Transaction;
16
use dozer_types::types::{Field, IndexDefinition, Schema};
17
use itertools::Either;
18

19
pub struct LmdbQueryHandler<'a, T: Transaction> {
20
    common: &'a LmdbCacheCommon,
21
    txn: &'a T,
22
    schema: &'a Schema,
23
    secondary_indexes: &'a [IndexDefinition],
24
    query: &'a QueryExpression,
25
}
26
impl<'a, T: Transaction> LmdbQueryHandler<'a, T> {
27
    pub fn new(
4,461✔
28
        common: &'a LmdbCacheCommon,
4,461✔
29
        txn: &'a T,
4,461✔
30
        schema: &'a Schema,
4,461✔
31
        secondary_indexes: &'a [IndexDefinition],
4,461✔
32
        query: &'a QueryExpression,
4,461✔
33
    ) -> Self {
4,461✔
34
        Self {
4,461✔
35
            common,
4,461✔
36
            txn,
4,461✔
37
            schema,
4,461✔
38
            secondary_indexes,
4,461✔
39
            query,
4,461✔
40
        }
4,461✔
41
    }
4,461✔
42

43
    pub fn count(&self) -> Result<usize, CacheError> {
2,223✔
44
        let planner = QueryPlanner::new(self.schema, self.secondary_indexes, self.query);
2,223✔
45
        let execution = planner.plan()?;
2,223✔
46
        match execution {
2,222✔
47
            Plan::IndexScans(index_scans) => Ok(self.build_index_scan(index_scans)?.count()),
2,085✔
48
            Plan::SeqScan(_) => Ok(match self.query.skip {
136✔
49
                Skip::Skip(skip) => self
130✔
50
                    .common
130✔
51
                    .main_environment
130✔
52
                    .present_operation_ids()
130✔
53
                    .count(self.txn)?
130✔
54
                    .saturating_sub(skip)
130✔
55
                    .min(self.query.limit.unwrap_or(usize::MAX)),
130✔
56
                Skip::After(_) => self.all_ids()?.count(),
6✔
57
            }),
×
58
            Plan::ReturnEmpty => Ok(0),
1✔
59
        }
×
60
    }
2,223✔
61

×
62
    pub fn query(&self) -> Result<Vec<RecordWithId>, CacheError> {
2,238✔
63
        let planner = QueryPlanner::new(self.schema, self.secondary_indexes, self.query);
2,238✔
64
        let execution = planner.plan()?;
2,238✔
65
        match execution {
2,237✔
66
            Plan::IndexScans(index_scans) => {
2,088✔
67
                self.collect_records(self.build_index_scan(index_scans)?)
2,088✔
68
            }
×
69
            Plan::SeqScan(_seq_scan) => self.collect_records(self.all_ids()?),
148✔
70
            Plan::ReturnEmpty => Ok(vec![]),
1✔
71
        }
×
72
    }
2,238✔
73

×
74
    pub fn all_ids(
154✔
75
        &self,
154✔
76
    ) -> Result<impl Iterator<Item = Result<u64, CacheError>> + '_, CacheError> {
154✔
77
        let all_ids = self
154✔
78
            .common
154✔
79
            .main_environment
154✔
80
            .present_operation_ids()
154✔
81
            .iter(self.txn)?
154✔
82
            .map(|result| {
58,198✔
83
                result
58,198✔
84
                    .map(|id| id.into_owned())
58,198✔
85
                    .map_err(CacheError::Storage)
58,198✔
86
            });
58,198✔
87
        Ok(skip(all_ids, self.query.skip).take(self.query.limit.unwrap_or(usize::MAX)))
154✔
88
    }
154✔
89

×
90
    fn build_index_scan(
4,173✔
91
        &self,
4,173✔
92
        index_scans: Vec<IndexScan>,
4,173✔
93
    ) -> Result<impl Iterator<Item = Result<u64, CacheError>> + '_, CacheError> {
4,173✔
94
        debug_assert!(
×
95
            !index_scans.is_empty(),
4,173✔
96
            "Planner should not generate empty index scan"
×
97
        );
98
        let full_scan = if index_scans.len() == 1 {
4,173✔
99
            // The fast path, without intersection calculation.
100
            Either::Left(self.query_with_secondary_index(&index_scans[0])?)
4,171✔
101
        } else {
×
102
            // Intersection of multiple index scans.
×
103
            let iterators = index_scans
2✔
104
                .iter()
2✔
105
                .map(|index_scan| self.query_with_secondary_index(index_scan))
4✔
106
                .collect::<Result<Vec<_>, CacheError>>()?;
2✔
107
            Either::Right(intersection(
2✔
108
                iterators,
2✔
109
                self.common.cache_options.intersection_chunk_size,
2✔
110
            ))
2✔
111
        };
×
112
        Ok(skip(full_scan, self.query.skip).take(self.query.limit.unwrap_or(usize::MAX)))
4,173✔
113
    }
4,173✔
114

×
115
    fn query_with_secondary_index(
4,175✔
116
        &'a self,
4,175✔
117
        index_scan: &IndexScan,
4,175✔
118
    ) -> Result<impl Iterator<Item = Result<u64, CacheError>> + 'a, CacheError> {
4,175✔
119
        let index_db = self.common.secondary_indexes[index_scan.index_id];
4,175✔
120

×
121
        let RangeSpec {
×
122
            start,
4,175✔
123
            end,
4,175✔
124
            direction,
4,175✔
125
        } = get_range_spec(&index_scan.kind, index_scan.is_single_field_sorted_inverted)?;
4,175✔
126
        let start = match &start {
4,175✔
127
            Some(KeyEndpoint::Including(key)) => Bound::Included(key.as_slice()),
1,657✔
128
            Some(KeyEndpoint::Excluding(key)) => Bound::Excluded(key.as_slice()),
1,908✔
129
            None => Bound::Unbounded,
610✔
130
        };
×
131

×
132
        Ok(index_db
4,175✔
133
            .range(self.txn, start, direction == SortDirection::Ascending)?
4,175✔
134
            .take_while(move |result| match result {
2,319,352✔
135
                Ok((key, _)) => {
2,319,352✔
136
                    if let Some(end_key) = &end {
2,319,352✔
137
                        match lmdb_cmp(self.txn, index_db.database(), key.borrow(), end_key.key()) {
1,873,836✔
138
                            Ordering::Less => matches!(direction, SortDirection::Ascending),
839,684✔
139
                            Ordering::Equal => matches!(end_key, KeyEndpoint::Including(_)),
359,209✔
140
                            Ordering::Greater => matches!(direction, SortDirection::Descending),
674,943✔
141
                        }
×
142
                    } else {
×
143
                        true
445,516✔
144
                    }
×
145
                }
×
146
                Err(_) => true,
×
147
            })
2,319,352✔
148
            .map(|result| {
2,318,033✔
149
                result
2,318,033✔
150
                    .map(|(_, id)| id.into_owned())
2,318,033✔
151
                    .map_err(CacheError::Storage)
2,318,033✔
152
            }))
2,318,033✔
153
    }
4,175✔
154

×
155
    fn collect_records(
2,236✔
156
        &self,
2,236✔
157
        ids: impl Iterator<Item = Result<u64, CacheError>>,
2,236✔
158
    ) -> Result<Vec<RecordWithId>, CacheError> {
2,236✔
159
        ids.filter_map(|id| match id {
1,126,597✔
160
            Ok(id) => self
1,126,597✔
161
                .common
1,126,597✔
162
                .main_environment
1,126,597✔
163
                .get_by_operation_id(self.txn, id)
1,126,597✔
164
                .transpose(),
1,126,597✔
165
            Err(err) => Some(Err(err)),
×
166
        })
1,126,597✔
167
        .collect()
2,236✔
168
    }
2,236✔
169
}
×
170

×
171
/// One end of a key range over a secondary index, carrying the encoded key bytes.
#[derive(Debug, Clone)]
pub enum KeyEndpoint {
    /// The endpoint key itself belongs to the range.
    Including(Vec<u8>),
    /// The endpoint key itself is outside the range.
    Excluding(Vec<u8>),
}
×
176

×
177
impl KeyEndpoint {
×
178
    pub fn key(&self) -> &[u8] {
1,873,836✔
179
        match self {
1,873,836✔
180
            KeyEndpoint::Including(key) => key,
885,522✔
181
            KeyEndpoint::Excluding(key) => key,
988,314✔
182
        }
×
183
    }
1,873,836✔
184
}
185

186
#[derive(Debug)]
×
187
struct RangeSpec {
188
    start: Option<KeyEndpoint>,
189
    end: Option<KeyEndpoint>,
×
190
    direction: SortDirection,
×
191
}
×
192

×
193
fn get_range_spec(
4,175✔
194
    index_scan_kind: &IndexScanKind,
4,175✔
195
    is_single_field_sorted_inverted: bool,
4,175✔
196
) -> Result<RangeSpec, CacheError> {
4,175✔
197
    match &index_scan_kind {
4,175✔
198
        IndexScanKind::SortedInverted {
199
            eq_filters,
4,145✔
200
            range_query,
4,145✔
201
        } => {
4,145✔
202
            let comparison_key = build_sorted_inverted_comparision_key(
4,145✔
203
                eq_filters,
4,145✔
204
                range_query.as_ref(),
4,145✔
205
                is_single_field_sorted_inverted,
4,145✔
206
            );
4,145✔
207
            // There're 3 cases:
×
208
            // 1. Range query with operator.
×
209
            // 2. Range query without operator (only order by).
210
            // 3. No range query.
×
211
            Ok(if let Some(range_query) = range_query {
4,145✔
212
                match range_query.operator_and_value {
3,624✔
213
                    Some((operator, _)) => {
2,228✔
214
                        // Here we respond to case 1, examples are `a = 1 && b > 2` or `b < 2`.
2,228✔
215
                        let comparison_key = comparison_key.expect("here's at least a range query");
2,228✔
216
                        let null_key = build_sorted_inverted_comparision_key(
2,228✔
217
                            eq_filters,
2,228✔
218
                            Some(&SortedInvertedRangeQuery {
2,228✔
219
                                field_index: range_query.field_index,
2,228✔
220
                                operator_and_value: Some((operator, Field::Null)),
2,228✔
221
                                sort_direction: range_query.sort_direction,
2,228✔
222
                            }),
2,228✔
223
                            is_single_field_sorted_inverted,
2,228✔
224
                        )
2,228✔
225
                        .expect("we provided a range query");
2,228✔
226
                        get_key_interval_from_range_query(
2,228✔
227
                            comparison_key,
2,228✔
228
                            null_key,
2,228✔
229
                            operator,
2,228✔
230
                            range_query.sort_direction,
2,228✔
231
                        )
2,228✔
232
                    }
×
233
                    None => {
×
234
                        // Here we respond to case 2, examples are `a = 1 && b asc` or `b desc`.
×
235
                        if let Some(comparison_key) = comparison_key {
1,396✔
236
                            // This is the case like `a = 1 && b asc`. The comparison key is only built from `a = 1`.
×
237
                            // We use `a = 1 && b = null` as a sentinel, using the invariant that `null` is greater than anything.
×
238
                            let null_key = build_sorted_inverted_comparision_key(
962✔
239
                                eq_filters,
962✔
240
                                Some(&SortedInvertedRangeQuery {
962✔
241
                                    field_index: range_query.field_index,
962✔
242
                                    operator_and_value: Some((Operator::LT, Field::Null)),
962✔
243
                                    sort_direction: range_query.sort_direction,
962✔
244
                                }),
962✔
245
                                is_single_field_sorted_inverted,
962✔
246
                            )
962✔
247
                            .expect("we provided a range query");
962✔
248
                            match range_query.sort_direction {
962✔
249
                                SortDirection::Ascending => RangeSpec {
578✔
250
                                    start: Some(KeyEndpoint::Excluding(comparison_key)),
578✔
251
                                    end: Some(KeyEndpoint::Including(null_key)),
578✔
252
                                    direction: SortDirection::Ascending,
578✔
253
                                },
578✔
254
                                SortDirection::Descending => RangeSpec {
384✔
255
                                    start: Some(KeyEndpoint::Including(null_key)),
384✔
256
                                    end: Some(KeyEndpoint::Excluding(comparison_key)),
384✔
257
                                    direction: SortDirection::Descending,
384✔
258
                                },
384✔
259
                            }
×
260
                        } else {
×
261
                            // Just all of them.
×
262
                            RangeSpec {
434✔
263
                                start: None,
434✔
264
                                end: None,
434✔
265
                                direction: range_query.sort_direction,
434✔
266
                            }
434✔
267
                        }
×
268
                    }
×
269
                }
×
270
            } else {
271
                // Here we respond to case 3, examples are `a = 1` or `a = 1 && b = 2`.
272
                let comparison_key = comparison_key
521✔
273
                    .expect("here's at least a eq filter because there's no range query");
521✔
274
                RangeSpec {
521✔
275
                    start: Some(KeyEndpoint::Including(comparison_key.clone())),
521✔
276
                    end: Some(KeyEndpoint::Including(comparison_key)),
521✔
277
                    direction: SortDirection::Ascending, // doesn't matter
521✔
278
                }
521✔
279
            })
280
        }
281
        IndexScanKind::FullText { filter } => match filter.op {
30✔
282
            Operator::Contains => {
283
                let token = match &filter.val {
30✔
284
                    Field::String(token) => token,
28✔
285
                    Field::Text(token) => token,
2✔
286
                    _ => return Err(CacheError::Index(IndexError::ExpectedStringFullText)),
×
287
                };
×
288
                let key = index::get_full_text_secondary_index(token);
30✔
289
                Ok(RangeSpec {
30✔
290
                    start: Some(KeyEndpoint::Including(key.clone())),
30✔
291
                    end: Some(KeyEndpoint::Including(key)),
30✔
292
                    direction: SortDirection::Ascending, // doesn't matter
30✔
293
                })
30✔
294
            }
×
295
            Operator::MatchesAll | Operator::MatchesAny => {
×
296
                unimplemented!("matches all and matches any are not implemented")
×
297
            }
×
298
            other => panic!("operator {other:?} is not supported by full text index"),
×
299
        },
×
300
    }
×
301
}
4,175✔
302

×
303
fn build_sorted_inverted_comparision_key(
7,335✔
304
    eq_filters: &[(usize, Field)],
7,335✔
305
    range_query: Option<&SortedInvertedRangeQuery>,
7,335✔
306
    is_single_field_index: bool,
7,335✔
307
) -> Option<Vec<u8>> {
7,335✔
308
    let mut fields = vec![];
7,335✔
309
    eq_filters.iter().for_each(|filter| {
7,335✔
310
        fields.push(&filter.1);
5,761✔
311
    });
7,335✔
312
    if let Some(range_query) = range_query {
7,335✔
313
        if let Some((_, val)) = &range_query.operator_and_value {
6,814✔
314
            fields.push(val);
5,418✔
315
        }
5,418✔
316
    }
521✔
317
    if fields.is_empty() {
7,335✔
318
        None
434✔
319
    } else {
×
320
        Some(index::get_secondary_index(&fields, is_single_field_index))
6,901✔
321
    }
×
322
}
7,335✔
323

×
324
/// Here we use the invariant that `null` is greater than anything.
×
325
fn get_key_interval_from_range_query(
2,228✔
326
    comparison_key: Vec<u8>,
2,228✔
327
    null_key: Vec<u8>,
2,228✔
328
    operator: Operator,
2,228✔
329
    sort_direction: SortDirection,
2,228✔
330
) -> RangeSpec {
2,228✔
331
    match (operator, sort_direction) {
2,228✔
332
        (Operator::LT, SortDirection::Ascending) => RangeSpec {
148✔
333
            start: None,
148✔
334
            end: Some(KeyEndpoint::Excluding(comparison_key)),
148✔
335
            direction: SortDirection::Ascending,
148✔
336
        },
148✔
337
        (Operator::LT, SortDirection::Descending) => RangeSpec {
384✔
338
            start: Some(KeyEndpoint::Excluding(comparison_key)),
384✔
339
            end: None,
384✔
340
            direction: SortDirection::Descending,
384✔
341
        },
384✔
342
        (Operator::LTE, SortDirection::Ascending) => RangeSpec {
28✔
343
            start: None,
28✔
344
            end: Some(KeyEndpoint::Including(comparison_key)),
28✔
345
            direction: SortDirection::Ascending,
28✔
346
        },
28✔
347
        (Operator::LTE, SortDirection::Descending) => RangeSpec {
×
348
            start: Some(KeyEndpoint::Including(comparison_key)),
×
349
            end: None,
×
350
            direction: SortDirection::Descending,
×
351
        },
×
352
        (Operator::GT, SortDirection::Ascending) => RangeSpec {
176✔
353
            start: Some(KeyEndpoint::Excluding(comparison_key)),
176✔
354
            end: Some(KeyEndpoint::Excluding(null_key)),
176✔
355
            direction: SortDirection::Ascending,
176✔
356
        },
176✔
357
        (Operator::GT, SortDirection::Descending) => RangeSpec {
386✔
358
            start: Some(KeyEndpoint::Excluding(null_key)),
386✔
359
            end: Some(KeyEndpoint::Excluding(comparison_key)),
386✔
360
            direction: SortDirection::Descending,
386✔
361
        },
386✔
362
        (Operator::GTE, SortDirection::Ascending) => RangeSpec {
722✔
363
            start: Some(KeyEndpoint::Including(comparison_key)),
722✔
364
            end: Some(KeyEndpoint::Excluding(null_key)),
722✔
365
            direction: SortDirection::Ascending,
722✔
366
        },
722✔
367
        (Operator::GTE, SortDirection::Descending) => RangeSpec {
384✔
368
            start: Some(KeyEndpoint::Excluding(null_key)),
384✔
369
            end: Some(KeyEndpoint::Including(comparison_key)),
384✔
370
            direction: SortDirection::Descending,
384✔
371
        },
384✔
372
        (other, _) => {
×
373
            panic!("operator {other:?} is not supported by sorted inverted index range query")
×
374
        }
×
375
    }
×
376
}
2,228✔
377

×
378
fn skip(
4,327✔
379
    iter: impl Iterator<Item = Result<u64, CacheError>>,
4,327✔
380
    skip: Skip,
4,327✔
381
) -> impl Iterator<Item = Result<u64, CacheError>> {
4,327✔
382
    match skip {
4,327✔
383
        Skip::Skip(n) => Either::Left(iter.skip(n)),
3,775✔
384
        Skip::After(after) => Either::Right(skip_after(iter, after)),
552✔
385
    }
386
}
4,327✔
387

×
388
struct SkipAfter<T: Iterator<Item = Result<u64, CacheError>>> {
389
    inner: T,
×
390
    after: Option<u64>,
×
391
}
×
392

×
393
impl<T: Iterator<Item = Result<u64, CacheError>>> Iterator for SkipAfter<T> {
    type Item = Result<u64, CacheError>;

    /// Yields the next item after the target id.
    ///
    /// While the target has not been seen, `Ok` ids are consumed silently and errors are
    /// forwarded. If the inner iterator ends first, this returns `None`.
    fn next(&mut self) -> Option<Self::Item> {
        while let Some(target) = self.after {
            match self.inner.next()? {
                Ok(id) if id == target => self.after = None,
                Ok(_) => {}
                Err(e) => return Some(Err(e)),
            }
        }
        self.inner.next()
    }
}
×
414

×
415
fn skip_after<T: Iterator<Item = Result<u64, CacheError>>>(iter: T, after: u64) -> SkipAfter<T> {
552✔
416
    SkipAfter {
552✔
417
        inner: iter,
552✔
418
        after: Some(after),
552✔
419
    }
552✔
420
}
552✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc