• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16331938722

16 Jul 2025 10:49PM UTC coverage: 80.702% (-0.9%) from 81.557%
16331938722

push

github

web-flow
feat: build with stable rust (#3881)

120 of 173 new or added lines in 28 files covered. (69.36%)

174 existing lines in 102 files now uncovered.

41861 of 51871 relevant lines covered (80.7%)

157487.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.67
/vortex-layout/src/layouts/struct_/reader.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::collections::BTreeSet;
5
use std::ops::Range;
6
use std::sync::Arc;
7

8
use dashmap::DashMap;
9
use itertools::Itertools;
10
use vortex_array::stats::Precision;
11
use vortex_dtype::{DType, FieldMask, FieldName, StructFields};
12
use vortex_error::{VortexExpect, VortexResult, vortex_err};
13
use vortex_expr::transform::immediate_access::annotate_scope_access;
14
use vortex_expr::transform::partition::{PartitionedExpr, partition};
15
use vortex_expr::transform::replace::{replace, replace_root_fields};
16
use vortex_expr::transform::simplify_typed::simplify_typed;
17
use vortex_expr::{ExactExpr, ExprRef, col, root};
18
use vortex_utils::aliases::hash_map::HashMap;
19

20
use crate::layouts::partitioned::{PartitionedArrayEvaluation, PartitionedMaskEvaluation};
21
use crate::layouts::struct_::StructLayout;
22
use crate::segments::SegmentSource;
23
use crate::{
24
    ArrayEvaluation, LayoutReader, LayoutReaderRef, LazyReaderChildren, MaskEvaluation,
25
    NoOpPruningEvaluation, PruningEvaluation,
26
};
27

28
pub struct StructReader {
29
    layout: StructLayout,
30
    name: Arc<str>,
31
    lazy_children: LazyReaderChildren,
32

33
    /// A `pack` expression that holds each individual field of the root DType. This expansion
34
    /// ensures we can correctly partition expressions over the fields of the struct.
35
    expanded_root_expr: ExprRef,
36

37
    field_lookup: Option<HashMap<FieldName, usize>>,
38
    partitioned_expr_cache: DashMap<ExactExpr, Partitioned>,
39
}
40

41
impl StructReader {
42
    pub(super) fn try_new(
996✔
43
        layout: StructLayout,
996✔
44
        name: Arc<str>,
996✔
45
        segment_source: Arc<dyn SegmentSource>,
996✔
46
    ) -> VortexResult<Self> {
996✔
47
        let struct_dt = layout.struct_fields();
996✔
48

49
        // NOTE: This number is arbitrary and likely depends on the longest common prefix of field names
50
        let field_lookup = (struct_dt.nfields() > 80).then(|| {
996✔
51
            struct_dt
×
52
                .names()
×
53
                .iter()
×
54
                .enumerate()
×
55
                .map(|(i, n)| (n.clone(), i))
×
56
                .collect()
×
UNCOV
57
        });
×
58

59
        let lazy_children =
996✔
60
            LazyReaderChildren::new(layout.children.clone(), segment_source.clone());
996✔
61

62
        // Create an expanded root expression that contains all fields of the struct.
63
        let expanded_root_expr = replace_root_fields(root(), struct_dt);
996✔
64

65
        // This is where we need to do some complex things with the scan in order to split it into
66
        // different scans for different fields.
67
        Ok(Self {
996✔
68
            layout,
996✔
69
            name,
996✔
70
            expanded_root_expr,
996✔
71
            lazy_children,
996✔
72
            field_lookup,
996✔
73
            partitioned_expr_cache: Default::default(),
996✔
74
        })
996✔
75
    }
996✔
76

77
    /// Return the [`StructFields`] of this layout.
78
    fn struct_fields(&self) -> &StructFields {
36,652✔
79
        self.layout.struct_fields()
36,652✔
80
    }
36,652✔
81

82
    /// Return the child reader for the field.
83
    fn child(&self, name: &FieldName) -> VortexResult<&LayoutReaderRef> {
10,076✔
84
        let idx = self
10,076✔
85
            .field_lookup
10,076✔
86
            .as_ref()
10,076✔
87
            .and_then(|lookup| lookup.get(name).copied())
10,076✔
88
            .or_else(|| self.struct_fields().find(name))
10,076✔
89
            .ok_or_else(|| vortex_err!("Field {} not found in struct layout", name))?;
10,076✔
90
        self.child_by_idx(idx)
10,076✔
91
    }
10,076✔
92

93
    /// Return the child reader for the field, by index.
94
    fn child_by_idx(&self, idx: usize) -> VortexResult<&LayoutReaderRef> {
13,288✔
95
        let field_dtype = self
13,288✔
96
            .struct_fields()
13,288✔
97
            .field_by_index(idx)
13,288✔
98
            .ok_or_else(|| vortex_err!("Missing field {idx}"))?;
13,288✔
99
        let name = &self.struct_fields().names()[idx];
13,288✔
100
        self.lazy_children
13,288✔
101
            .get(idx, &field_dtype, &format!("{}.{}", self.name, name).into())
13,288✔
102
    }
13,288✔
103

104
    /// Utility for partitioning an expression over the fields of a struct.
105
    fn partition_expr(&self, expr: ExprRef) -> Partitioned {
6,716✔
106
        self.partitioned_expr_cache
6,716✔
107
            .entry(ExactExpr(expr.clone()))
6,716✔
108
            .or_insert_with(|| {
6,716✔
109
                // First, we expand the root scope into the fields of the struct to ensure
110
                // that partitioning works correctly.
111
                let expr = replace(expr.clone(), &root(), self.expanded_root_expr.clone());
1,898✔
112
                let expr = simplify_typed(expr, self.dtype())
1,898✔
113
                    .vortex_expect("We should not fail to simplify expression over struct fields");
1,898✔
114

115
                // Partition the expression into expressions that can be evaluated over individual fields
116
                let mut partitioned = partition(
1,898✔
117
                    expr.clone(),
1,898✔
118
                    self.dtype(),
1,898✔
119
                    annotate_scope_access(
1,898✔
120
                        self.dtype()
1,898✔
121
                            .as_struct()
1,898✔
122
                            .vortex_expect("We know it's a struct DType"),
1,898✔
123
                    ),
124
                )
125
                .vortex_expect("We should not fail to partition expression over struct fields");
1,898✔
126

127
                if partitioned.partitions.len() == 1 {
1,898✔
128
                    // If there's only one partition, we step into the field scope of the original
129
                    // expression by replacing any `$.a` with `$`.
130
                    return Partitioned::Single(
1,030✔
131
                        partitioned.partition_names[0].clone(),
1,030✔
132
                        replace(
1,030✔
133
                            expr.clone(),
1,030✔
134
                            &col(partitioned.partition_names[0].clone()),
1,030✔
135
                            root(),
1,030✔
136
                        ),
1,030✔
137
                    );
1,030✔
138
                }
868✔
139

140
                // We now need to process the partitioned expressions to rewrite the root scope
141
                // to be that of the field, rather than the struct. In other words, "stepping in"
142
                // to the field scope.
143
                partitioned.partitions = partitioned
868✔
144
                    .partitions
868✔
145
                    .iter()
868✔
146
                    .zip_eq(partitioned.partition_names.iter())
868✔
147
                    .map(|(e, name)| replace(e.clone(), &col(name.clone()), root()))
2,468✔
148
                    .collect();
868✔
149

150
                Partitioned::Multi(Arc::new(partitioned))
868✔
151
            })
1,898✔
152
            .clone()
6,716✔
153
    }
6,716✔
154
}
155

156
/// When partitioning an expression, in the case it only has a single partition we can avoid
157
/// some cost and just delegate to the child reader directly.
158
#[derive(Clone)]
159
enum Partitioned {
160
    Single(FieldName, ExprRef),
161
    Multi(Arc<PartitionedExpr<FieldName>>),
162
}
163

164
impl LayoutReader for StructReader {
165
    fn name(&self) -> &Arc<str> {
1,284✔
166
        &self.name
1,284✔
167
    }
1,284✔
168

169
    fn dtype(&self) -> &DType {
12,596✔
170
        self.layout.dtype()
12,596✔
171
    }
12,596✔
172

173
    fn row_count(&self) -> Precision<u64> {
×
174
        Precision::Exact(self.layout.row_count())
×
175
    }
×
176

177
    fn register_splits(
1,284✔
178
        &self,
1,284✔
179
        field_mask: &[FieldMask],
1,284✔
180
        row_offset: u64,
1,284✔
181
        splits: &mut BTreeSet<u64>,
1,284✔
182
    ) -> VortexResult<()> {
1,284✔
183
        // In the case of an empty struct, we need to register the end split.
184
        splits.insert(row_offset + self.layout.row_count);
1,284✔
185

186
        self.layout.matching_fields(field_mask, |mask, idx| {
3,212✔
187
            self.child_by_idx(idx)?
3,212✔
188
                .register_splits(&[mask], row_offset, splits)
3,212✔
189
        })
3,212✔
190
    }
1,284✔
191

192
    fn pruning_evaluation(
2,350✔
193
        &self,
2,350✔
194
        row_range: &Range<u64>,
2,350✔
195
        expr: &ExprRef,
2,350✔
196
    ) -> VortexResult<Box<dyn PruningEvaluation>> {
2,350✔
197
        // Partition the expression into expressions that can be evaluated over individual fields
198
        match &self.partition_expr(expr.clone()) {
2,350✔
199
            Partitioned::Single(name, partition) => {
2,126✔
200
                self.child(name)?.pruning_evaluation(row_range, partition)
2,126✔
201
            }
202
            Partitioned::Multi(_) => {
203
                // TODO(ngates): if all partitions are boolean, we can use a pruning evaluation. Otherwise
204
                //  there's not much we can do? Maybe... it's complicated...
205
                Ok(Box::new(NoOpPruningEvaluation))
224✔
206
            }
207
        }
208
    }
2,350✔
209

210
    fn filter_evaluation(
2,351✔
211
        &self,
2,351✔
212
        row_range: &Range<u64>,
2,351✔
213
        expr: &ExprRef,
2,351✔
214
    ) -> VortexResult<Box<dyn MaskEvaluation>> {
2,351✔
215
        // Partition the expression into expressions that can be evaluated over individual fields
216
        match &self.partition_expr(expr.clone()) {
2,351✔
217
            Partitioned::Single(name, partition) => {
2,126✔
218
                self.child(name)?.filter_evaluation(row_range, partition)
2,126✔
219
            }
220
            Partitioned::Multi(partitioned) => Ok(Box::new(PartitionedMaskEvaluation::try_new(
225✔
221
                partitioned.clone(),
225✔
UNCOV
222
                |name, expr| self.child(name)?.filter_evaluation(row_range, expr),
×
223
                |name, expr| self.child(name)?.projection_evaluation(row_range, expr),
450✔
UNCOV
224
            )?)),
×
225
        }
226
    }
2,351✔
227

228
    fn projection_evaluation(
2,015✔
229
        &self,
2,015✔
230
        row_range: &Range<u64>,
2,015✔
231
        expr: &ExprRef,
2,015✔
232
    ) -> VortexResult<Box<dyn ArrayEvaluation>> {
2,015✔
233
        // Partition the expression into expressions that can be evaluated over individual fields
234
        match &self.partition_expr(expr.clone()) {
2,015✔
235
            Partitioned::Single(name, partition) => self
432✔
236
                .child(name)?
432✔
237
                .projection_evaluation(row_range, partition),
432✔
238
            Partitioned::Multi(partitioned) => Ok(Box::new(PartitionedArrayEvaluation::try_new(
1,583✔
239
                partitioned.clone(),
1,583✔
240
                |name, expr| self.child(name)?.projection_evaluation(row_range, expr),
4,942✔
UNCOV
241
            )?)),
×
242
        }
243
    }
2,015✔
244
}
245

246
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arcref::ArcRef;
    use futures::executor::block_on;
    use futures::stream;
    use itertools::Itertools;
    use rstest::{fixture, rstest};
    use vortex_array::arrays::StructArray;
    use vortex_array::{Array, ArrayContext, IntoArray, ToCanonical};
    use vortex_buffer::buffer;
    use vortex_dtype::Nullability::NonNullable;
    use vortex_dtype::PType::I32;
    use vortex_dtype::{DType, StructFields};
    use vortex_expr::{eq, get_item, get_item_scope, gt, lit, or, pack, root};
    use vortex_mask::Mask;

    use crate::layouts::flat::writer::FlatLayoutStrategy;
    use crate::layouts::struct_::writer::StructStrategy;
    use crate::segments::{SegmentSource, SequenceWriter, TestSegments};
    use crate::sequence::SequenceId;
    use crate::{LayoutRef, LayoutStrategy, SequentialStreamAdapter, SequentialStreamExt as _};

    #[fixture]
    /// Write a three-row struct array with `i32` fields `a`, `b` and `c` through a
    /// [`StructStrategy`] over flat layouts, returning the segments and the resulting layout.
    fn struct_layout() -> (Arc<dyn SegmentSource>, LayoutRef) {
        let ctx = ArrayContext::empty();
        let segments = TestSegments::default();
        let sequence_writer = SequenceWriter::new(Box::new(segments.clone()));
        let strategy =
            StructStrategy::new(ArcRef::new_arc(Arc::new(FlatLayoutStrategy::default())));

        let dtype = DType::Struct(
            StructFields::new(
                vec!["a".into(), "b".into(), "c".into()].into(),
                vec![I32.into(), I32.into(), I32.into()],
            ),
            NonNullable,
        );
        let chunks = stream::once(async {
            Ok((
                SequenceId::root().downgrade(),
                StructArray::from_fields(
                    [
                        ("a", buffer![7, 2, 3].into_array()),
                        ("b", buffer![4, 5, 6].into_array()),
                        ("c", buffer![4, 5, 6].into_array()),
                    ]
                    .as_slice(),
                )
                .unwrap()
                .into_array(),
            ))
        });

        let layout = block_on(strategy.write_stream(
            &ctx,
            sequence_writer,
            SequentialStreamAdapter::new(dtype, chunks).sendable(),
        ))
        .unwrap();

        (Arc::new(segments), layout)
    }

    /// A disjunction spanning two fields (`a` and `b`) selects every row of the fixture.
    #[rstest]
    fn test_struct_layout_or(
        #[from(struct_layout)] (segments, layout): (Arc<dyn SegmentSource>, LayoutRef),
    ) {
        let reader = layout.new_reader("".into(), segments).unwrap();
        let a_is_7 = eq(get_item_scope("a"), lit(7));
        let b_is_5 = eq(get_item_scope("b"), lit(5));
        let a_is_3 = eq(get_item_scope("a"), lit(3));
        let filt = or(a_is_7, or(b_is_5, a_is_3));

        let evaluation = reader.filter_evaluation(&(0..3), &filt).unwrap();
        let result = block_on(evaluation.invoke(Mask::new_true(3))).unwrap();

        assert_eq!(
            vec![true, true, true],
            result.to_boolean_buffer().iter().collect_vec()
        );
    }

    /// Projecting `a > b` over all rows compares the two fields row by row.
    #[rstest]
    fn test_struct_layout(
        #[from(struct_layout)] (segments, layout): (Arc<dyn SegmentSource>, LayoutRef),
    ) {
        let reader = layout.new_reader("".into(), segments).unwrap();
        let expr = gt(get_item("a", root()), get_item("b", root()));

        let evaluation = reader.projection_evaluation(&(0..3), &expr).unwrap();
        let result = block_on(evaluation.invoke(Mask::new_true(3))).unwrap();

        let values = result
            .to_bool()
            .unwrap()
            .boolean_buffer()
            .iter()
            .collect::<Vec<_>>();
        assert_eq!(vec![true, false, false], values);
    }

    /// Projecting `a > b` under a row mask only yields the selected rows.
    #[rstest]
    fn test_struct_layout_row_mask(
        #[from(struct_layout)] (segments, layout): (Arc<dyn SegmentSource>, LayoutRef),
    ) {
        let reader = layout.new_reader("".into(), segments).unwrap();
        let expr = gt(get_item("a", root()), get_item("b", root()));

        let evaluation = reader.projection_evaluation(&(0..3), &expr).unwrap();
        let result = block_on(evaluation.invoke(Mask::from_iter([true, true, false]))).unwrap();

        assert_eq!(result.len(), 2);

        let values = result
            .to_bool()
            .unwrap()
            .boolean_buffer()
            .iter()
            .collect::<Vec<_>>();
        assert_eq!(vec![true, false], values);
    }

    /// Selecting fields `a` and `b` under a row mask yields a struct with only the selected rows.
    #[rstest]
    fn test_struct_layout_select(
        #[from(struct_layout)] (segments, layout): (Arc<dyn SegmentSource>, LayoutRef),
    ) {
        let reader = layout.new_reader("".into(), segments).unwrap();
        let expr = pack(
            [("a", get_item("a", root())), ("b", get_item("b", root()))],
            NonNullable,
        );

        let evaluation = reader.projection_evaluation(&(0..3), &expr).unwrap();
        // Take rows 0 and 1, skip row 2, and anything after that
        let result = block_on(evaluation.invoke(Mask::from_iter([true, true, false]))).unwrap();

        assert_eq!(result.len(), 2);

        let selected = result.to_struct().unwrap();

        assert_eq!(
            selected
                .field_by_name("a")
                .unwrap()
                .to_primitive()
                .unwrap()
                .as_slice::<i32>(),
            [7, 2].as_slice()
        );

        assert_eq!(
            selected
                .field_by_name("b")
                .unwrap()
                .to_primitive()
                .unwrap()
                .as_slice::<i32>(),
            [4, 5].as_slice()
        );
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc