• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4113913291

pending completion
4113913291

Pull #821

github

GitHub
Merge a8cca3f0b into 8f74ec17e
Pull Request #821: refactor: Make `LmdbRoCache` and `LmdbRwCache` `Send` and `Sync`

869 of 869 new or added lines in 45 files covered. (100.0%)

23486 of 37503 relevant lines covered (62.62%)

36806.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.38
/dozer-cache/src/cache/lmdb/query/handler.rs
1
use std::{cmp::Ordering, sync::Arc};
2

3
use super::iterator::{CacheIterator, KeyEndpoint};
4
use crate::cache::{
5
    expression::{Operator, QueryExpression, SortDirection},
6
    index,
7
    lmdb::{
8
        cache::{RecordDatabase, SecondaryIndexDatabases},
9
        query::intersection::intersection,
10
    },
11
    plan::{IndexScan, IndexScanKind, Plan, QueryPlanner, SortedInvertedRangeQuery},
12
};
13
use crate::errors::{CacheError, IndexError};
14
use dozer_storage::lmdb::Transaction;
15
use dozer_types::{
16
    bincode,
17
    parking_lot::RwLock,
18
    types::{Field, IndexDefinition, Record, Schema},
19
};
20
use itertools::Either;
21

22
/// Plans and executes one `QueryExpression` against the LMDB-backed cache, reading
/// everything through a single transaction `txn`.
pub struct LmdbQueryHandler<'a, T: Transaction> {
    /// Record database: looked up by 8-byte record ids, values are bincode-serialized `Record`s.
    db: RecordDatabase,
    /// Secondary index databases, looked up by `(schema identifier, index id)`.
    secondary_index_databases: Arc<RwLock<SecondaryIndexDatabases>>,
    /// Transaction every cursor and lookup is performed in.
    txn: &'a T,
    /// Schema of the queried records; its `identifier` selects the secondary index databases.
    schema: Schema,
    /// Index definitions handed to the `QueryPlanner`.
    secondary_indexes: Vec<IndexDefinition>,
    /// The query being executed; its `skip` and `limit` are applied to every result.
    query: &'a QueryExpression,
    /// Chunk size passed to `intersection` when several index scans must be combined.
    intersection_chunk_size: usize,
}
31
impl<'a, T: Transaction> LmdbQueryHandler<'a, T> {
32
    pub fn new(
544✔
33
        db: RecordDatabase,
544✔
34
        secondary_index_databases: Arc<RwLock<SecondaryIndexDatabases>>,
544✔
35
        txn: &'a T,
544✔
36
        schema: Schema,
544✔
37
        secondary_indexes: Vec<IndexDefinition>,
544✔
38
        query: &'a QueryExpression,
544✔
39
        intersection_chunk_size: usize,
544✔
40
    ) -> Self {
544✔
41
        Self {
544✔
42
            db,
544✔
43
            secondary_index_databases,
544✔
44
            txn,
544✔
45
            schema,
544✔
46
            secondary_indexes,
544✔
47
            query,
544✔
48
            intersection_chunk_size,
544✔
49
        }
544✔
50
    }
544✔
51

52
    pub fn count(&self) -> Result<usize, CacheError> {
268✔
53
        let planner = QueryPlanner::new(&self.schema, &self.secondary_indexes, self.query);
268✔
54
        let execution = planner.plan()?;
268✔
55
        match execution {
267✔
56
            Plan::IndexScans(index_scans) => Ok(self.build_index_scan(index_scans)?.count()),
249✔
57
            Plan::SeqScan(_) => Ok(self
17✔
58
                .db
17✔
59
                .count(self.txn)?
17✔
60
                .saturating_sub(self.query.skip)
17✔
61
                .min(self.query.limit.unwrap_or(usize::MAX))),
17✔
62
            Plan::ReturnEmpty => Ok(0),
1✔
63
        }
64
    }
268✔
65

66
    pub fn query(&self) -> Result<Vec<Record>, CacheError> {
276✔
67
        let planner = QueryPlanner::new(&self.schema, &self.secondary_indexes, self.query);
276✔
68
        let execution = planner.plan()?;
276✔
69
        match execution {
275✔
70
            Plan::IndexScans(index_scans) => {
252✔
71
                let scan = self.build_index_scan(index_scans)?;
252✔
72
                self.collect_records(scan)
252✔
73
            }
74
            Plan::SeqScan(_seq_scan) => self.iterate_and_deserialize(),
22✔
75
            Plan::ReturnEmpty => Ok(vec![]),
1✔
76
        }
77
    }
276✔
78

79
    pub fn iterate_and_deserialize(&self) -> Result<Vec<Record>, CacheError> {
22✔
80
        let cursor = self.db.open_ro_cursor(self.txn)?;
22✔
81
        CacheIterator::new(cursor, None, SortDirection::Ascending)
22✔
82
            .skip(self.query.skip)
22✔
83
            .take(self.query.limit.unwrap_or(usize::MAX))
22✔
84
            .map(|(_, v)| bincode::deserialize(v).map_err(CacheError::map_deserialization_error))
6,497✔
85
            .collect()
22✔
86
    }
22✔
87

88
    fn build_index_scan(
501✔
89
        &self,
501✔
90
        index_scans: Vec<IndexScan>,
501✔
91
    ) -> Result<impl Iterator<Item = [u8; 8]> + '_, CacheError> {
501✔
92
        debug_assert!(
93
            !index_scans.is_empty(),
501✔
94
            "Planner should not generate empty index scan"
×
95
        );
96
        let full_sacan = if index_scans.len() == 1 {
501✔
97
            // The fast path, without intersection calculation.
98
            Either::Left(self.query_with_secondary_index(&index_scans[0])?)
499✔
99
        } else {
100
            // Intersection of multiple index scans.
101
            let iterators = index_scans
2✔
102
                .iter()
2✔
103
                .map(|index_scan| {
4✔
104
                    self.query_with_secondary_index(index_scan)
4✔
105
                        .map(|iter| iter.map(u64::from_be_bytes))
4✔
106
                })
4✔
107
                .collect::<Result<Vec<_>, CacheError>>()?;
2✔
108
            Either::Right(
2✔
109
                intersection(iterators, self.intersection_chunk_size).map(|id| id.to_be_bytes()),
4✔
110
            )
2✔
111
        };
112
        Ok(full_sacan
501✔
113
            .skip(self.query.skip)
501✔
114
            .take(self.query.limit.unwrap_or(usize::MAX)))
501✔
115
    }
501✔
116

117
    fn query_with_secondary_index(
503✔
118
        &'a self,
503✔
119
        index_scan: &IndexScan,
503✔
120
    ) -> Result<impl Iterator<Item = [u8; 8]> + 'a, CacheError> {
503✔
121
        let schema_id = self
503✔
122
            .schema
503✔
123
            .identifier
503✔
124
            .ok_or(CacheError::SchemaIdentifierNotFound)?;
503✔
125
        let index_db = *self
503✔
126
            .secondary_index_databases
503✔
127
            .read()
503✔
128
            .get(&(schema_id, index_scan.index_id))
503✔
129
            .ok_or(CacheError::SecondaryIndexDatabaseNotFound)?;
503✔
130

131
        let RangeSpec {
132
            start,
503✔
133
            end,
503✔
134
            direction,
503✔
135
        } = get_range_spec(&index_scan.kind, index_scan.is_single_field_sorted_inverted)?;
503✔
136

137
        let cursor = index_db.open_ro_cursor(self.txn)?;
503✔
138

139
        Ok(CacheIterator::new(cursor, start, direction)
503✔
140
            .take_while(move |(key, _)| {
503✔
141
                if let Some(end_key) = &end {
246,252✔
142
                    match index_db.cmp(self.txn, key, end_key.key()) {
199,390✔
143
                        Ordering::Less => matches!(direction, SortDirection::Ascending),
90,062✔
144
                        Ordering::Equal => matches!(end_key, KeyEndpoint::Including(_)),
42,527✔
145
                        Ordering::Greater => matches!(direction, SortDirection::Descending),
66,801✔
146
                    }
147
                } else {
148
                    true
46,862✔
149
                }
150
            })
246,252✔
151
            .map(|(_, id)| {
246,079✔
152
                id.try_into()
246,079✔
153
                    .expect("All values must be u64 ids in seconary index database")
246,079✔
154
            }))
246,079✔
155
    }
503✔
156

157
    fn collect_records(
252✔
158
        &self,
252✔
159
        ids: impl Iterator<Item = [u8; 8]>,
252✔
160
    ) -> Result<Vec<Record>, CacheError> {
252✔
161
        ids.map(|id| self.db.get(self.txn, id)).collect()
122,538✔
162
    }
252✔
163
}
164

165
/// A key range to walk in a secondary index database.
#[derive(Debug)]
struct RangeSpec {
    /// Key the cursor starts from; `None` leaves the start unbounded.
    start: Option<KeyEndpoint>,
    /// Key at which iteration stops. `KeyEndpoint` says whether an equal key is kept.
    /// `None` means iterate until the cursor is exhausted.
    end: Option<KeyEndpoint>,
    /// Direction in which the cursor walks the index.
    direction: SortDirection,
}
171

172
fn get_range_spec(
1,871✔
173
    index_scan_kind: &IndexScanKind,
1,871✔
174
    is_single_field_sorted_inverted: bool,
1,871✔
175
) -> Result<RangeSpec, CacheError> {
1,871✔
176
    match &index_scan_kind {
1,871✔
177
        IndexScanKind::SortedInverted {
178
            eq_filters,
1,849✔
179
            range_query,
1,849✔
180
        } => {
1,849✔
181
            let comparison_key = build_sorted_inverted_comparision_key(
1,849✔
182
                eq_filters,
1,849✔
183
                range_query.as_ref(),
1,849✔
184
                is_single_field_sorted_inverted,
1,849✔
185
            );
1,849✔
186
            // There're 3 cases:
187
            // 1. Range query with operator.
188
            // 2. Range query without operator (only order by).
189
            // 3. No range query.
190
            Ok(if let Some(range_query) = range_query {
1,849✔
191
                match range_query.operator_and_value {
1,576✔
192
                    Some((operator, _)) => {
980✔
193
                        // Here we respond to case 1, examples are `a = 1 && b > 2` or `b < 2`.
980✔
194
                        let comparison_key = comparison_key.expect("here's at least a range query");
980✔
195
                        let null_key = build_sorted_inverted_comparision_key(
980✔
196
                            eq_filters,
980✔
197
                            Some(&SortedInvertedRangeQuery {
980✔
198
                                field_index: range_query.field_index,
980✔
199
                                operator_and_value: Some((operator, Field::Null)),
980✔
200
                                sort_direction: range_query.sort_direction,
980✔
201
                            }),
980✔
202
                            is_single_field_sorted_inverted,
980✔
203
                        )
980✔
204
                        .expect("we provided a range query");
980✔
205
                        get_key_interval_from_range_query(
980✔
206
                            comparison_key,
980✔
207
                            null_key,
980✔
208
                            operator,
980✔
209
                            range_query.sort_direction,
980✔
210
                        )
980✔
211
                    }
212
                    None => {
213
                        // Here we respond to case 2, examples are `a = 1 && b asc` or `b desc`.
214
                        if let Some(comparison_key) = comparison_key {
596✔
215
                            // This is the case like `a = 1 && b asc`. The comparison key is only built from `a = 1`.
216
                            // We use `a = 1 && b = null` as a sentinel, using the invariant that `null` is greater than anything.
217
                            let null_key = build_sorted_inverted_comparision_key(
402✔
218
                                eq_filters,
402✔
219
                                Some(&SortedInvertedRangeQuery {
402✔
220
                                    field_index: range_query.field_index,
402✔
221
                                    operator_and_value: Some((Operator::LT, Field::Null)),
402✔
222
                                    sort_direction: range_query.sort_direction,
402✔
223
                                }),
402✔
224
                                is_single_field_sorted_inverted,
402✔
225
                            )
402✔
226
                            .expect("we provided a range query");
402✔
227
                            match range_query.sort_direction {
402✔
228
                                SortDirection::Ascending => RangeSpec {
242✔
229
                                    start: Some(KeyEndpoint::Excluding(comparison_key)),
242✔
230
                                    end: Some(KeyEndpoint::Including(null_key)),
242✔
231
                                    direction: SortDirection::Ascending,
242✔
232
                                },
242✔
233
                                SortDirection::Descending => RangeSpec {
160✔
234
                                    start: Some(KeyEndpoint::Including(null_key)),
160✔
235
                                    end: Some(KeyEndpoint::Excluding(comparison_key)),
160✔
236
                                    direction: SortDirection::Descending,
160✔
237
                                },
160✔
238
                            }
239
                        } else {
240
                            // Just all of them.
241
                            RangeSpec {
194✔
242
                                start: None,
194✔
243
                                end: None,
194✔
244
                                direction: range_query.sort_direction,
194✔
245
                            }
194✔
246
                        }
247
                    }
248
                }
249
            } else {
250
                // Here we respond to case 3, examples are `a = 1` or `a = 1 && b = 2`.
251
                let comparison_key = comparison_key
273✔
252
                    .expect("here's at least a eq filter because there's no range query");
273✔
253
                RangeSpec {
273✔
254
                    start: Some(KeyEndpoint::Including(comparison_key.clone())),
273✔
255
                    end: Some(KeyEndpoint::Including(comparison_key)),
273✔
256
                    direction: SortDirection::Ascending, // doesn't matter
273✔
257
                }
273✔
258
            })
259
        }
260
        IndexScanKind::FullText { filter } => match filter.op {
22✔
261
            Operator::Contains => {
262
                let token = match &filter.val {
22✔
263
                    Field::String(token) => token,
20✔
264
                    Field::Text(token) => token,
2✔
265
                    _ => return Err(CacheError::Index(IndexError::ExpectedStringFullText)),
×
266
                };
267
                let key = index::get_full_text_secondary_index(token);
22✔
268
                Ok(RangeSpec {
22✔
269
                    start: Some(KeyEndpoint::Including(key.clone())),
22✔
270
                    end: Some(KeyEndpoint::Including(key)),
22✔
271
                    direction: SortDirection::Ascending, // doesn't matter
22✔
272
                })
22✔
273
            }
274
            Operator::MatchesAll | Operator::MatchesAny => {
275
                unimplemented!("matches all and matches any are not implemented")
×
276
            }
277
            other => panic!("operator {other:?} is not supported by full text index"),
×
278
        },
279
    }
280
}
1,871✔
281

282
fn build_sorted_inverted_comparision_key(
3,231✔
283
    eq_filters: &[(usize, Field)],
3,231✔
284
    range_query: Option<&SortedInvertedRangeQuery>,
3,231✔
285
    is_single_field_index: bool,
3,231✔
286
) -> Option<Vec<u8>> {
3,231✔
287
    let mut fields = vec![];
3,231✔
288
    eq_filters.iter().for_each(|filter| {
3,231✔
289
        fields.push(&filter.1);
2,521✔
290
    });
3,231✔
291
    if let Some(range_query) = range_query {
3,231✔
292
        if let Some((_, val)) = &range_query.operator_and_value {
2,958✔
293
            fields.push(val);
2,362✔
294
        }
2,362✔
295
    }
273✔
296
    if fields.is_empty() {
3,231✔
297
        None
194✔
298
    } else {
299
        Some(index::get_secondary_index(&fields, is_single_field_index))
3,037✔
300
    }
301
}
3,231✔
302

303
/// Here we use the invariant that `null` is greater than anything.
304
fn get_key_interval_from_range_query(
980✔
305
    comparison_key: Vec<u8>,
980✔
306
    null_key: Vec<u8>,
980✔
307
    operator: Operator,
980✔
308
    sort_direction: SortDirection,
980✔
309
) -> RangeSpec {
980✔
310
    match (operator, sort_direction) {
980✔
311
        (Operator::LT, SortDirection::Ascending) => RangeSpec {
68✔
312
            start: None,
68✔
313
            end: Some(KeyEndpoint::Excluding(comparison_key)),
68✔
314
            direction: SortDirection::Ascending,
68✔
315
        },
68✔
316
        (Operator::LT, SortDirection::Descending) => RangeSpec {
160✔
317
            start: Some(KeyEndpoint::Excluding(comparison_key)),
160✔
318
            end: None,
160✔
319
            direction: SortDirection::Descending,
160✔
320
        },
160✔
321
        (Operator::LTE, SortDirection::Ascending) => RangeSpec {
20✔
322
            start: None,
20✔
323
            end: Some(KeyEndpoint::Including(comparison_key)),
20✔
324
            direction: SortDirection::Ascending,
20✔
325
        },
20✔
326
        (Operator::LTE, SortDirection::Descending) => RangeSpec {
×
327
            start: Some(KeyEndpoint::Including(comparison_key)),
×
328
            end: None,
×
329
            direction: SortDirection::Descending,
×
330
        },
×
331
        (Operator::GT, SortDirection::Ascending) => RangeSpec {
88✔
332
            start: Some(KeyEndpoint::Excluding(comparison_key)),
88✔
333
            end: Some(KeyEndpoint::Excluding(null_key)),
88✔
334
            direction: SortDirection::Ascending,
88✔
335
        },
88✔
336
        (Operator::GT, SortDirection::Descending) => RangeSpec {
162✔
337
            start: Some(KeyEndpoint::Excluding(null_key)),
162✔
338
            end: Some(KeyEndpoint::Excluding(comparison_key)),
162✔
339
            direction: SortDirection::Descending,
162✔
340
        },
162✔
341
        (Operator::GTE, SortDirection::Ascending) => RangeSpec {
322✔
342
            start: Some(KeyEndpoint::Including(comparison_key)),
322✔
343
            end: Some(KeyEndpoint::Excluding(null_key)),
322✔
344
            direction: SortDirection::Ascending,
322✔
345
        },
322✔
346
        (Operator::GTE, SortDirection::Descending) => RangeSpec {
160✔
347
            start: Some(KeyEndpoint::Excluding(null_key)),
160✔
348
            end: Some(KeyEndpoint::Including(comparison_key)),
160✔
349
            direction: SortDirection::Descending,
160✔
350
        },
160✔
351
        (other, _) => {
×
352
            panic!("operator {other:?} is not supported by sorted inverted index range query")
×
353
        }
354
    }
355
}
980✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc