• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16448958948

22 Jul 2025 03:37PM UTC coverage: 81.011% (-0.1%) from 81.109%
16448958948

Pull #3876

github

web-flow
Merge b0e97510f into db33b9fe9
Pull Request #3876: feat[layout]: replace register_splits with a layout splits stream

466 of 572 new or added lines in 17 files covered. (81.47%)

48 existing lines in 4 files now uncovered.

42258 of 52163 relevant lines covered (81.01%)

169401.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.57
/vortex-layout/src/layouts/struct_/reader.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::ops::Range;
5
use std::sync::Arc;
6

7
use arrow_buffer::ArrowNativeType;
8
use dashmap::DashMap;
9
use itertools::Itertools;
10
use vortex_array::stats::Precision;
11
use vortex_dtype::{DType, FieldMask, FieldName, StructFields};
12
use vortex_error::{VortexExpect, VortexResult, vortex_err};
13
use vortex_expr::transform::immediate_access::annotate_scope_access;
14
use vortex_expr::transform::partition::{PartitionedExpr, partition};
15
use vortex_expr::transform::replace::{replace, replace_root_fields};
16
use vortex_expr::transform::simplify_typed::simplify_typed;
17
use vortex_expr::{ExactExpr, ExprRef, col, root};
18
use vortex_mask::Mask;
19
use vortex_utils::aliases::hash_map::HashMap;
20

21
use crate::layouts::partitioned::{PartitionedArrayEvaluation, PartitionedMaskEvaluation};
22
use crate::layouts::struct_::StructLayout;
23
use crate::masks::{BoxMaskIterator, IntersectionMaskIterator};
24
use crate::row_selection::RowSelectionRef;
25
use crate::segments::SegmentSource;
26
use crate::{
27
    ArrayEvaluation, LayoutReader, LayoutReaderRef, LazyReaderChildren, MaskEvaluation,
28
    NoOpPruningEvaluation, PruningEvaluation,
29
};
30

31
pub struct StructReader {
32
    layout: StructLayout,
33
    name: Arc<str>,
34
    lazy_children: LazyReaderChildren,
35

36
    /// A `pack` expression that holds each individual field of the root DType. This expansion
37
    /// ensures we can correctly partition expressions over the fields of the struct.
38
    expanded_root_expr: ExprRef,
39

40
    field_lookup: Option<HashMap<FieldName, usize>>,
41
    partitioned_expr_cache: DashMap<ExactExpr, Partitioned>,
42
}
43

44
impl StructReader {
45
    pub(super) fn try_new(
1,051✔
46
        layout: StructLayout,
1,051✔
47
        name: Arc<str>,
1,051✔
48
        segment_source: Arc<dyn SegmentSource>,
1,051✔
49
    ) -> VortexResult<Self> {
1,051✔
50
        let struct_dt = layout.struct_fields();
1,051✔
51

52
        // NOTE: This number is arbitrary and likely depends on the longest common prefix of field names
53
        let field_lookup = (struct_dt.nfields() > 80).then(|| {
1,051✔
54
            struct_dt
×
55
                .names()
×
56
                .iter()
×
57
                .enumerate()
×
58
                .map(|(i, n)| (n.clone(), i))
×
59
                .collect()
×
60
        });
×
61

62
        let lazy_children =
1,051✔
63
            LazyReaderChildren::new(layout.children.clone(), segment_source.clone());
1,051✔
64

65
        // Create an expanded root expression that contains all fields of the struct.
66
        let expanded_root_expr = replace_root_fields(root(), struct_dt);
1,051✔
67

68
        // This is where we need to do some complex things with the scan in order to split it into
69
        // different scans for different fields.
70
        Ok(Self {
1,051✔
71
            layout,
1,051✔
72
            name,
1,051✔
73
            expanded_root_expr,
1,051✔
74
            lazy_children,
1,051✔
75
            field_lookup,
1,051✔
76
            partitioned_expr_cache: Default::default(),
1,051✔
77
        })
1,051✔
78
    }
1,051✔
79

80
    /// Return the [`StructFields`] of this layout.
81
    fn struct_fields(&self) -> &StructFields {
38,891✔
82
        self.layout.struct_fields()
38,891✔
83
    }
38,891✔
84

85
    /// Return the child reader for the field.
86
    fn child(&self, name: &FieldName) -> VortexResult<&LayoutReaderRef> {
10,787✔
87
        let idx = self
10,787✔
88
            .field_lookup
10,787✔
89
            .as_ref()
10,787✔
90
            .and_then(|lookup| lookup.get(name).copied())
10,787✔
91
            .or_else(|| self.struct_fields().find(name))
10,787✔
92
            .ok_or_else(|| vortex_err!("Field {} not found in struct layout", name))?;
10,787✔
93
        self.child_by_idx(idx)
10,787✔
94
    }
10,787✔
95

96
    /// Return the child reader for the field, by index.
97
    fn child_by_idx(&self, idx: usize) -> VortexResult<&LayoutReaderRef> {
14,052✔
98
        let field_dtype = self
14,052✔
99
            .struct_fields()
14,052✔
100
            .field_by_index(idx)
14,052✔
101
            .ok_or_else(|| vortex_err!("Missing field {idx}"))?;
14,052✔
102
        let name = &self.struct_fields().names()[idx];
14,052✔
103
        self.lazy_children
14,052✔
104
            .get(idx, &field_dtype, &format!("{}.{}", self.name, name).into())
14,052✔
105
    }
14,052✔
106

107
    /// Utility for partitioning an expression over the fields of a struct.
108
    fn partition_expr(&self, expr: ExprRef) -> Partitioned {
7,410✔
109
        self.partitioned_expr_cache
7,410✔
110
            .entry(ExactExpr(expr.clone()))
7,410✔
111
            .or_insert_with(|| {
7,410✔
112
                // First, we expand the root scope into the fields of the struct to ensure
113
                // that partitioning works correctly.
114
                let expr = replace(expr.clone(), &root(), self.expanded_root_expr.clone());
2,016✔
115
                let expr = simplify_typed(expr, self.dtype())
2,016✔
116
                    .vortex_expect("We should not fail to simplify expression over struct fields");
2,016✔
117

118
                // Partition the expression into expressions that can be evaluated over individual fields
119
                let mut partitioned = partition(
2,016✔
120
                    expr.clone(),
2,016✔
121
                    self.dtype(),
2,016✔
122
                    annotate_scope_access(
2,016✔
123
                        self.dtype()
2,016✔
124
                            .as_struct()
2,016✔
125
                            .vortex_expect("We know it's a struct DType"),
2,016✔
126
                    ),
127
                )
128
                .vortex_expect("We should not fail to partition expression over struct fields");
2,016✔
129

130
                if partitioned.partitions.len() == 1 {
2,016✔
131
                    // If there's only one partition, we step into the field scope of the original
132
                    // expression by replacing any `$.a` with `$`.
133
                    return Partitioned::Single(
1,133✔
134
                        partitioned.partition_names[0].clone(),
1,133✔
135
                        replace(
1,133✔
136
                            expr.clone(),
1,133✔
137
                            &col(partitioned.partition_names[0].clone()),
1,133✔
138
                            root(),
1,133✔
139
                        ),
1,133✔
140
                    );
1,133✔
141
                }
883✔
142

143
                // We now need to process the partitioned expressions to rewrite the root scope
144
                // to be that of the field, rather than the struct. In other words, "stepping in"
145
                // to the field scope.
146
                partitioned.partitions = partitioned
883✔
147
                    .partitions
883✔
148
                    .iter()
883✔
149
                    .zip_eq(partitioned.partition_names.iter())
883✔
150
                    .map(|(e, name)| replace(e.clone(), &col(name.clone()), root()))
2,503✔
151
                    .collect();
883✔
152

153
                Partitioned::Multi(Arc::new(partitioned))
883✔
154
            })
2,016✔
155
            .clone()
7,410✔
156
    }
7,410✔
157
}
158

159
/// When partitioning an expression, in the case it only has a single partition we can avoid
160
/// some cost and just delegate to the child reader directly.
161
#[derive(Clone)]
162
enum Partitioned {
163
    Single(FieldName, ExprRef),
164
    Multi(Arc<PartitionedExpr<FieldName>>),
165
}
166

167
impl LayoutReader for StructReader {
168
    fn name(&self) -> &Arc<str> {
1,319✔
169
        &self.name
1,319✔
170
    }
1,319✔
171

172
    fn dtype(&self) -> &DType {
12,273✔
173
        self.layout.dtype()
12,273✔
174
    }
12,273✔
175

176
    fn row_count(&self) -> Precision<u64> {
×
177
        Precision::Exact(self.layout.row_count())
×
178
    }
×
179

180
    fn row_masks(&self, selection: &RowSelectionRef, field_mask: &[FieldMask]) -> BoxMaskIterator {
1,319✔
181
        // Here we construct a stream of masks for each field in the field_mask, and then take
182
        // the smallest mask from each field.
183
        // If the field_mask is empty, we return an iterator of all true masks.
184
        if field_mask.is_empty() {
1,319✔
NEW
185
            let row_count = self.layout.row_count().as_usize();
×
NEW
186
            return Box::new(std::iter::once(Ok(Mask::AllTrue(row_count))));
×
187
        }
1,319✔
188

189
        let mut field_iterators = Vec::with_capacity(field_mask.len());
1,319✔
190
        self.layout
1,319✔
191
            .matching_fields(field_mask, |mask, idx| {
3,265✔
192
                let child = self.child_by_idx(idx)?;
3,265✔
193
                field_iterators.push(child.row_masks(selection, &[mask]));
3,265✔
194
                Ok(())
3,265✔
195
            })
3,265✔
196
            .vortex_expect("infallible");
1,319✔
197

198
        // FIXME(ngates): if there are no fields, we need an iterator of masks that covers row_count.
199
        // Now we can use the iterator directly without conversion to streams
200
        Box::new(IntersectionMaskIterator::new(field_iterators))
1,319✔
201
    }
1,319✔
202

203
    fn pruning_evaluation(
2,682✔
204
        &self,
2,682✔
205
        row_range: &Range<u64>,
2,682✔
206
        expr: &ExprRef,
2,682✔
207
    ) -> VortexResult<Box<dyn PruningEvaluation>> {
2,682✔
208
        // Partition the expression into expressions that can be evaluated over individual fields
209
        match &self.partition_expr(expr.clone()) {
2,682✔
210
            Partitioned::Single(name, partition) => {
2,455✔
211
                self.child(name)?.pruning_evaluation(row_range, partition)
2,455✔
212
            }
213
            Partitioned::Multi(_) => {
214
                // TODO(ngates): if all partitions are boolean, we can use a pruning evaluation. Otherwise
215
                //  there's not much we can do? Maybe... it's complicated...
216
                Ok(Box::new(NoOpPruningEvaluation))
227✔
217
            }
218
        }
219
    }
2,682✔
220

221
    fn filter_evaluation(
2,683✔
222
        &self,
2,683✔
223
        row_range: &Range<u64>,
2,683✔
224
        expr: &ExprRef,
2,683✔
225
    ) -> VortexResult<Box<dyn MaskEvaluation>> {
2,683✔
226
        // Partition the expression into expressions that can be evaluated over individual fields
227
        match &self.partition_expr(expr.clone()) {
2,683✔
228
            Partitioned::Single(name, partition) => {
2,455✔
229
                self.child(name)?.filter_evaluation(row_range, partition)
2,455✔
230
            }
231
            Partitioned::Multi(partitioned) => Ok(Box::new(PartitionedMaskEvaluation::try_new(
228✔
232
                partitioned.clone(),
228✔
233
                |name, expr| self.child(name)?.filter_evaluation(row_range, expr),
×
234
                |name, expr| self.child(name)?.projection_evaluation(row_range, expr),
456✔
235
            )?)),
×
236
        }
237
    }
2,683✔
238

239
    fn projection_evaluation(
2,045✔
240
        &self,
2,045✔
241
        row_range: &Range<u64>,
2,045✔
242
        expr: &ExprRef,
2,045✔
243
    ) -> VortexResult<Box<dyn ArrayEvaluation>> {
2,045✔
244
        // Partition the expression into expressions that can be evaluated over individual fields
245
        match &self.partition_expr(expr.clone()) {
2,045✔
246
            Partitioned::Single(name, partition) => self
450✔
247
                .child(name)?
450✔
248
                .projection_evaluation(row_range, partition),
450✔
249
            Partitioned::Multi(partitioned) => Ok(Box::new(PartitionedArrayEvaluation::try_new(
1,595✔
250
                partitioned.clone(),
1,595✔
251
                |name, expr| self.child(name)?.projection_evaluation(row_range, expr),
4,971✔
252
            )?)),
×
253
        }
254
    }
2,045✔
255
}
256

257
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arcref::ArcRef;
    use futures::executor::block_on;
    use futures::stream;
    use itertools::Itertools;
    use rstest::{fixture, rstest};
    use vortex_array::arrays::StructArray;
    use vortex_array::{Array, ArrayContext, IntoArray, ToCanonical};
    use vortex_buffer::buffer;
    use vortex_dtype::Nullability::NonNullable;
    use vortex_dtype::PType::I32;
    use vortex_dtype::{DType, StructFields};
    use vortex_expr::{col, eq, get_item, gt, lit, or, pack, root};
    use vortex_mask::Mask;

    use crate::layouts::flat::writer::FlatLayoutStrategy;
    use crate::layouts::struct_::writer::StructStrategy;
    use crate::segments::{SegmentSource, SequenceWriter, TestSegments};
    use crate::sequence::SequenceId;
    use crate::{LayoutRef, LayoutStrategy, SequentialStreamAdapter, SequentialStreamExt as _};

    /// Write a struct layout with three non-nullable `i32` fields (flat children) from a single
    /// chunk, returning the segments it was written to alongside the layout:
    ///
    /// | a | b | c |
    /// |---|---|---|
    /// | 7 | 4 | 4 |
    /// | 2 | 5 | 5 |
    /// | 3 | 6 | 6 |
    #[fixture]
    fn struct_layout() -> (Arc<dyn SegmentSource>, LayoutRef) {
        let ctx = ArrayContext::empty();
        let segments = TestSegments::default();
        let sequence_writer = SequenceWriter::new(Box::new(segments.clone()));
        let strategy =
            StructStrategy::new(ArcRef::new_arc(Arc::new(FlatLayoutStrategy::default())));
        let layout = block_on(
            strategy.write_stream(
                &ctx,
                sequence_writer,
                SequentialStreamAdapter::new(
                    DType::Struct(
                        StructFields::new(
                            vec!["a".into(), "b".into(), "c".into()].into(),
                            vec![I32.into(), I32.into(), I32.into()],
                        ),
                        NonNullable,
                    ),
                    stream::once(async {
                        Ok((
                            SequenceId::root().downgrade(),
                            StructArray::from_fields(
                                [
                                    ("a", buffer![7, 2, 3].into_array()),
                                    ("b", buffer![4, 5, 6].into_array()),
                                    ("c", buffer![4, 5, 6].into_array()),
                                ]
                                .as_slice(),
                            )
                            .unwrap()
                            .into_array(),
                        ))
                    }),
                )
                .sendable(),
            ),
        )
        .unwrap();

        (Arc::new(segments), layout)
    }

    /// A filter spanning two fields (`a` and `b`) is evaluated via multiple partitions.
    #[rstest]
    fn test_struct_layout_or(
        #[from(struct_layout)] (segments, layout): (Arc<dyn SegmentSource>, LayoutRef),
    ) {
        let reader = layout.new_reader("".into(), segments).unwrap();
        let filt = or(
            eq(col("a"), lit(7)),
            or(eq(col("b"), lit(5)), eq(col("a"), lit(3))),
        );
        let result = block_on(
            reader
                .filter_evaluation(&(0..3), &filt)
                .unwrap()
                .invoke(Mask::new_true(3)),
        )
        .unwrap();
        assert_eq!(
            vec![true, true, true],
            result.to_boolean_buffer().iter().collect_vec()
        );
    }

    /// A projection comparing two fields yields one boolean per row.
    #[rstest]
    fn test_struct_layout(
        #[from(struct_layout)] (segments, layout): (Arc<dyn SegmentSource>, LayoutRef),
    ) {
        let reader = layout.new_reader("".into(), segments).unwrap();
        let expr = gt(get_item("a", root()), get_item("b", root()));
        let result = block_on(
            reader
                .projection_evaluation(&(0..3), &expr)
                .unwrap()
                .invoke(Mask::new_true(3)),
        )
        .unwrap();
        assert_eq!(
            vec![true, false, false],
            result
                .to_bool()
                .unwrap()
                .boolean_buffer()
                .iter()
                .collect::<Vec<_>>()
        );
    }

    /// A projection invoked with a partial mask only returns the selected rows.
    #[rstest]
    fn test_struct_layout_row_mask(
        #[from(struct_layout)] (segments, layout): (Arc<dyn SegmentSource>, LayoutRef),
    ) {
        let reader = layout.new_reader("".into(), segments).unwrap();
        let expr = gt(get_item("a", root()), get_item("b", root()));
        let result = block_on(
            reader
                .projection_evaluation(&(0..3), &expr)
                .unwrap()
                .invoke(Mask::from_iter([true, true, false])),
        )
        .unwrap();

        assert_eq!(result.len(), 2);

        assert_eq!(
            vec![true, false],
            result
                .to_bool()
                .unwrap()
                .boolean_buffer()
                .iter()
                .collect::<Vec<_>>()
        );
    }

    /// Packing a subset of fields returns only those fields for the selected rows.
    #[rstest]
    fn test_struct_layout_select(
        #[from(struct_layout)] (segments, layout): (Arc<dyn SegmentSource>, LayoutRef),
    ) {
        let reader = layout.new_reader("".into(), segments).unwrap();
        let expr = pack(
            [("a", get_item("a", root())), ("b", get_item("b", root()))],
            NonNullable,
        );
        let result = block_on(
            reader
                .projection_evaluation(&(0..3), &expr)
                .unwrap()
                // Take rows 0 and 1, skip row 2, and anything after that
                .invoke(Mask::from_iter([true, true, false])),
        )
        .unwrap();

        assert_eq!(result.len(), 2);

        assert_eq!(
            result
                .to_struct()
                .unwrap()
                .field_by_name("a")
                .unwrap()
                .to_primitive()
                .unwrap()
                .as_slice::<i32>(),
            [7, 2].as_slice()
        );

        assert_eq!(
            result
                .to_struct()
                .unwrap()
                .field_by_name("b")
                .unwrap()
                .to_primitive()
                .unwrap()
                .as_slice::<i32>(),
            [4, 5].as_slice()
        );
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc