• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4020902227

pending completion
4020902227

Pull #743

github

GitHub
Merge 57279c6b6 into a12da35a5
Pull Request #743: Chore clippy fix

165 of 165 new or added lines in 60 files covered. (100.0%)

23638 of 35485 relevant lines covered (66.61%)

38417.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.3
/dozer-cache/src/cache/lmdb/query/handler.rs
1
use std::{cmp::Ordering, sync::Arc};
2

3
use super::iterator::{CacheIterator, KeyEndpoint};
4
use crate::cache::{
5
    expression::{Operator, QueryExpression, SortDirection},
6
    index,
7
    lmdb::{
8
        cache::{RecordDatabase, SecondaryIndexDatabases},
9
        query::intersection::intersection,
10
    },
11
    plan::{IndexScan, IndexScanKind, Plan, QueryPlanner, SortedInvertedRangeQuery},
12
};
13
use crate::errors::{CacheError, IndexError};
14
use dozer_types::{
15
    bincode,
16
    parking_lot::RwLock,
17
    types::{Field, IndexDefinition, Record, Schema},
18
};
19
use itertools::Either;
20
use lmdb::RoTransaction;
21

22
pub struct LmdbQueryHandler<'a> {
23
    db: RecordDatabase,
24
    secondary_index_databases: Arc<RwLock<SecondaryIndexDatabases>>,
25
    txn: &'a RoTransaction<'a>,
26
    schema: &'a Schema,
27
    secondary_indexes: &'a [IndexDefinition],
28
    query: &'a QueryExpression,
29
    intersection_chunk_size: usize,
30
}
31
impl<'a> LmdbQueryHandler<'a> {
32
    pub fn new(
2,508✔
33
        db: RecordDatabase,
2,508✔
34
        secondary_index_databases: Arc<RwLock<SecondaryIndexDatabases>>,
2,508✔
35
        txn: &'a RoTransaction,
2,508✔
36
        schema: &'a Schema,
2,508✔
37
        secondary_indexes: &'a [IndexDefinition],
2,508✔
38
        query: &'a QueryExpression,
2,508✔
39
        intersection_chunk_size: usize,
2,508✔
40
    ) -> Self {
2,508✔
41
        Self {
2,508✔
42
            db,
2,508✔
43
            secondary_index_databases,
2,508✔
44
            txn,
2,508✔
45
            schema,
2,508✔
46
            secondary_indexes,
2,508✔
47
            query,
2,508✔
48
            intersection_chunk_size,
2,508✔
49
        }
2,508✔
50
    }
2,508✔
51

52
    pub fn count(&self) -> Result<usize, CacheError> {
1,244✔
53
        let planner = QueryPlanner::new(self.schema, self.secondary_indexes, self.query);
1,244✔
54
        let execution = planner.plan()?;
1,244✔
55
        match execution {
1,243✔
56
            Plan::IndexScans(index_scans) => Ok(self.build_index_scan(index_scans)?.count()),
1,161✔
57
            Plan::SeqScan(_) => Ok(self
81✔
58
                .db
81✔
59
                .count(self.txn)?
81✔
60
                .saturating_sub(self.query.skip)
81✔
61
                .min(self.query.limit.unwrap_or(usize::MAX))),
81✔
62
            Plan::ReturnEmpty => Ok(0),
1✔
63
        }
64
    }
1,244✔
65

66
    pub fn query(&self) -> Result<Vec<Record>, CacheError> {
1,264✔
67
        let planner = QueryPlanner::new(self.schema, self.secondary_indexes, self.query);
1,264✔
68
        let execution = planner.plan()?;
1,264✔
69
        match execution {
1,263✔
70
            Plan::IndexScans(index_scans) => {
1,164✔
71
                let scan = self.build_index_scan(index_scans)?;
1,164✔
72
                self.collect_records(scan)
1,164✔
73
            }
74
            Plan::SeqScan(_seq_scan) => self.iterate_and_deserialize(),
98✔
75
            Plan::ReturnEmpty => Ok(vec![]),
1✔
76
        }
77
    }
1,264✔
78

79
    pub fn iterate_and_deserialize(&self) -> Result<Vec<Record>, CacheError> {
98✔
80
        let cursor = self.db.open_ro_cursor(self.txn)?;
98✔
81
        CacheIterator::new(cursor, None, SortDirection::Ascending)
98✔
82
            .skip(self.query.skip)
98✔
83
            .take(self.query.limit.unwrap_or(usize::MAX))
98✔
84
            .map(|(_, v)| bincode::deserialize(v).map_err(CacheError::map_deserialization_error))
32,445✔
85
            .collect()
98✔
86
    }
98✔
87

88
    fn build_index_scan(
2,325✔
89
        &self,
2,325✔
90
        index_scans: Vec<IndexScan>,
2,325✔
91
    ) -> Result<impl Iterator<Item = [u8; 8]> + '_, CacheError> {
2,325✔
92
        debug_assert!(
93
            !index_scans.is_empty(),
2,325✔
94
            "Planner should not generate empty index scan"
×
95
        );
96
        let full_sacan = if index_scans.len() == 1 {
2,325✔
97
            // The fast path, without intersection calculation.
98
            Either::Left(self.query_with_secondary_index(&index_scans[0])?)
2,323✔
99
        } else {
100
            // Intersection of multiple index scans.
101
            let iterators = index_scans
2✔
102
                .iter()
2✔
103
                .map(|index_scan| {
4✔
104
                    self.query_with_secondary_index(index_scan)
4✔
105
                        .map(|iter| iter.map(u64::from_be_bytes))
4✔
106
                })
4✔
107
                .collect::<Result<Vec<_>, CacheError>>()?;
2✔
108
            Either::Right(
2✔
109
                intersection(iterators, self.intersection_chunk_size).map(|id| id.to_be_bytes()),
4✔
110
            )
2✔
111
        };
112
        Ok(full_sacan
2,325✔
113
            .skip(self.query.skip)
2,325✔
114
            .take(self.query.limit.unwrap_or(usize::MAX)))
2,325✔
115
    }
2,325✔
116

117
    fn query_with_secondary_index(
2,327✔
118
        &'a self,
2,327✔
119
        index_scan: &IndexScan,
2,327✔
120
    ) -> Result<impl Iterator<Item = [u8; 8]> + 'a, CacheError> {
2,327✔
121
        let schema_id = self
2,327✔
122
            .schema
2,327✔
123
            .identifier
2,327✔
124
            .ok_or(CacheError::SchemaIdentifierNotFound)?;
2,327✔
125
        let index_db = *self
2,327✔
126
            .secondary_index_databases
2,327✔
127
            .read()
2,327✔
128
            .get(&(schema_id, index_scan.index_id))
2,327✔
129
            .ok_or(CacheError::SecondaryIndexDatabaseNotFound)?;
2,327✔
130

131
        let RangeSpec {
132
            start,
2,327✔
133
            end,
2,327✔
134
            direction,
2,327✔
135
        } = get_range_spec(&index_scan.kind, index_scan.is_single_field_sorted_inverted)?;
2,327✔
136

137
        let cursor = index_db.open_ro_cursor(self.txn)?;
2,327✔
138

139
        Ok(CacheIterator::new(cursor, start, direction)
2,327✔
140
            .take_while(move |(key, _)| {
2,327✔
141
                if let Some(end_key) = &end {
1,230,572✔
142
                    match index_db.cmp(self.txn, key, end_key.key()) {
996,326✔
143
                        Ordering::Less => matches!(direction, SortDirection::Ascending),
449,990✔
144
                        Ordering::Equal => matches!(end_key, KeyEndpoint::Including(_)),
212,439✔
145
                        Ordering::Greater => matches!(direction, SortDirection::Descending),
333,897✔
146
                    }
147
                } else {
148
                    true
234,246✔
149
                }
150
            })
1,230,572✔
151
            .map(|(_, id)| {
1,229,847✔
152
                id.try_into()
1,229,847✔
153
                    .expect("All values must be u64 ids in seconary index database")
1,229,847✔
154
            }))
1,229,847✔
155
    }
2,327✔
156

157
    fn collect_records(
1,164✔
158
        &self,
1,164✔
159
        ids: impl Iterator<Item = [u8; 8]>,
1,164✔
160
    ) -> Result<Vec<Record>, CacheError> {
1,164✔
161
        ids.map(|id| self.db.get(self.txn, id)).collect()
612,438✔
162
    }
1,164✔
163
}
164

165
#[derive(Debug)]
×
166
struct RangeSpec {
167
    start: Option<KeyEndpoint>,
168
    end: Option<KeyEndpoint>,
169
    direction: SortDirection,
170
}
171

172
fn get_range_spec(
2,327✔
173
    index_scan_kind: &IndexScanKind,
2,327✔
174
    is_single_field_sorted_inverted: bool,
2,327✔
175
) -> Result<RangeSpec, CacheError> {
2,327✔
176
    match &index_scan_kind {
2,327✔
177
        IndexScanKind::SortedInverted {
178
            eq_filters,
2,301✔
179
            range_query,
2,301✔
180
        } => {
2,301✔
181
            let comparison_key = build_sorted_inverted_comparision_key(
2,301✔
182
                eq_filters,
2,301✔
183
                range_query.as_ref(),
2,301✔
184
                is_single_field_sorted_inverted,
2,301✔
185
            );
2,301✔
186
            // There're 3 cases:
187
            // 1. Range query with operator.
188
            // 2. Range query without operator (only order by).
189
            // 3. No range query.
190
            Ok(if let Some(range_query) = range_query {
2,301✔
191
                match range_query.operator_and_value {
1,964✔
192
                    Some((operator, _)) => {
1,220✔
193
                        // Here we respond to case 1, examples are `a = 1 && b > 2` or `b < 2`.
1,220✔
194
                        let comparison_key = comparison_key.expect("here's at least a range query");
1,220✔
195
                        let null_key = build_sorted_inverted_comparision_key(
1,220✔
196
                            eq_filters,
1,220✔
197
                            Some(&SortedInvertedRangeQuery {
1,220✔
198
                                field_index: range_query.field_index,
1,220✔
199
                                operator_and_value: Some((operator, Field::Null)),
1,220✔
200
                                sort_direction: range_query.sort_direction,
1,220✔
201
                            }),
1,220✔
202
                            is_single_field_sorted_inverted,
1,220✔
203
                        )
1,220✔
204
                        .expect("we provided a range query");
1,220✔
205
                        get_key_interval_from_range_query(
1,220✔
206
                            comparison_key,
1,220✔
207
                            null_key,
1,220✔
208
                            operator,
1,220✔
209
                            range_query.sort_direction,
1,220✔
210
                        )
1,220✔
211
                    }
212
                    None => {
213
                        // Here we respond to case 2, examples are `a = 1 && b asc` or `b desc`.
214
                        if let Some(comparison_key) = comparison_key {
744✔
215
                            // This is the case like `a = 1 && b asc`. The comparison key is only built from `a = 1`.
216
                            // We use `a = 1 && b = null` as a sentinel, using the invariant that `null` is greater than anything.
217
                            let null_key = build_sorted_inverted_comparision_key(
502✔
218
                                eq_filters,
502✔
219
                                Some(&SortedInvertedRangeQuery {
502✔
220
                                    field_index: range_query.field_index,
502✔
221
                                    operator_and_value: Some((Operator::LT, Field::Null)),
502✔
222
                                    sort_direction: range_query.sort_direction,
502✔
223
                                }),
502✔
224
                                is_single_field_sorted_inverted,
502✔
225
                            )
502✔
226
                            .expect("we provided a range query");
502✔
227
                            match range_query.sort_direction {
502✔
228
                                SortDirection::Ascending => RangeSpec {
302✔
229
                                    start: Some(KeyEndpoint::Excluding(comparison_key)),
302✔
230
                                    end: Some(KeyEndpoint::Including(null_key)),
302✔
231
                                    direction: SortDirection::Ascending,
302✔
232
                                },
302✔
233
                                SortDirection::Descending => RangeSpec {
200✔
234
                                    start: Some(KeyEndpoint::Including(null_key)),
200✔
235
                                    end: Some(KeyEndpoint::Excluding(comparison_key)),
200✔
236
                                    direction: SortDirection::Descending,
200✔
237
                                },
200✔
238
                            }
239
                        } else {
240
                            // Just all of them.
241
                            RangeSpec {
242✔
242
                                start: None,
242✔
243
                                end: None,
242✔
244
                                direction: range_query.sort_direction,
242✔
245
                            }
242✔
246
                        }
247
                    }
248
                }
249
            } else {
250
                // Here we respond to case 3, examples are `a = 1` or `a = 1 && b = 2`.
251
                let comparison_key = comparison_key
337✔
252
                    .expect("here's at least a eq filter because there's no range query");
337✔
253
                RangeSpec {
337✔
254
                    start: Some(KeyEndpoint::Including(comparison_key.clone())),
337✔
255
                    end: Some(KeyEndpoint::Including(comparison_key)),
337✔
256
                    direction: SortDirection::Ascending, // doesn't matter
337✔
257
                }
337✔
258
            })
259
        }
260
        IndexScanKind::FullText { filter } => match filter.op {
26✔
261
            Operator::Contains => {
262
                let token = match &filter.val {
26✔
263
                    Field::String(token) => token,
24✔
264
                    Field::Text(token) => token,
2✔
265
                    _ => return Err(CacheError::IndexError(IndexError::ExpectedStringFullText)),
×
266
                };
267
                let key = index::get_full_text_secondary_index(token);
26✔
268
                Ok(RangeSpec {
26✔
269
                    start: Some(KeyEndpoint::Including(key.clone())),
26✔
270
                    end: Some(KeyEndpoint::Including(key)),
26✔
271
                    direction: SortDirection::Ascending, // doesn't matter
26✔
272
                })
26✔
273
            }
274
            Operator::MatchesAll | Operator::MatchesAny => {
275
                unimplemented!("matches all and matches any are not implemented")
×
276
            }
277
            other => panic!("operator {other:?} is not supported by full text index"),
×
278
        },
279
    }
280
}
2,327✔
281

282
fn build_sorted_inverted_comparision_key(
4,023✔
283
    eq_filters: &[(usize, Field)],
4,023✔
284
    range_query: Option<&SortedInvertedRangeQuery>,
4,023✔
285
    is_single_field_index: bool,
4,023✔
286
) -> Option<Vec<u8>> {
4,023✔
287
    let mut fields = vec![];
4,023✔
288
    eq_filters.iter().for_each(|filter| {
4,023✔
289
        fields.push(&filter.1);
3,145✔
290
    });
4,023✔
291
    if let Some(range_query) = range_query {
4,023✔
292
        if let Some((_, val)) = &range_query.operator_and_value {
3,686✔
293
            fields.push(val);
2,942✔
294
        }
2,942✔
295
    }
337✔
296
    if fields.is_empty() {
4,023✔
297
        None
242✔
298
    } else {
299
        Some(index::get_secondary_index(&fields, is_single_field_index))
3,781✔
300
    }
301
}
4,023✔
302

303
/// Here we use the invariant that `null` is greater than anything.
304
fn get_key_interval_from_range_query(
1,220✔
305
    comparison_key: Vec<u8>,
1,220✔
306
    null_key: Vec<u8>,
1,220✔
307
    operator: Operator,
1,220✔
308
    sort_direction: SortDirection,
1,220✔
309
) -> RangeSpec {
1,220✔
310
    match (operator, sort_direction) {
1,220✔
311
        (Operator::LT, SortDirection::Ascending) => RangeSpec {
84✔
312
            start: None,
84✔
313
            end: Some(KeyEndpoint::Excluding(comparison_key)),
84✔
314
            direction: SortDirection::Ascending,
84✔
315
        },
84✔
316
        (Operator::LT, SortDirection::Descending) => RangeSpec {
200✔
317
            start: Some(KeyEndpoint::Excluding(comparison_key)),
200✔
318
            end: None,
200✔
319
            direction: SortDirection::Descending,
200✔
320
        },
200✔
321
        (Operator::LTE, SortDirection::Ascending) => RangeSpec {
24✔
322
            start: None,
24✔
323
            end: Some(KeyEndpoint::Including(comparison_key)),
24✔
324
            direction: SortDirection::Ascending,
24✔
325
        },
24✔
326
        (Operator::LTE, SortDirection::Descending) => RangeSpec {
×
327
            start: Some(KeyEndpoint::Including(comparison_key)),
×
328
            end: None,
×
329
            direction: SortDirection::Descending,
×
330
        },
×
331
        (Operator::GT, SortDirection::Ascending) => RangeSpec {
108✔
332
            start: Some(KeyEndpoint::Excluding(comparison_key)),
108✔
333
            end: Some(KeyEndpoint::Excluding(null_key)),
108✔
334
            direction: SortDirection::Ascending,
108✔
335
        },
108✔
336
        (Operator::GT, SortDirection::Descending) => RangeSpec {
202✔
337
            start: Some(KeyEndpoint::Excluding(null_key)),
202✔
338
            end: Some(KeyEndpoint::Excluding(comparison_key)),
202✔
339
            direction: SortDirection::Descending,
202✔
340
        },
202✔
341
        (Operator::GTE, SortDirection::Ascending) => RangeSpec {
402✔
342
            start: Some(KeyEndpoint::Including(comparison_key)),
402✔
343
            end: Some(KeyEndpoint::Excluding(null_key)),
402✔
344
            direction: SortDirection::Ascending,
402✔
345
        },
402✔
346
        (Operator::GTE, SortDirection::Descending) => RangeSpec {
200✔
347
            start: Some(KeyEndpoint::Excluding(null_key)),
200✔
348
            end: Some(KeyEndpoint::Including(comparison_key)),
200✔
349
            direction: SortDirection::Descending,
200✔
350
        },
200✔
351
        (other, _) => {
×
352
            panic!("operator {other:?} is not supported by sorted inverted index range query")
×
353
        }
×
354
    }
×
355
}
1,220✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc