• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4382580286

pending completion
4382580286

push

github

GitHub
feat: Separate cache operation log environment and index environments (#1199)

1370 of 1370 new or added lines in 33 files covered. (100.0%)

28671 of 41023 relevant lines covered (69.89%)

51121.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.48
/dozer-cache/src/cache/plan/planner.rs
1
use crate::cache::expression::{FilterExpression, Operator, SortDirection, SortOptions};
2
use crate::errors::PlanError;
3
use dozer_types::json_value_to_field;
4
use dozer_types::types::{Field, FieldDefinition, Schema};
5
use dozer_types::types::{FieldType, IndexDefinition};
6

7
use super::helper::{RangeQuery, RangeQueryKind};
8
use super::{helper, IndexScan, Plan, SeqScan};
9
use super::{IndexFilter, IndexScanKind};
10

11
pub struct QueryPlanner<'a> {
12
    schema: &'a Schema,
13
    secondary_indexes: &'a [IndexDefinition],
14
    filter: Option<&'a FilterExpression>,
15
    order_by: &'a SortOptions,
16
}
17
impl<'a> QueryPlanner<'a> {
×
18
    pub fn new(
4,477✔
19
        schema: &'a Schema,
4,477✔
20
        secondary_indexes: &'a [IndexDefinition],
4,477✔
21
        filter: Option<&'a FilterExpression>,
4,477✔
22
        order_by: &'a SortOptions,
4,477✔
23
    ) -> Self {
4,477✔
24
        Self {
4,477✔
25
            schema,
4,477✔
26
            secondary_indexes,
4,477✔
27
            filter,
4,477✔
28
            order_by,
4,477✔
29
        }
4,477✔
30
    }
4,477✔
31

×
32
    pub fn plan(&self) -> Result<Plan, PlanError> {
4,477✔
33
        // Collect all the filters.
4,477✔
34
        // TODO: Handle filters like And([a > 0, a < 10]).
4,477✔
35
        let mut filters = vec![];
4,477✔
36
        if let Some(expression) = &self.filter {
4,477✔
37
            collect_filters(self.schema, expression, &mut filters)?;
3,747✔
38
        }
730✔
39

×
40
        // Filter the sort options.
×
41
        // TODO: Handle duplicate fields.
42
        let mut order_by = vec![];
4,477✔
43
        for order in &self.order_by.0 {
7,416✔
44
            // Find the field index.
×
45
            let (field_index, _, _) =
2,939✔
46
                get_field_index_and_type(&order.field_name, &self.schema.fields)
2,939✔
47
                    .ok_or_else(|| PlanError::FieldNotFound(order.field_name.clone()))?;
2,939✔
48
            // If the field is already in a filter supported by `SortedInverted`, mark the corresponding filter.
×
49
            if seen_in_sorted_inverted_filter(field_index, order.direction, &mut filters)? {
2,939✔
50
                continue;
1,543✔
51
            }
1,396✔
52
            // This sort option needs to be in the plan.
1,396✔
53
            order_by.push((field_index, order.direction));
1,396✔
54
        }
×
55

×
56
        // If no filter and sort is requested, return a SeqScan.
×
57
        if filters.is_empty() && order_by.is_empty() {
4,477✔
58
            return Ok(Plan::SeqScan(SeqScan {
296✔
59
                direction: SortDirection::Ascending,
296✔
60
            }));
296✔
61
        }
4,181✔
62

4,181✔
63
        // If non-`Eq` filter is applied to `null` value, return empty result.
4,181✔
64
        if filters
4,181✔
65
            .iter()
4,181✔
66
            .any(|f| matches!(f.0.val, Field::Null) && f.0.op != Operator::EQ)
5,412✔
67
        {
68
            return Ok(Plan::ReturnEmpty);
3✔
69
        }
4,178✔
70

71
        // Find the range query, can be a range filter or a sort option.
72
        let range_query = find_range_query(&mut filters, &order_by)?;
4,178✔
73

74
        // Generate some index scans that can answer this query, lazily.
75
        let all_index_scans = helper::get_all_indexes(filters, range_query);
4,178✔
76

×
77
        // Check if existing secondary indexes can satisfy any of the scans.
78
        for index_scans in all_index_scans {
4,182✔
79
            if let Some(index_scans) = all_indexes_are_present(self.secondary_indexes, index_scans)
4,180✔
80
            {
81
                return Ok(Plan::IndexScans(index_scans));
4,176✔
82
            }
4✔
83
        }
×
84

85
        Err(PlanError::MatchingIndexNotFound)
2✔
86
    }
4,477✔
87
}
×
88

×
89
fn get_field_index_and_type(
8,351✔
90
    field_name: &str,
8,351✔
91
    fields: &[FieldDefinition],
8,351✔
92
) -> Option<(usize, FieldType, bool)> {
8,351✔
93
    fields
8,351✔
94
        .iter()
8,351✔
95
        .enumerate()
8,351✔
96
        .find(|(_, f)| f.name == field_name)
30,616✔
97
        .map(|(i, f)| (i, f.typ, f.nullable))
8,351✔
98
}
8,351✔
99

×
100
fn collect_filters(
6,525✔
101
    schema: &Schema,
6,525✔
102
    expression: &FilterExpression,
6,525✔
103
    filters: &mut Vec<(IndexFilter, Option<SortDirection>)>,
6,525✔
104
) -> Result<(), PlanError> {
6,525✔
105
    match expression {
6,525✔
106
        FilterExpression::Simple(field_name, operator, value) => {
5,412✔
107
            let (field_index, field_type, nullable) =
5,412✔
108
                get_field_index_and_type(field_name, &schema.fields)
5,412✔
109
                    .ok_or_else(|| PlanError::FieldNotFound(field_name.clone()))?;
5,412✔
110
            let field = json_value_to_field(value.clone(), field_type, nullable)?;
5,412✔
111
            filters.push((IndexFilter::new(field_index, *operator, field), None));
5,412✔
112
        }
×
113
        FilterExpression::And(expressions) => {
1,113✔
114
            for expression in expressions {
3,891✔
115
                collect_filters(schema, expression, filters)?;
2,778✔
116
            }
×
117
        }
×
118
    }
119
    Ok(())
6,525✔
120
}
6,525✔
121

×
122
fn seen_in_sorted_inverted_filter(
2,939✔
123
    field_index: usize,
2,939✔
124
    sort_direction: SortDirection,
2,939✔
125
    filters: &mut [(IndexFilter, Option<SortDirection>)],
2,939✔
126
) -> Result<bool, PlanError> {
2,939✔
127
    for filter in filters {
3,901✔
128
        if filter.0.field_index == field_index {
2,505✔
129
            return if !filter.0.op.supported_by_sorted_inverted() {
1,543✔
130
                Err(PlanError::CannotSortFullTextFilter)
×
131
            } else if let Some(direction) = filter.1 {
1,543✔
132
                if direction == sort_direction {
×
133
                    Ok(true)
×
134
                } else {
135
                    Err(PlanError::ConflictingSortOptions)
×
136
                }
×
137
            } else {
138
                filter.1 = Some(sort_direction);
1,543✔
139
                Ok(true)
1,543✔
140
            };
141
        }
962✔
142
    }
×
143

144
    Ok(false)
1,396✔
145
}
2,939✔
146

×
147
fn find_range_query(
4,178✔
148
    filters: &mut Vec<(IndexFilter, Option<SortDirection>)>,
4,178✔
149
    order_by: &[(usize, SortDirection)],
4,178✔
150
) -> Result<Option<RangeQuery>, PlanError> {
4,178✔
151
    let mut num_range_ops = 0;
4,178✔
152
    let mut range_filter_index = None;
4,178✔
153
    for (i, filter) in filters.iter().enumerate() {
5,409✔
154
        if filter.0.op.is_range_operator() {
5,409✔
155
            num_range_ops += 1;
2,229✔
156
            range_filter_index = Some(i);
2,229✔
157
        }
3,180✔
158
    }
×
159
    num_range_ops += order_by.len();
4,178✔
160
    if num_range_ops > 1 {
4,178✔
161
        return Err(PlanError::RangeQueryLimit);
×
162
    }
4,178✔
163
    Ok(if let Some(range_filter_index) = range_filter_index {
4,178✔
164
        let filter = filters.remove(range_filter_index);
2,229✔
165
        Some(RangeQuery::new(
2,229✔
166
            filter.0.field_index,
2,229✔
167
            RangeQueryKind::Filter {
2,229✔
168
                operator: filter.0.op,
2,229✔
169
                value: filter.0.val,
2,229✔
170
                sort_direction: filter.1,
2,229✔
171
            },
2,229✔
172
        ))
2,229✔
173
    } else if let Some((field_index, sort_direction)) = order_by.first() {
1,949✔
174
        Some(RangeQuery::new(
1,396✔
175
            *field_index,
1,396✔
176
            RangeQueryKind::OrderBy {
1,396✔
177
                sort_direction: *sort_direction,
1,396✔
178
            },
1,396✔
179
        ))
1,396✔
180
    } else {
×
181
        None
553✔
182
    })
183
}
4,178✔
184

×
185
impl IndexScanKind {
186
    fn is_supported_by_index(&self, index: &IndexDefinition) -> bool {
187
        match (self, index) {
17,847✔
188
            (
×
189
                IndexScanKind::SortedInverted {
×
190
                    eq_filters,
17,617✔
191
                    range_query,
17,617✔
192
                },
17,617✔
193
                IndexDefinition::SortedInverted(fields),
17,617✔
194
            ) => {
17,617✔
195
                if fields.len() < eq_filters.len() {
17,617✔
196
                    return false;
2,236✔
197
                }
15,381✔
198
                if !eq_filters
15,381✔
199
                    .iter()
15,381✔
200
                    .zip(fields)
15,381✔
201
                    .all(|(filter, field)| filter.0 == *field)
15,381✔
202
                {
×
203
                    return false;
7,293✔
204
                }
8,088✔
205
                if let Some(range_query) = range_query {
8,088✔
206
                    if fields.len() != eq_filters.len() + 1 {
7,562✔
207
                        return false;
1,516✔
208
                    }
6,046✔
209
                    let last_field = fields
6,046✔
210
                        .last()
6,046✔
211
                        .expect("We've checked `fields.len()` is at least 1");
6,046✔
212
                    range_query.field_index == *last_field
6,046✔
213
                } else {
214
                    fields.len() == eq_filters.len()
526✔
215
                }
×
216
            }
217
            (IndexScanKind::FullText { filter }, IndexDefinition::FullText(field_index)) => {
34✔
218
                filter.field_index == *field_index
34✔
219
            }
×
220
            _ => false,
196✔
221
        }
222
    }
17,847✔
223
}
×
224

×
225
fn all_indexes_are_present(
4,180✔
226
    indexes: &[IndexDefinition],
4,180✔
227
    index_scan_kinds: Vec<IndexScanKind>,
4,180✔
228
) -> Option<Vec<IndexScan>> {
4,180✔
229
    let mut scans = vec![];
4,180✔
230
    for index_scan_kind in index_scan_kinds {
8,358✔
231
        let found = indexes
4,182✔
232
            .iter()
4,182✔
233
            .enumerate()
4,182✔
234
            .find(|(_, i)| index_scan_kind.is_supported_by_index(i));
17,833✔
235

4,182✔
236
        match found {
4,182✔
237
            Some((idx, _)) => {
4,178✔
238
                scans.push(IndexScan {
4,178✔
239
                    index_id: idx,
4,178✔
240
                    kind: index_scan_kind,
4,178✔
241
                });
4,178✔
242
            }
4,178✔
243
            None => return None,
4✔
244
        }
×
245
    }
×
246
    Some(scans)
4,176✔
247
}
4,180✔
248

×
249
#[cfg(test)]
mod tests {
    use crate::cache::plan::SortedInvertedRangeQuery;

    use super::*;

    #[test]
    fn test_is_supported_by_index() {
        // (equality filter fields, range query field, index fields, expected support)
        let sorted_inverted_cases: Vec<(Vec<usize>, Option<usize>, Vec<usize>, bool)> = vec![
            (vec![0], None, vec![0], true),
            (vec![0], None, vec![1], false),
            (vec![0], None, vec![0, 1], false),
            (vec![0, 1], None, vec![0], false),
            (vec![0, 1], None, vec![0, 1], true),
            (vec![], Some(0), vec![0], true),
            (vec![0], Some(1), vec![0, 1], true),
            (vec![0], Some(1), vec![0, 1, 2], false),
            (vec![0], Some(1), vec![0, 2], false),
            (vec![0], Some(1), vec![0], false),
        ];

        for (eq_fields, range_field, index_fields, expected) in sorted_inverted_cases {
            let scan = IndexScanKind::SortedInverted {
                eq_filters: eq_fields
                    .into_iter()
                    .map(|field_index| (field_index, Field::Null))
                    .collect(),
                range_query: range_field.map(|field_index| SortedInvertedRangeQuery {
                    field_index,
                    sort_direction: SortDirection::Ascending,
                    operator_and_value: None,
                }),
            };
            let index = IndexDefinition::SortedInverted(index_fields);
            assert_eq!(scan.is_supported_by_index(&index), expected);
        }

        // A full text scan is only served by a full text index on the same field.
        let full_text_scan = IndexScanKind::FullText {
            filter: IndexFilter {
                field_index: 0,
                op: Operator::Contains,
                val: Field::Null,
            },
        };
        assert!(full_text_scan.is_supported_by_index(&IndexDefinition::FullText(0)));
        assert!(!full_text_scan.is_supported_by_index(&IndexDefinition::FullText(1)));

        // Scan kind and index kind must agree.
        assert!(!full_text_scan.is_supported_by_index(&IndexDefinition::SortedInverted(vec![0])));
        let sorted_inverted_scan = IndexScanKind::SortedInverted {
            eq_filters: vec![(0, Field::Null)],
            range_query: None,
        };
        assert!(!sorted_inverted_scan.is_supported_by_index(&IndexDefinition::FullText(0)));
    }
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc