• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4351017879

pending completion
4351017879

push

github

GitHub
chore: bump sqlparser v0.31.0 (#1137)

29428 of 38369 relevant lines covered (76.7%)

62214.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.82
/dozer-cache/src/cache/plan/planner.rs
1
use crate::cache::expression::{FilterExpression, Operator, QueryExpression, SortDirection};
2
use crate::errors::PlanError;
3
use dozer_types::json_value_to_field;
4
use dozer_types::types::{Field, FieldDefinition, Schema};
5
use dozer_types::types::{FieldType, IndexDefinition};
6

7
use super::helper::{RangeQuery, RangeQueryKind};
8
use super::{helper, IndexScan, Plan, SeqScan};
9
use super::{IndexFilter, IndexScanKind};
10

11
pub struct QueryPlanner<'a> {
12
    schema: &'a Schema,
13
    secondary_indexes: &'a [IndexDefinition],
14
    query: &'a QueryExpression,
15
}
16
impl<'a> QueryPlanner<'a> {
17
    pub fn new(
8,131✔
18
        schema: &'a Schema,
8,131✔
19
        secondary_indexes: &'a [IndexDefinition],
8,131✔
20
        query: &'a QueryExpression,
8,131✔
21
    ) -> Self {
8,131✔
22
        Self {
8,131✔
23
            schema,
8,131✔
24
            secondary_indexes,
8,131✔
25
            query,
8,131✔
26
        }
8,131✔
27
    }
8,131✔
28

29
    pub fn plan(&self) -> Result<Plan, PlanError> {
8,131✔
30
        // Collect all the filters.
8,131✔
31
        // TODO: Handle filters like And([a > 0, a < 10]).
8,131✔
32
        let mut filters = vec![];
8,131✔
33
        if let Some(expression) = &self.query.filter {
8,131✔
34
            collect_filters(self.schema, expression, &mut filters)?;
6,827✔
35
        }
1,304✔
36

37
        // Filter the sort options.
38
        // TODO: Handle duplicate fields.
39
        let mut order_by = vec![];
8,131✔
40
        for order in &self.query.order_by.0 {
13,510✔
41
            // Find the field index.
42
            let (field_index, _, _) =
5,379✔
43
                get_field_index_and_type(&order.field_name, &self.schema.fields)
5,379✔
44
                    .ok_or_else(|| PlanError::FieldNotFound(order.field_name.clone()))?;
5,379✔
45
            // If the field is already in a filter supported by `SortedInverted`, mark the corresponding filter.
46
            if seen_in_sorted_inverted_filter(field_index, order.direction, &mut filters)? {
5,379✔
47
                continue;
2,823✔
48
            }
2,556✔
49
            // This sort option needs to be in the plan.
2,556✔
50
            order_by.push((field_index, order.direction));
2,556✔
51
        }
52

53
        // If no filter and sort is requested, return a SeqScan.
54
        if filters.is_empty() && order_by.is_empty() {
8,131✔
55
            return Ok(Plan::SeqScan(SeqScan {
510✔
56
                direction: SortDirection::Ascending,
510✔
57
            }));
510✔
58
        }
7,621✔
59

7,621✔
60
        // If non-`Eq` filter is applied to `null` value, return empty result.
7,621✔
61
        if filters
7,621✔
62
            .iter()
7,621✔
63
            .any(|f| matches!(f.0.val, Field::Null) && f.0.op != Operator::EQ)
9,871✔
64
        {
65
            return Ok(Plan::ReturnEmpty);
3✔
66
        }
7,618✔
67

68
        // Find the range query, can be a range filter or a sort option.
69
        let range_query = find_range_query(&mut filters, &order_by)?;
7,618✔
70

71
        // Generate some index scans that can answer this query, lazily.
72
        let all_index_scans = helper::get_all_indexes(filters, range_query);
7,618✔
73

74
        // Check if existing secondary indexes can satisfy any of the scans.
75
        for index_scans in all_index_scans {
7,622✔
76
            if let Some(index_scans) = all_indexes_are_present(self.secondary_indexes, index_scans)
7,620✔
77
            {
78
                return Ok(Plan::IndexScans(index_scans));
7,616✔
79
            }
4✔
80
        }
81

82
        Err(PlanError::MatchingIndexNotFound)
2✔
83
    }
8,131✔
84
}
85

86
fn get_field_index_and_type(
15,251✔
87
    field_name: &str,
15,251✔
88
    fields: &[FieldDefinition],
15,251✔
89
) -> Option<(usize, FieldType, bool)> {
15,251✔
90
    fields
15,251✔
91
        .iter()
15,251✔
92
        .enumerate()
15,251✔
93
        .find(|(_, f)| f.name == field_name)
55,996✔
94
        .map(|(i, f)| (i, f.typ, f.nullable))
15,251✔
95
}
15,251✔
96

97
fn collect_filters(
11,905✔
98
    schema: &Schema,
11,905✔
99
    expression: &FilterExpression,
11,905✔
100
    filters: &mut Vec<(IndexFilter, Option<SortDirection>)>,
11,905✔
101
) -> Result<(), PlanError> {
11,905✔
102
    match expression {
11,905✔
103
        FilterExpression::Simple(field_name, operator, value) => {
9,872✔
104
            let (field_index, field_type, nullable) =
9,872✔
105
                get_field_index_and_type(field_name, &schema.fields)
9,872✔
106
                    .ok_or_else(|| PlanError::FieldNotFound(field_name.clone()))?;
9,872✔
107
            let field = json_value_to_field(value.clone(), field_type, nullable)?;
9,872✔
108
            filters.push((IndexFilter::new(field_index, *operator, field), None));
9,872✔
109
        }
110
        FilterExpression::And(expressions) => {
2,033✔
111
            for expression in expressions {
7,111✔
112
                collect_filters(schema, expression, filters)?;
5,078✔
113
            }
114
        }
115
    }
116
    Ok(())
11,905✔
117
}
11,905✔
118

119
fn seen_in_sorted_inverted_filter(
5,379✔
120
    field_index: usize,
5,379✔
121
    sort_direction: SortDirection,
5,379✔
122
    filters: &mut [(IndexFilter, Option<SortDirection>)],
5,379✔
123
) -> Result<bool, PlanError> {
5,379✔
124
    for filter in filters {
7,141✔
125
        if filter.0.field_index == field_index {
4,585✔
126
            return if !filter.0.op.supported_by_sorted_inverted() {
2,823✔
127
                Err(PlanError::CannotSortFullTextFilter)
×
128
            } else if let Some(direction) = filter.1 {
2,823✔
129
                if direction == sort_direction {
×
130
                    Ok(true)
×
131
                } else {
132
                    Err(PlanError::ConflictingSortOptions)
×
133
                }
134
            } else {
135
                filter.1 = Some(sort_direction);
2,823✔
136
                Ok(true)
2,823✔
137
            };
138
        }
1,762✔
139
    }
140

141
    Ok(false)
2,556✔
142
}
5,379✔
143

144
fn find_range_query(
7,617✔
145
    filters: &mut Vec<(IndexFilter, Option<SortDirection>)>,
7,617✔
146
    order_by: &[(usize, SortDirection)],
7,617✔
147
) -> Result<Option<RangeQuery>, PlanError> {
7,617✔
148
    let mut num_range_ops = 0;
7,617✔
149
    let mut range_filter_index = None;
7,617✔
150
    for (i, filter) in filters.iter().enumerate() {
9,869✔
151
        if filter.0.op.is_range_operator() {
9,869✔
152
            num_range_ops += 1;
4,069✔
153
            range_filter_index = Some(i);
4,069✔
154
        }
5,800✔
155
    }
156
    num_range_ops += order_by.len();
7,618✔
157
    if num_range_ops > 1 {
7,618✔
158
        return Err(PlanError::RangeQueryLimit);
×
159
    }
7,618✔
160
    Ok(if let Some(range_filter_index) = range_filter_index {
7,618✔
161
        let filter = filters.remove(range_filter_index);
4,069✔
162
        Some(RangeQuery::new(
4,069✔
163
            filter.0.field_index,
4,069✔
164
            RangeQueryKind::Filter {
4,069✔
165
                operator: filter.0.op,
4,069✔
166
                value: filter.0.val,
4,069✔
167
                sort_direction: filter.1,
4,069✔
168
            },
4,069✔
169
        ))
4,069✔
170
    } else if let Some((field_index, sort_direction)) = order_by.first() {
3,549✔
171
        Some(RangeQuery::new(
2,556✔
172
            *field_index,
2,556✔
173
            RangeQueryKind::OrderBy {
2,556✔
174
                sort_direction: *sort_direction,
2,556✔
175
            },
2,556✔
176
        ))
2,556✔
177
    } else {
178
        None
993✔
179
    })
180
}
7,618✔
181

182
impl IndexScanKind {
183
    fn is_supported_by_index(&self, index: &IndexDefinition) -> bool {
184
        match (self, index) {
32,587✔
185
            (
186
                IndexScanKind::SortedInverted {
187
                    eq_filters,
32,177✔
188
                    range_query,
32,177✔
189
                },
32,177✔
190
                IndexDefinition::SortedInverted(fields),
32,177✔
191
            ) => {
32,177✔
192
                if fields.len() < eq_filters.len() {
32,177✔
193
                    return false;
4,076✔
194
                }
28,101✔
195
                if !eq_filters
28,101✔
196
                    .iter()
28,101✔
197
                    .zip(fields)
28,101✔
198
                    .all(|(filter, field)| filter.0 == *field)
28,101✔
199
                {
200
                    return false;
13,353✔
201
                }
14,748✔
202
                if let Some(range_query) = range_query {
14,748✔
203
                    if fields.len() != eq_filters.len() + 1 {
13,802✔
204
                        return false;
2,776✔
205
                    }
11,026✔
206
                    let last_field = fields
11,026✔
207
                        .last()
11,026✔
208
                        .expect("We've checked `fields.len()` is at least 1");
11,026✔
209
                    range_query.field_index == *last_field
11,026✔
210
                } else {
211
                    fields.len() == eq_filters.len()
946✔
212
                }
213
            }
214
            (IndexScanKind::FullText { filter }, IndexDefinition::FullText(field_index)) => {
54✔
215
                filter.field_index == *field_index
54✔
216
            }
217
            _ => false,
356✔
218
        }
219
    }
32,587✔
220
}
221

222
fn all_indexes_are_present(
7,620✔
223
    indexes: &[IndexDefinition],
7,620✔
224
    index_scan_kinds: Vec<IndexScanKind>,
7,620✔
225
) -> Option<Vec<IndexScan>> {
7,620✔
226
    let mut scans = vec![];
7,620✔
227
    for index_scan_kind in index_scan_kinds {
15,238✔
228
        let found = indexes
7,622✔
229
            .iter()
7,622✔
230
            .enumerate()
7,622✔
231
            .find(|(_, i)| index_scan_kind.is_supported_by_index(i));
32,573✔
232

7,622✔
233
        match found {
7,622✔
234
            Some((idx, _)) => {
7,618✔
235
                scans.push(IndexScan {
7,618✔
236
                    index_id: idx,
7,618✔
237
                    kind: index_scan_kind,
7,618✔
238
                    is_single_field_sorted_inverted: is_single_field_sorted_inverted(&indexes[idx]),
7,618✔
239
                });
7,618✔
240
            }
7,618✔
241
            None => return None,
4✔
242
        }
243
    }
244
    Some(scans)
7,616✔
245
}
7,620✔
246

247
fn is_single_field_sorted_inverted(index: &IndexDefinition) -> bool {
7,618✔
248
    match index {
7,618✔
249
        // `fields.len() == 1` criteria must be kept the same with `comparator.rs`.
250
        IndexDefinition::SortedInverted(fields) => fields.len() == 1,
7,568✔
251
        _ => false,
50✔
252
    }
253
}
7,618✔
254

255
#[cfg(test)]
mod tests {
    use crate::cache::plan::SortedInvertedRangeQuery;

    use super::*;

    /// Builds a sorted inverted scan with `Eq` filters on `eq_fields` (all comparing against
    /// `Field::Null`) and an optional ascending range query on `range_field`.
    fn sorted_inverted_scan(eq_fields: Vec<usize>, range_field: Option<usize>) -> IndexScanKind {
        let eq_filters = eq_fields
            .into_iter()
            .map(|field_index| (field_index, Field::Null))
            .collect();
        let range_query = range_field.map(|field_index| SortedInvertedRangeQuery {
            field_index,
            sort_direction: SortDirection::Ascending,
            operator_and_value: None,
        });
        IndexScanKind::SortedInverted {
            eq_filters,
            range_query,
        }
    }

    /// Asserts whether the scan described by `eq_fields` and `range_field` is supported by a
    /// sorted inverted index over `index_fields`.
    fn check_sorted_inverted(
        eq_fields: Vec<usize>,
        range_field: Option<usize>,
        index_fields: Vec<usize>,
        expected: bool,
    ) {
        let scan = sorted_inverted_scan(eq_fields, range_field);
        let index = IndexDefinition::SortedInverted(index_fields);
        assert_eq!(scan.is_supported_by_index(&index), expected);
    }

    #[test]
    fn test_is_supported_by_index() {
        // `Eq` filters only: the index fields must equal the filter fields exactly.
        check_sorted_inverted(vec![0], None, vec![0], true);
        check_sorted_inverted(vec![0], None, vec![1], false);
        check_sorted_inverted(vec![0], None, vec![0, 1], false);
        check_sorted_inverted(vec![0, 1], None, vec![0], false);
        check_sorted_inverted(vec![0, 1], None, vec![0, 1], true);

        // With a range query: the range field must be the single trailing index field.
        check_sorted_inverted(vec![], Some(0), vec![0], true);
        check_sorted_inverted(vec![0], Some(1), vec![0, 1], true);
        check_sorted_inverted(vec![0], Some(1), vec![0, 1, 2], false);
        check_sorted_inverted(vec![0], Some(1), vec![0, 2], false);
        check_sorted_inverted(vec![0], Some(1), vec![0], false);

        // Full text scans only match a full text index on the same field.
        let full_text_scan = IndexScanKind::FullText {
            filter: IndexFilter {
                field_index: 0,
                op: Operator::Contains,
                val: Field::Null,
            },
        };
        let full_text_on_0 = IndexDefinition::FullText(0);
        let full_text_on_1 = IndexDefinition::FullText(1);
        assert!(full_text_scan.is_supported_by_index(&full_text_on_0));
        assert!(!full_text_scan.is_supported_by_index(&full_text_on_1));

        // Scan kind and index kind must agree.
        let sorted_inverted_on_0 = IndexDefinition::SortedInverted(vec![0]);
        assert!(!full_text_scan.is_supported_by_index(&sorted_inverted_on_0));

        let eq_only_scan = IndexScanKind::SortedInverted {
            eq_filters: vec![(0, Field::Null)],
            range_query: None,
        };
        assert!(!eq_only_scan.is_supported_by_index(&full_text_on_0));
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc