• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4377467257

pending completion
4377467257

push

github

GitHub
implement `HAVING` (#1198)

395 of 395 new or added lines in 6 files covered. (100.0%)

27638 of 38584 relevant lines covered (71.63%)

27777.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.57
/dozer-cache/src/cache/lmdb/cache/query/handler.rs
1
use std::cmp::Ordering;
2
use std::ops::Bound;
3

4
use super::intersection::intersection;
5
use crate::cache::expression::Skip;
6
use crate::cache::lmdb::cache::helper::lmdb_cmp;
7
use crate::cache::lmdb::cache::LmdbCacheCommon;
8
use crate::cache::RecordWithId;
9
use crate::cache::{
10
    expression::{Operator, QueryExpression, SortDirection},
11
    index,
12
    plan::{IndexScan, IndexScanKind, Plan, QueryPlanner, SortedInvertedRangeQuery},
13
};
14
use crate::errors::{CacheError, IndexError};
15
use dozer_storage::lmdb::Transaction;
16
use dozer_types::borrow::{Borrow, IntoOwned};
17
use dozer_types::types::{Field, IndexDefinition, Schema};
18
use itertools::Either;
19

20
pub struct LmdbQueryHandler<'a, T: Transaction> {
21
    common: &'a LmdbCacheCommon,
22
    txn: &'a T,
23
    schema: &'a Schema,
24
    secondary_indexes: &'a [IndexDefinition],
25
    query: &'a QueryExpression,
26
}
27
impl<'a, T: Transaction> LmdbQueryHandler<'a, T> {
×
28
    pub fn new(
4,473✔
29
        common: &'a LmdbCacheCommon,
4,473✔
30
        txn: &'a T,
4,473✔
31
        schema: &'a Schema,
4,473✔
32
        secondary_indexes: &'a [IndexDefinition],
4,473✔
33
        query: &'a QueryExpression,
4,473✔
34
    ) -> Self {
4,473✔
35
        Self {
4,473✔
36
            common,
4,473✔
37
            txn,
4,473✔
38
            schema,
4,473✔
39
            secondary_indexes,
4,473✔
40
            query,
4,473✔
41
        }
4,473✔
42
    }
4,473✔
43

×
44
    pub fn count(&self) -> Result<usize, CacheError> {
2,229✔
45
        let planner = QueryPlanner::new(self.schema, self.secondary_indexes, self.query);
2,229✔
46
        let execution = planner.plan()?;
2,229✔
47
        match execution {
2,228✔
48
            Plan::IndexScans(index_scans) => Ok(self.build_index_scan(index_scans)?.count()),
2,085✔
49
            Plan::SeqScan(_) => Ok(match self.query.skip {
142✔
50
                Skip::Skip(skip) => self
136✔
51
                    .common
136✔
52
                    .main_environment
136✔
53
                    .count(self.txn, self.schema.is_append_only())?
136✔
54
                    .saturating_sub(skip)
136✔
55
                    .min(self.query.limit.unwrap_or(usize::MAX)),
136✔
56
                Skip::After(_) => self.all_ids()?.count(),
6✔
57
            }),
×
58
            Plan::ReturnEmpty => Ok(0),
1✔
59
        }
×
60
    }
2,229✔
61

×
62
    pub fn query(&self) -> Result<Vec<RecordWithId>, CacheError> {
2,244✔
63
        let planner = QueryPlanner::new(self.schema, self.secondary_indexes, self.query);
2,244✔
64
        let execution = planner.plan()?;
2,244✔
65
        match execution {
2,243✔
66
            Plan::IndexScans(index_scans) => {
2,088✔
67
                self.collect_records(self.build_index_scan(index_scans)?)
2,088✔
68
            }
×
69
            Plan::SeqScan(_seq_scan) => self.collect_records(self.all_ids()?),
154✔
70
            Plan::ReturnEmpty => Ok(vec![]),
1✔
71
        }
×
72
    }
2,244✔
73

×
74
    pub fn all_ids(
160✔
75
        &self,
160✔
76
    ) -> Result<impl Iterator<Item = Result<u64, CacheError>> + '_, CacheError> {
160✔
77
        let all_ids = self
160✔
78
            .common
160✔
79
            .main_environment
160✔
80
            .present_operation_ids(self.txn, self.schema.is_append_only())?
160✔
81
            .map(|result| {
58,222✔
82
                result
58,222✔
83
                    .map(|id| id.into_owned())
58,222✔
84
                    .map_err(CacheError::Storage)
58,222✔
85
            });
58,222✔
86
        Ok(skip(all_ids, self.query.skip).take(self.query.limit.unwrap_or(usize::MAX)))
160✔
87
    }
160✔
88

×
89
    fn build_index_scan(
4,173✔
90
        &self,
4,173✔
91
        index_scans: Vec<IndexScan>,
4,173✔
92
    ) -> Result<impl Iterator<Item = Result<u64, CacheError>> + '_, CacheError> {
4,173✔
93
        debug_assert!(
×
94
            !index_scans.is_empty(),
4,173✔
95
            "Planner should not generate empty index scan"
×
96
        );
×
97
        let full_scan = if index_scans.len() == 1 {
4,173✔
98
            // The fast path, without intersection calculation.
×
99
            Either::Left(self.query_with_secondary_index(&index_scans[0])?)
4,171✔
100
        } else {
×
101
            // Intersection of multiple index scans.
×
102
            let iterators = index_scans
2✔
103
                .iter()
2✔
104
                .map(|index_scan| self.query_with_secondary_index(index_scan))
4✔
105
                .collect::<Result<Vec<_>, CacheError>>()?;
2✔
106
            Either::Right(intersection(
2✔
107
                iterators,
2✔
108
                self.common.cache_options.intersection_chunk_size,
2✔
109
            ))
2✔
110
        };
×
111
        Ok(skip(full_scan, self.query.skip).take(self.query.limit.unwrap_or(usize::MAX)))
4,173✔
112
    }
4,173✔
113

×
114
    fn query_with_secondary_index(
4,175✔
115
        &'a self,
4,175✔
116
        index_scan: &IndexScan,
4,175✔
117
    ) -> Result<impl Iterator<Item = Result<u64, CacheError>> + 'a, CacheError> {
4,175✔
118
        let index_db = self.common.secondary_indexes[index_scan.index_id];
4,175✔
119

×
120
        let RangeSpec {
×
121
            start,
4,175✔
122
            end,
4,175✔
123
            direction,
4,175✔
124
        } = get_range_spec(&index_scan.kind, index_scan.is_single_field_sorted_inverted)?;
4,175✔
125
        let start = match &start {
4,175✔
126
            Some(KeyEndpoint::Including(key)) => Bound::Included(key.as_slice()),
1,657✔
127
            Some(KeyEndpoint::Excluding(key)) => Bound::Excluded(key.as_slice()),
1,908✔
128
            None => Bound::Unbounded,
610✔
129
        };
×
130

×
131
        Ok(index_db
4,175✔
132
            .range(self.txn, start, direction == SortDirection::Ascending)?
4,175✔
133
            .take_while(move |result| match result {
2,319,352✔
134
                Ok((key, _)) => {
2,319,352✔
135
                    if let Some(end_key) = &end {
2,319,352✔
136
                        match lmdb_cmp(self.txn, index_db.database(), key.borrow(), end_key.key()) {
1,873,836✔
137
                            Ordering::Less => matches!(direction, SortDirection::Ascending),
839,684✔
138
                            Ordering::Equal => matches!(end_key, KeyEndpoint::Including(_)),
359,209✔
139
                            Ordering::Greater => matches!(direction, SortDirection::Descending),
674,943✔
140
                        }
×
141
                    } else {
×
142
                        true
445,516✔
143
                    }
×
144
                }
×
145
                Err(_) => true,
×
146
            })
2,319,352✔
147
            .map(|result| {
2,318,033✔
148
                result
2,318,033✔
149
                    .map(|(_, id)| id.into_owned())
2,318,033✔
150
                    .map_err(CacheError::Storage)
2,318,033✔
151
            }))
2,318,033✔
152
    }
4,175✔
153

×
154
    fn collect_records(
2,242✔
155
        &self,
2,242✔
156
        ids: impl Iterator<Item = Result<u64, CacheError>>,
2,242✔
157
    ) -> Result<Vec<RecordWithId>, CacheError> {
2,242✔
158
        ids.filter_map(|id| match id {
1,126,621✔
159
            Ok(id) => self
1,126,621✔
160
                .common
1,126,621✔
161
                .main_environment
1,126,621✔
162
                .get_by_operation_id(self.txn, id, self.schema.is_append_only())
1,126,621✔
163
                .transpose(),
1,126,621✔
164
            Err(err) => Some(Err(err)),
×
165
        })
1,126,621✔
166
        .collect()
2,242✔
167
    }
2,242✔
168
}
×
169

×
170
/// One end of a key range, tagged with whether the key itself belongs to the range.
#[derive(Debug, Clone)]
pub enum KeyEndpoint {
    /// The range includes this key.
    Including(Vec<u8>),
    /// The range stops short of this key.
    Excluding(Vec<u8>),
}

impl KeyEndpoint {
    /// Returns the raw key bytes, whichever variant holds them.
    pub fn key(&self) -> &[u8] {
        let (Self::Including(bytes) | Self::Excluding(bytes)) = self;
        bytes
    }
}
×
184

185
#[derive(Debug)]
×
186
struct RangeSpec {
×
187
    start: Option<KeyEndpoint>,
188
    end: Option<KeyEndpoint>,
189
    direction: SortDirection,
×
190
}
×
191

×
192
fn get_range_spec(
4,175✔
193
    index_scan_kind: &IndexScanKind,
4,175✔
194
    is_single_field_sorted_inverted: bool,
4,175✔
195
) -> Result<RangeSpec, CacheError> {
4,175✔
196
    match &index_scan_kind {
4,175✔
197
        IndexScanKind::SortedInverted {
×
198
            eq_filters,
4,145✔
199
            range_query,
4,145✔
200
        } => {
4,145✔
201
            let comparison_key = build_sorted_inverted_comparison_key(
4,145✔
202
                eq_filters,
4,145✔
203
                range_query.as_ref(),
4,145✔
204
                is_single_field_sorted_inverted,
4,145✔
205
            );
4,145✔
206
            // There're 3 cases:
×
207
            // 1. Range query with operator.
×
208
            // 2. Range query without operator (only order by).
×
209
            // 3. No range query.
210
            Ok(if let Some(range_query) = range_query {
4,145✔
211
                match range_query.operator_and_value {
3,624✔
212
                    Some((operator, _)) => {
2,228✔
213
                        // Here we respond to case 1, examples are `a = 1 && b > 2` or `b < 2`.
2,228✔
214
                        let comparison_key = comparison_key.expect("here's at least a range query");
2,228✔
215
                        let null_key = build_sorted_inverted_comparison_key(
2,228✔
216
                            eq_filters,
2,228✔
217
                            Some(&SortedInvertedRangeQuery {
2,228✔
218
                                field_index: range_query.field_index,
2,228✔
219
                                operator_and_value: Some((operator, Field::Null)),
2,228✔
220
                                sort_direction: range_query.sort_direction,
2,228✔
221
                            }),
2,228✔
222
                            is_single_field_sorted_inverted,
2,228✔
223
                        )
2,228✔
224
                        .expect("we provided a range query");
2,228✔
225
                        get_key_interval_from_range_query(
2,228✔
226
                            comparison_key,
2,228✔
227
                            null_key,
2,228✔
228
                            operator,
2,228✔
229
                            range_query.sort_direction,
2,228✔
230
                        )
2,228✔
231
                    }
×
232
                    None => {
×
233
                        // Here we respond to case 2, examples are `a = 1 && b asc` or `b desc`.
×
234
                        if let Some(comparison_key) = comparison_key {
1,396✔
235
                            // This is the case like `a = 1 && b asc`. The comparison key is only built from `a = 1`.
×
236
                            // We use `a = 1 && b = null` as a sentinel, using the invariant that `null` is greater than anything.
×
237
                            let null_key = build_sorted_inverted_comparison_key(
962✔
238
                                eq_filters,
962✔
239
                                Some(&SortedInvertedRangeQuery {
962✔
240
                                    field_index: range_query.field_index,
962✔
241
                                    operator_and_value: Some((Operator::LT, Field::Null)),
962✔
242
                                    sort_direction: range_query.sort_direction,
962✔
243
                                }),
962✔
244
                                is_single_field_sorted_inverted,
962✔
245
                            )
962✔
246
                            .expect("we provided a range query");
962✔
247
                            match range_query.sort_direction {
962✔
248
                                SortDirection::Ascending => RangeSpec {
578✔
249
                                    start: Some(KeyEndpoint::Excluding(comparison_key)),
578✔
250
                                    end: Some(KeyEndpoint::Including(null_key)),
578✔
251
                                    direction: SortDirection::Ascending,
578✔
252
                                },
578✔
253
                                SortDirection::Descending => RangeSpec {
384✔
254
                                    start: Some(KeyEndpoint::Including(null_key)),
384✔
255
                                    end: Some(KeyEndpoint::Excluding(comparison_key)),
384✔
256
                                    direction: SortDirection::Descending,
384✔
257
                                },
384✔
258
                            }
×
259
                        } else {
×
260
                            // Just all of them.
×
261
                            RangeSpec {
434✔
262
                                start: None,
434✔
263
                                end: None,
434✔
264
                                direction: range_query.sort_direction,
434✔
265
                            }
434✔
266
                        }
×
267
                    }
×
268
                }
×
269
            } else {
×
270
                // Here we respond to case 3, examples are `a = 1` or `a = 1 && b = 2`.
271
                let comparison_key = comparison_key
521✔
272
                    .expect("here's at least a eq filter because there's no range query");
521✔
273
                RangeSpec {
521✔
274
                    start: Some(KeyEndpoint::Including(comparison_key.clone())),
521✔
275
                    end: Some(KeyEndpoint::Including(comparison_key)),
521✔
276
                    direction: SortDirection::Ascending, // doesn't matter
521✔
277
                }
521✔
278
            })
×
279
        }
280
        IndexScanKind::FullText { filter } => match filter.op {
30✔
281
            Operator::Contains => {
×
282
                let token = match &filter.val {
30✔
283
                    Field::String(token) => token,
28✔
284
                    Field::Text(token) => token,
2✔
285
                    _ => return Err(CacheError::Index(IndexError::ExpectedStringFullText)),
×
286
                };
×
287
                let key = index::get_full_text_secondary_index(token);
30✔
288
                Ok(RangeSpec {
30✔
289
                    start: Some(KeyEndpoint::Including(key.clone())),
30✔
290
                    end: Some(KeyEndpoint::Including(key)),
30✔
291
                    direction: SortDirection::Ascending, // doesn't matter
30✔
292
                })
30✔
293
            }
×
294
            Operator::MatchesAll | Operator::MatchesAny => {
×
295
                unimplemented!("matches all and matches any are not implemented")
×
296
            }
×
297
            other => panic!("operator {other:?} is not supported by full text index"),
×
298
        },
×
299
    }
×
300
}
4,175✔
301

×
302
fn build_sorted_inverted_comparison_key(
7,335✔
303
    eq_filters: &[(usize, Field)],
7,335✔
304
    range_query: Option<&SortedInvertedRangeQuery>,
7,335✔
305
    is_single_field_index: bool,
7,335✔
306
) -> Option<Vec<u8>> {
7,335✔
307
    let mut fields = vec![];
7,335✔
308
    eq_filters.iter().for_each(|filter| {
7,335✔
309
        fields.push(&filter.1);
5,761✔
310
    });
7,335✔
311
    if let Some(range_query) = range_query {
7,335✔
312
        if let Some((_, val)) = &range_query.operator_and_value {
6,814✔
313
            fields.push(val);
5,418✔
314
        }
5,418✔
315
    }
521✔
316
    if fields.is_empty() {
7,335✔
317
        None
434✔
318
    } else {
×
319
        Some(index::get_secondary_index(&fields, is_single_field_index))
6,901✔
320
    }
×
321
}
7,335✔
322

×
323
/// Here we use the invariant that `null` is greater than anything.
×
324
fn get_key_interval_from_range_query(
2,228✔
325
    comparison_key: Vec<u8>,
2,228✔
326
    null_key: Vec<u8>,
2,228✔
327
    operator: Operator,
2,228✔
328
    sort_direction: SortDirection,
2,228✔
329
) -> RangeSpec {
2,228✔
330
    match (operator, sort_direction) {
2,228✔
331
        (Operator::LT, SortDirection::Ascending) => RangeSpec {
148✔
332
            start: None,
148✔
333
            end: Some(KeyEndpoint::Excluding(comparison_key)),
148✔
334
            direction: SortDirection::Ascending,
148✔
335
        },
148✔
336
        (Operator::LT, SortDirection::Descending) => RangeSpec {
384✔
337
            start: Some(KeyEndpoint::Excluding(comparison_key)),
384✔
338
            end: None,
384✔
339
            direction: SortDirection::Descending,
384✔
340
        },
384✔
341
        (Operator::LTE, SortDirection::Ascending) => RangeSpec {
28✔
342
            start: None,
28✔
343
            end: Some(KeyEndpoint::Including(comparison_key)),
28✔
344
            direction: SortDirection::Ascending,
28✔
345
        },
28✔
346
        (Operator::LTE, SortDirection::Descending) => RangeSpec {
×
347
            start: Some(KeyEndpoint::Including(comparison_key)),
×
348
            end: None,
×
349
            direction: SortDirection::Descending,
×
350
        },
×
351
        (Operator::GT, SortDirection::Ascending) => RangeSpec {
176✔
352
            start: Some(KeyEndpoint::Excluding(comparison_key)),
176✔
353
            end: Some(KeyEndpoint::Excluding(null_key)),
176✔
354
            direction: SortDirection::Ascending,
176✔
355
        },
176✔
356
        (Operator::GT, SortDirection::Descending) => RangeSpec {
386✔
357
            start: Some(KeyEndpoint::Excluding(null_key)),
386✔
358
            end: Some(KeyEndpoint::Excluding(comparison_key)),
386✔
359
            direction: SortDirection::Descending,
386✔
360
        },
386✔
361
        (Operator::GTE, SortDirection::Ascending) => RangeSpec {
722✔
362
            start: Some(KeyEndpoint::Including(comparison_key)),
722✔
363
            end: Some(KeyEndpoint::Excluding(null_key)),
722✔
364
            direction: SortDirection::Ascending,
722✔
365
        },
722✔
366
        (Operator::GTE, SortDirection::Descending) => RangeSpec {
384✔
367
            start: Some(KeyEndpoint::Excluding(null_key)),
384✔
368
            end: Some(KeyEndpoint::Including(comparison_key)),
384✔
369
            direction: SortDirection::Descending,
384✔
370
        },
384✔
371
        (other, _) => {
×
372
            panic!("operator {other:?} is not supported by sorted inverted index range query")
×
373
        }
×
374
    }
×
375
}
2,228✔
376

×
377
fn skip(
4,333✔
378
    iter: impl Iterator<Item = Result<u64, CacheError>>,
4,333✔
379
    skip: Skip,
4,333✔
380
) -> impl Iterator<Item = Result<u64, CacheError>> {
4,333✔
381
    match skip {
4,333✔
382
        Skip::Skip(n) => Either::Left(iter.skip(n)),
3,781✔
383
        Skip::After(after) => Either::Right(skip_after(iter, after)),
552✔
384
    }
×
385
}
4,333✔
386

×
387
struct SkipAfter<T: Iterator<Item = Result<u64, CacheError>>> {
×
388
    inner: T,
389
    after: Option<u64>,
×
390
}
×
391

×
392
impl<T: Iterator<Item = Result<u64, CacheError>>> Iterator for SkipAfter<T> {
×
393
    type Item = Result<u64, CacheError>;
×
394

×
395
    fn next(&mut self) -> Option<Self::Item> {
178,488✔
396
        loop {
×
397
            if let Some(after) = self.after {
334,044✔
398
                match self.inner.next() {
155,628✔
399
                    Some(Ok(id)) => {
155,556✔
400
                        if id == after {
155,556✔
401
                            self.after = None;
480✔
402
                        }
155,076✔
403
                    }
×
404
                    Some(Err(e)) => return Some(Err(e)),
×
405
                    None => return None,
72✔
406
                }
×
407
            } else {
×
408
                return self.inner.next();
178,416✔
409
            }
×
410
        }
×
411
    }
178,488✔
412
}
×
413

×
414
fn skip_after<T: Iterator<Item = Result<u64, CacheError>>>(iter: T, after: u64) -> SkipAfter<T> {
552✔
415
    SkipAfter {
552✔
416
        inner: iter,
552✔
417
        after: Some(after),
552✔
418
    }
552✔
419
}
552✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc