• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16419023817

21 Jul 2025 01:56PM UTC coverage: 81.673%. First build
16419023817

Pull #3876

github

web-flow
Merge 3389f94e0 into 56156aa03
Pull Request #3876: feat[layout]: replace register_splits with a layout splits stream

726 of 777 new or added lines in 17 files covered. (93.44%)

42577 of 52131 relevant lines covered (81.67%)

169631.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.45
/vortex-layout/src/layouts/struct_/reader.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::ops::Range;
5
use std::sync::Arc;
6

7
use arrow_buffer::ArrowNativeType;
8
use dashmap::DashMap;
9
use itertools::Itertools;
10
use vortex_array::stats::Precision;
11
use vortex_dtype::{DType, FieldMask, FieldName, StructFields};
12
use vortex_error::{VortexExpect, VortexResult, vortex_err};
13
use vortex_expr::transform::immediate_access::annotate_scope_access;
14
use vortex_expr::transform::partition::{PartitionedExpr, partition};
15
use vortex_expr::transform::replace::{replace, replace_root_fields};
16
use vortex_expr::transform::simplify_typed::simplify_typed;
17
use vortex_expr::{ExactExpr, ExprRef, col, root};
18
use vortex_mask::Mask;
19
use vortex_utils::aliases::hash_map::HashMap;
20

21
use crate::layouts::partitioned::{PartitionedArrayEvaluation, PartitionedMaskEvaluation};
22
use crate::layouts::struct_::StructLayout;
23
use crate::masks::{BoxMaskIterator, IntersectionMaskIterator};
24
use crate::segments::SegmentSource;
25
use crate::tree_row_mask::TreeRowMask;
26
use crate::{
27
    ArrayEvaluation, LayoutReader, LayoutReaderRef, LazyReaderChildren, MaskEvaluation,
28
    NoOpPruningEvaluation, PruningEvaluation,
29
};
30

31
pub struct StructReader {
32
    layout: StructLayout,
33
    name: Arc<str>,
34
    lazy_children: LazyReaderChildren,
35

36
    /// A `pack` expression that holds each individual field of the root DType. This expansion
37
    /// ensures we can correctly partition expressions over the fields of the struct.
38
    expanded_root_expr: ExprRef,
39

40
    field_lookup: Option<HashMap<FieldName, usize>>,
41
    partitioned_expr_cache: DashMap<ExactExpr, Partitioned>,
42
}
43

44
impl StructReader {
45
    pub(super) fn try_new(
1,043✔
46
        layout: StructLayout,
1,043✔
47
        name: Arc<str>,
1,043✔
48
        segment_source: Arc<dyn SegmentSource>,
1,043✔
49
    ) -> VortexResult<Self> {
1,043✔
50
        let struct_dt = layout.struct_fields();
1,043✔
51

52
        // NOTE: This number is arbitrary and likely depends on the longest common prefix of field names
53
        let field_lookup = (struct_dt.nfields() > 80).then(|| {
1,043✔
54
            struct_dt
×
55
                .names()
×
56
                .iter()
×
57
                .enumerate()
×
58
                .map(|(i, n)| (n.clone(), i))
×
59
                .collect()
×
60
        });
×
61

62
        let lazy_children =
1,043✔
63
            LazyReaderChildren::new(layout.children.clone(), segment_source.clone());
1,043✔
64

65
        // Create an expanded root expression that contains all fields of the struct.
66
        let expanded_root_expr = replace_root_fields(root(), struct_dt);
1,043✔
67

68
        // This is where we need to do some complex things with the scan in order to split it into
69
        // different scans for different fields.
70
        Ok(Self {
1,043✔
71
            layout,
1,043✔
72
            name,
1,043✔
73
            expanded_root_expr,
1,043✔
74
            lazy_children,
1,043✔
75
            field_lookup,
1,043✔
76
            partitioned_expr_cache: Default::default(),
1,043✔
77
        })
1,043✔
78
    }
1,043✔
79

80
    /// Return the [`StructFields`] of this layout.
81
    fn struct_fields(&self) -> &StructFields {
37,583✔
82
        self.layout.struct_fields()
37,583✔
83
    }
37,583✔
84

85
    /// Return the child reader for the field.
86
    fn child(&self, name: &FieldName) -> VortexResult<&LayoutReaderRef> {
10,351✔
87
        let idx = self
10,351✔
88
            .field_lookup
10,351✔
89
            .as_ref()
10,351✔
90
            .and_then(|lookup| lookup.get(name).copied())
10,351✔
91
            .or_else(|| self.struct_fields().find(name))
10,351✔
92
            .ok_or_else(|| vortex_err!("Field {} not found in struct layout", name))?;
10,351✔
93
        self.child_by_idx(idx)
10,351✔
94
    }
10,351✔
95

96
    /// Return the child reader for the field, by index.
97
    fn child_by_idx(&self, idx: usize) -> VortexResult<&LayoutReaderRef> {
13,616✔
98
        let field_dtype = self
13,616✔
99
            .struct_fields()
13,616✔
100
            .field_by_index(idx)
13,616✔
101
            .ok_or_else(|| vortex_err!("Missing field {idx}"))?;
13,616✔
102
        let name = &self.struct_fields().names()[idx];
13,616✔
103
        self.lazy_children
13,616✔
104
            .get(idx, &field_dtype, &format!("{}.{}", self.name, name).into())
13,616✔
105
    }
13,616✔
106

107
    /// Utility for partitioning an expression over the fields of a struct.
108
    fn partition_expr(&self, expr: ExprRef) -> Partitioned {
6,974✔
109
        self.partitioned_expr_cache
6,974✔
110
            .entry(ExactExpr(expr.clone()))
6,974✔
111
            .or_insert_with(|| {
6,974✔
112
                // First, we expand the root scope into the fields of the struct to ensure
113
                // that partitioning works correctly.
114
                let expr = replace(expr.clone(), &root(), self.expanded_root_expr.clone());
2,258✔
115
                let expr = simplify_typed(expr, self.dtype())
2,258✔
116
                    .vortex_expect("We should not fail to simplify expression over struct fields");
2,258✔
117

118
                // Partition the expression into expressions that can be evaluated over individual fields
119
                let mut partitioned = partition(
2,258✔
120
                    expr.clone(),
2,258✔
121
                    self.dtype(),
2,258✔
122
                    annotate_scope_access(
2,258✔
123
                        self.dtype()
2,258✔
124
                            .as_struct()
2,258✔
125
                            .vortex_expect("We know it's a struct DType"),
2,258✔
126
                    ),
127
                )
128
                .vortex_expect("We should not fail to partition expression over struct fields");
2,258✔
129

130
                if partitioned.partitions.len() == 1 {
2,258✔
131
                    // If there's only one partition, we step into the field scope of the original
132
                    // expression by replacing any `$.a` with `$`.
133
                    return Partitioned::Single(
1,309✔
134
                        partitioned.partition_names[0].clone(),
1,309✔
135
                        replace(
1,309✔
136
                            expr.clone(),
1,309✔
137
                            &col(partitioned.partition_names[0].clone()),
1,309✔
138
                            root(),
1,309✔
139
                        ),
1,309✔
140
                    );
1,309✔
141
                }
949✔
142

143
                // We now need to process the partitioned expressions to rewrite the root scope
144
                // to be that of the field, rather than the struct. In other words, "stepping in"
145
                // to the field scope.
146
                partitioned.partitions = partitioned
949✔
147
                    .partitions
949✔
148
                    .iter()
949✔
149
                    .zip_eq(partitioned.partition_names.iter())
949✔
150
                    .map(|(e, name)| replace(e.clone(), &col(name.clone()), root()))
2,635✔
151
                    .collect();
949✔
152

153
                Partitioned::Multi(Arc::new(partitioned))
949✔
154
            })
2,258✔
155
            .clone()
6,974✔
156
    }
6,974✔
157
}
158

159
/// When partitioning an expression, in the case it only has a single partition we can avoid
160
/// some cost and just delegate to the child reader directly.
161
#[derive(Clone)]
162
enum Partitioned {
163
    Single(FieldName, ExprRef),
164
    Multi(Arc<PartitionedExpr<FieldName>>),
165
}
166

167
impl LayoutReader for StructReader {
168
    fn name(&self) -> &Arc<str> {
1,319✔
169
        &self.name
1,319✔
170
    }
1,319✔
171

172
    fn dtype(&self) -> &DType {
12,931✔
173
        self.layout.dtype()
12,931✔
174
    }
12,931✔
175

176
    fn row_count(&self) -> Precision<u64> {
1,319✔
177
        Precision::Exact(self.layout.row_count())
1,319✔
178
    }
1,319✔
179

180
    fn row_masks(&self, selection: &TreeRowMask, field_mask: &[FieldMask]) -> BoxMaskIterator {
1,319✔
181
        // Here we construct a stream of masks for each field in the field_mask, and then take
182
        // the smallest mask from each field.
183
        // If the field_mask is empty, we return an iterator of all true masks.
184
        if field_mask.is_empty() {
1,319✔
NEW
185
            let row_count = self.layout.row_count().as_usize();
×
NEW
186
            return Box::new(std::iter::once(Ok(Mask::AllTrue(row_count))));
×
187
        }
1,319✔
188

189
        let mut field_iterators = Vec::with_capacity(field_mask.len());
1,319✔
190
        self.layout
1,319✔
191
            .matching_fields(field_mask, |mask, idx| {
3,265✔
192
                let child = self.child_by_idx(idx)?;
3,265✔
193
                field_iterators.push(child.row_masks(selection, &[mask]));
3,265✔
194
                Ok(())
3,265✔
195
            })
3,265✔
196
            .vortex_expect("infallible");
1,319✔
197

198
        // FIXME(ngates): if there are no fields, we need an iterator of masks that covers row_count.
199
        // Now we can use the iterator directly without conversion to streams
200
        Box::new(IntersectionMaskIterator::new(field_iterators))
1,319✔
201
    }
1,319✔
202

203
    fn pruning_evaluation(
2,464✔
204
        &self,
2,464✔
205
        row_range: &Range<u64>,
2,464✔
206
        expr: &ExprRef,
2,464✔
207
    ) -> VortexResult<Box<dyn PruningEvaluation>> {
2,464✔
208
        // Partition the expression into expressions that can be evaluated over individual fields
209
        match &self.partition_expr(expr.clone()) {
2,464✔
210
            Partitioned::Single(name, partition) => {
2,237✔
211
                self.child(name)?.pruning_evaluation(row_range, partition)
2,237✔
212
            }
213
            Partitioned::Multi(_) => {
214
                // TODO(ngates): if all partitions are boolean, we can use a pruning evaluation. Otherwise
215
                //  there's not much we can do? Maybe... it's complicated...
216
                Ok(Box::new(NoOpPruningEvaluation))
227✔
217
            }
218
        }
219
    }
2,464✔
220

221
    fn filter_evaluation(
2,465✔
222
        &self,
2,465✔
223
        row_range: &Range<u64>,
2,465✔
224
        expr: &ExprRef,
2,465✔
225
    ) -> VortexResult<Box<dyn MaskEvaluation>> {
2,465✔
226
        // Partition the expression into expressions that can be evaluated over individual fields
227
        match &self.partition_expr(expr.clone()) {
2,465✔
228
            Partitioned::Single(name, partition) => {
2,237✔
229
                self.child(name)?.filter_evaluation(row_range, partition)
2,237✔
230
            }
231
            Partitioned::Multi(partitioned) => Ok(Box::new(PartitionedMaskEvaluation::try_new(
228✔
232
                partitioned.clone(),
228✔
233
                |name, expr| self.child(name)?.filter_evaluation(row_range, expr),
×
234
                |name, expr| self.child(name)?.projection_evaluation(row_range, expr),
456✔
235
            )?)),
×
236
        }
237
    }
2,465✔
238

239
    fn projection_evaluation(
2,045✔
240
        &self,
2,045✔
241
        row_range: &Range<u64>,
2,045✔
242
        expr: &ExprRef,
2,045✔
243
    ) -> VortexResult<Box<dyn ArrayEvaluation>> {
2,045✔
244
        // Partition the expression into expressions that can be evaluated over individual fields
245
        match &self.partition_expr(expr.clone()) {
2,045✔
246
            Partitioned::Single(name, partition) => self
450✔
247
                .child(name)?
450✔
248
                .projection_evaluation(row_range, partition),
450✔
249
            Partitioned::Multi(partitioned) => Ok(Box::new(PartitionedArrayEvaluation::try_new(
1,595✔
250
                partitioned.clone(),
1,595✔
251
                |name, expr| self.child(name)?.projection_evaluation(row_range, expr),
4,971✔
252
            )?)),
×
253
        }
254
    }
2,045✔
255
}
256

257
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arcref::ArcRef;
    use futures::executor::block_on;
    use futures::stream;
    use itertools::Itertools;
    use rstest::{fixture, rstest};
    use vortex_array::arrays::StructArray;
    use vortex_array::{Array, ArrayContext, IntoArray, ToCanonical};
    use vortex_buffer::buffer;
    use vortex_dtype::Nullability::NonNullable;
    use vortex_dtype::PType::I32;
    use vortex_dtype::{DType, StructFields};
    use vortex_expr::{eq, get_item, get_item_scope, gt, lit, or, pack, root};
    use vortex_mask::Mask;

    use crate::layouts::flat::writer::FlatLayoutStrategy;
    use crate::layouts::struct_::writer::StructStrategy;
    use crate::segments::{SegmentSource, SequenceWriter, TestSegments};
    use crate::sequence::SequenceId;
    use crate::{LayoutRef, LayoutStrategy, SequentialStreamAdapter, SequentialStreamExt as _};

    /// Write a single-chunk struct layout with three non-nullable `i32` fields
    /// (`a = [7, 2, 3]`, `b = [4, 5, 6]`, `c = [4, 5, 6]`), using a `FlatLayoutStrategy` for
    /// the child fields. Returns the segment source holding the written data and the layout.
    #[fixture]
    fn struct_layout() -> (Arc<dyn SegmentSource>, LayoutRef) {
        let ctx = ArrayContext::empty();
        let segments = TestSegments::default();
        let sequence_writer = SequenceWriter::new(Box::new(segments.clone()));
        let strategy =
            StructStrategy::new(ArcRef::new_arc(Arc::new(FlatLayoutStrategy::default())));
        let layout = block_on(
            strategy.write_stream(
                &ctx,
                sequence_writer,
                SequentialStreamAdapter::new(
                    DType::Struct(
                        StructFields::new(
                            vec!["a".into(), "b".into(), "c".into()].into(),
                            vec![I32.into(), I32.into(), I32.into()],
                        ),
                        NonNullable,
                    ),
                    stream::once(async {
                        Ok((
                            SequenceId::root().downgrade(),
                            StructArray::from_fields(
                                [
                                    ("a", buffer![7, 2, 3].into_array()),
                                    ("b", buffer![4, 5, 6].into_array()),
                                    ("c", buffer![4, 5, 6].into_array()),
                                ]
                                .as_slice(),
                            )
                            .unwrap()
                            .into_array(),
                        ))
                    }),
                )
                .sendable(),
            ),
        )
        .unwrap();

        (Arc::new(segments), layout)
    }

    /// A filter whose disjuncts reference both `a` and `b` goes through the multi-partition
    /// path; every row satisfies at least one disjunct (`a == 7`, `b == 5`, `a == 3`).
    #[rstest]
    fn test_struct_layout_or(
        #[from(struct_layout)] (segments, layout): (Arc<dyn SegmentSource>, LayoutRef),
    ) {
        let reader = layout.new_reader("".into(), segments).unwrap();
        let filt = or(
            eq(get_item_scope("a"), lit(7)),
            or(
                eq(get_item_scope("b"), lit(5)),
                eq(get_item_scope("a"), lit(3)),
            ),
        );
        let result = block_on(
            reader
                .filter_evaluation(&(0..3), &filt)
                .unwrap()
                .invoke(Mask::new_true(3)),
        )
        .unwrap();
        assert_eq!(
            vec![true, true, true],
            result.to_boolean_buffer().iter().collect_vec()
        );
    }

    /// A projection comparing two fields (`a > b`) is evaluated across both child readers.
    #[rstest]
    fn test_struct_layout(
        #[from(struct_layout)] (segments, layout): (Arc<dyn SegmentSource>, LayoutRef),
    ) {
        let reader = layout.new_reader("".into(), segments).unwrap();
        let expr = gt(get_item("a", root()), get_item("b", root()));
        let result = block_on(
            reader
                .projection_evaluation(&(0..3), &expr)
                .unwrap()
                .invoke(Mask::new_true(3)),
        )
        .unwrap();
        assert_eq!(
            vec![true, false, false],
            result
                .to_bool()
                .unwrap()
                .boolean_buffer()
                .iter()
                .collect::<Vec<_>>()
        );
    }

    /// Projection honours the mask passed to `invoke`: only the selected rows are returned.
    #[rstest]
    fn test_struct_layout_row_mask(
        #[from(struct_layout)] (segments, layout): (Arc<dyn SegmentSource>, LayoutRef),
    ) {
        let reader = layout.new_reader("".into(), segments).unwrap();
        let expr = gt(get_item("a", root()), get_item("b", root()));
        let result = block_on(
            reader
                .projection_evaluation(&(0..3), &expr)
                .unwrap()
                .invoke(Mask::from_iter([true, true, false])),
        )
        .unwrap();

        assert_eq!(result.len(), 2);

        assert_eq!(
            vec![true, false],
            result
                .to_bool()
                .unwrap()
                .boolean_buffer()
                .iter()
                .collect::<Vec<_>>()
        );
    }

    /// Packing a subset of fields (`a`, `b`) projects only those fields for the selected rows.
    #[rstest]
    fn test_struct_layout_select(
        #[from(struct_layout)] (segments, layout): (Arc<dyn SegmentSource>, LayoutRef),
    ) {
        let reader = layout.new_reader("".into(), segments).unwrap();
        let expr = pack(
            [("a", get_item("a", root())), ("b", get_item("b", root()))],
            NonNullable,
        );
        let result = block_on(
            reader
                .projection_evaluation(&(0..3), &expr)
                .unwrap()
                // Take rows 0 and 1, skip row 2, and anything after that
                .invoke(Mask::from_iter([true, true, false])),
        )
        .unwrap();

        assert_eq!(result.len(), 2);

        assert_eq!(
            result
                .to_struct()
                .unwrap()
                .field_by_name("a")
                .unwrap()
                .to_primitive()
                .unwrap()
                .as_slice::<i32>(),
            [7, 2].as_slice()
        );

        assert_eq!(
            result
                .to_struct()
                .unwrap()
                .field_by_name("b")
                .unwrap()
                .to_primitive()
                .unwrap()
                .as_slice::<i32>(),
            [4, 5].as_slice()
        );
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc