• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16292916634

15 Jul 2025 12:12PM UTC coverage: 81.526% (+0.04%) from 81.486%
16292916634

Pull #3876

github

web-flow
Merge 998f79955 into 0b091c30a
Pull Request #3876: feat[layout]: replace register_splits with a layout splits stream

446 of 484 new or added lines in 16 files covered. (92.15%)

12 existing lines in 3 files now uncovered.

46534 of 57079 relevant lines covered (81.53%)

249443.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.18
/vortex-layout/src/layouts/struct_/reader.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::ops::Range;
5
use std::sync::Arc;
6

7
use arrow_buffer::ArrowNativeType;
8
use dashmap::DashMap;
9
use futures::{StreamExt, stream};
10
use itertools::Itertools;
11
use vortex_array::stats::Precision;
12
use vortex_dtype::{DType, FieldMask, FieldName, StructFields};
13
use vortex_error::{VortexExpect, VortexResult, vortex_err};
14
use vortex_expr::transform::immediate_access::annotate_scope_access;
15
use vortex_expr::transform::partition::{PartitionedExpr, partition};
16
use vortex_expr::transform::replace::{replace, replace_root_fields};
17
use vortex_expr::transform::simplify_typed::simplify_typed;
18
use vortex_expr::{ExactExpr, ExprRef, col, root};
19
use vortex_mask::Mask;
20
use vortex_utils::aliases::hash_map::HashMap;
21

22
use crate::layouts::partitioned::{PartitionedArrayEvaluation, PartitionedMaskEvaluation};
23
use crate::layouts::struct_::StructLayout;
24
use crate::masks::{IntersectionMaskStream, MaskStream};
25
use crate::segments::SegmentSource;
26
use crate::{
27
    ArrayEvaluation, LayoutReader, LayoutReaderRef, LazyReaderChildren, MaskEvaluation,
28
    NoOpPruningEvaluation, PruningEvaluation,
29
};
30

31
pub struct StructReader {
32
    layout: StructLayout,
33
    name: Arc<str>,
34
    lazy_children: LazyReaderChildren,
35

36
    /// A `pack` expression that holds each individual field of the root DType. This expansion
37
    /// ensures we can correctly partition expressions over the fields of the struct.
38
    expanded_root_expr: ExprRef,
39

40
    field_lookup: Option<HashMap<FieldName, usize>>,
41
    partitioned_expr_cache: DashMap<ExactExpr, Partitioned>,
42
}
43

44
impl StructReader {
45
    pub(super) fn try_new(
980✔
46
        layout: StructLayout,
980✔
47
        name: Arc<str>,
980✔
48
        segment_source: Arc<dyn SegmentSource>,
980✔
49
    ) -> VortexResult<Self> {
980✔
50
        let struct_dt = layout.struct_fields();
980✔
51

980✔
52
        // NOTE: This number is arbitrary and likely depends on the longest common prefix of field names
980✔
53
        let field_lookup = (struct_dt.nfields() > 80).then(|| {
980✔
54
            struct_dt
×
55
                .names()
×
56
                .iter()
×
57
                .enumerate()
×
58
                .map(|(i, n)| (n.clone(), i))
×
59
                .collect()
×
60
        });
980✔
61

980✔
62
        let lazy_children =
980✔
63
            LazyReaderChildren::new(layout.children.clone(), segment_source.clone());
980✔
64

980✔
65
        // Create an expanded root expression that contains all fields of the struct.
980✔
66
        let expanded_root_expr = replace_root_fields(root(), struct_dt);
980✔
67

980✔
68
        // This is where we need to do some complex things with the scan in order to split it into
980✔
69
        // different scans for different fields.
980✔
70
        Ok(Self {
980✔
71
            layout,
980✔
72
            name,
980✔
73
            expanded_root_expr,
980✔
74
            lazy_children,
980✔
75
            field_lookup,
980✔
76
            partitioned_expr_cache: Default::default(),
980✔
77
        })
980✔
78
    }
980✔
79

80
    /// Return the [`StructFields`] of this layout.
81
    fn struct_fields(&self) -> &StructFields {
77,140✔
82
        self.layout.struct_fields()
77,140✔
83
    }
77,140✔
84

85
    /// Return the child reader for the field.
86
    fn child(&self, name: &FieldName) -> VortexResult<&LayoutReaderRef> {
23,572✔
87
        let idx = self
23,572✔
88
            .field_lookup
23,572✔
89
            .as_ref()
23,572✔
90
            .and_then(|lookup| lookup.get(name).copied())
23,572✔
91
            .or_else(|| self.struct_fields().find(name))
23,572✔
92
            .ok_or_else(|| vortex_err!("Field {} not found in struct layout", name))?;
23,572✔
93
        self.child_by_idx(idx)
23,572✔
94
    }
23,572✔
95

96
    /// Return the child reader for the field, by index.
97
    fn child_by_idx(&self, idx: usize) -> VortexResult<&LayoutReaderRef> {
26,784✔
98
        let field_dtype = self
26,784✔
99
            .struct_fields()
26,784✔
100
            .field_by_index(idx)
26,784✔
101
            .ok_or_else(|| vortex_err!("Missing field {idx}"))?;
26,784✔
102
        let name = &self.struct_fields().names()[idx];
26,784✔
103
        self.lazy_children
26,784✔
104
            .get(idx, &field_dtype, &format!("{}.{}", self.name, name).into())
26,784✔
105
    }
26,784✔
106

107
    /// Utility for partitioning an expression over the fields of a struct.
108
    fn partition_expr(&self, expr: ExprRef) -> Partitioned {
14,318✔
109
        self.partitioned_expr_cache
14,318✔
110
            .entry(ExactExpr(expr.clone()))
14,318✔
111
            .or_insert_with(|| {
14,318✔
112
                // First, we expand the root scope into the fields of the struct to ensure
1,888✔
113
                // that partitioning works correctly.
1,888✔
114
                let expr = replace(expr.clone(), &root(), self.expanded_root_expr.clone());
1,888✔
115
                let expr = simplify_typed(expr, self.dtype())
1,888✔
116
                    .vortex_expect("We should not fail to simplify expression over struct fields");
1,888✔
117

1,888✔
118
                // Partition the expression into expressions that can be evaluated over individual fields
1,888✔
119
                let mut partitioned = partition(
1,888✔
120
                    expr.clone(),
1,888✔
121
                    self.dtype(),
1,888✔
122
                    annotate_scope_access(
1,888✔
123
                        self.dtype()
1,888✔
124
                            .as_struct()
1,888✔
125
                            .vortex_expect("We know it's a struct DType"),
1,888✔
126
                    ),
1,888✔
127
                )
1,888✔
128
                .vortex_expect("We should not fail to partition expression over struct fields");
1,888✔
129

1,888✔
130
                if partitioned.partitions.len() == 1 {
1,888✔
131
                    // If there's only one partition, we step into the field scope of the original
132
                    // expression by replacing any `$.a` with `$`.
133
                    return Partitioned::Single(
1,024✔
134
                        partitioned.partition_names[0].clone(),
1,024✔
135
                        replace(
1,024✔
136
                            expr.clone(),
1,024✔
137
                            &col(partitioned.partition_names[0].clone()),
1,024✔
138
                            root(),
1,024✔
139
                        ),
1,024✔
140
                    );
1,024✔
141
                }
864✔
142

864✔
143
                // We now need to process the partitioned expressions to rewrite the root scope
864✔
144
                // to be that of the field, rather than the struct. In other words, "stepping in"
864✔
145
                // to the field scope.
864✔
146
                partitioned.partitions = partitioned
864✔
147
                    .partitions
864✔
148
                    .iter()
864✔
149
                    .zip_eq(partitioned.partition_names.iter())
864✔
150
                    .map(|(e, name)| replace(e.clone(), &col(name.clone()), root()))
2,460✔
151
                    .collect();
864✔
152

864✔
153
                Partitioned::Multi(Arc::new(partitioned))
864✔
154
            })
14,318✔
155
            .clone()
14,318✔
156
    }
14,318✔
157
}
158

159
/// When partitioning an expression, in the case it only has a single partition we can avoid
160
/// some cost and just delegate to the child reader directly.
161
#[derive(Clone)]
162
enum Partitioned {
163
    Single(FieldName, ExprRef),
164
    Multi(Arc<PartitionedExpr<FieldName>>),
165
}
166

167
impl LayoutReader for StructReader {
168
    fn name(&self) -> &Arc<str> {
1,284✔
169
        &self.name
1,284✔
170
    }
1,284✔
171

172
    fn dtype(&self) -> &DType {
12,048✔
173
        self.layout.dtype()
12,048✔
174
    }
12,048✔
175

176
    fn row_count(&self) -> Precision<u64> {
×
177
        Precision::Exact(self.layout.row_count())
×
178
    }
×
179

180
    fn row_masks(&self, field_mask: &[FieldMask]) -> MaskStream {
1,284✔
181
        // Here we construct a stream of masks for each field in the field_mask, and then take
1,284✔
182
        // the smallest mask from each field.
1,284✔
183
        // If the field_mask is empty, we return a stream of all true masks.
1,284✔
184
        if field_mask.is_empty() {
1,284✔
NEW
185
            let row_count = self.layout.row_count().as_usize();
×
NEW
186
            return Box::pin(stream::once(async move { Ok(Mask::AllTrue(row_count)) })).boxed();
×
187
        }
1,284✔
188

1,284✔
189
        let mut field_streams = Vec::with_capacity(field_mask.len());
1,284✔
190
        self.layout
1,284✔
191
            .matching_fields(field_mask, |mask, idx| {
3,212✔
192
                let child = self.child_by_idx(idx)?;
3,212✔
193
                field_streams.push(child.row_masks(&[mask]));
3,212✔
194
                Ok(())
3,212✔
195
            })
3,212✔
196
            .vortex_expect("infallible");
1,284✔
197

1,284✔
198
        // FIXME(ngates): if there are no fields, we need a stream of masks that covers row_count.
1,284✔
199
        IntersectionMaskStream::new(field_streams).boxed()
1,284✔
200
    }
1,284✔
201

202
    fn pruning_evaluation(
4,940✔
203
        &self,
4,940✔
204
        row_range: &Range<u64>,
4,940✔
205
        expr: &ExprRef,
4,940✔
206
    ) -> VortexResult<Box<dyn PruningEvaluation>> {
4,940✔
207
        // Partition the expression into expressions that can be evaluated over individual fields
4,940✔
208
        match &self.partition_expr(expr.clone()) {
4,940✔
209
            Partitioned::Single(name, partition) => {
4,436✔
210
                self.child(name)?.pruning_evaluation(row_range, partition)
4,436✔
211
            }
212
            Partitioned::Multi(_) => {
213
                // TODO(ngates): if all partitions are boolean, we can use a pruning evaluation. Otherwise
214
                //  there's not much we can do? Maybe... it's complicated...
215
                Ok(Box::new(NoOpPruningEvaluation))
504✔
216
            }
217
        }
218
    }
4,940✔
219

220
    fn filter_evaluation(
4,941✔
221
        &self,
4,941✔
222
        row_range: &Range<u64>,
4,941✔
223
        expr: &ExprRef,
4,941✔
224
    ) -> VortexResult<Box<dyn MaskEvaluation>> {
4,941✔
225
        // Partition the expression into expressions that can be evaluated over individual fields
4,941✔
226
        match &self.partition_expr(expr.clone()) {
4,941✔
227
            Partitioned::Single(name, partition) => {
4,436✔
228
                self.child(name)?.filter_evaluation(row_range, partition)
4,436✔
229
            }
230
            Partitioned::Multi(partitioned) => Ok(Box::new(PartitionedMaskEvaluation::try_new(
505✔
231
                partitioned.clone(),
505✔
232
                |name, expr| self.child(name)?.filter_evaluation(row_range, expr),
505✔
233
                |name, expr| self.child(name)?.projection_evaluation(row_range, expr),
1,010✔
234
            )?)),
505✔
235
        }
236
    }
4,941✔
237

238
    fn projection_evaluation(
4,437✔
239
        &self,
4,437✔
240
        row_range: &Range<u64>,
4,437✔
241
        expr: &ExprRef,
4,437✔
242
    ) -> VortexResult<Box<dyn ArrayEvaluation>> {
4,437✔
243
        // Partition the expression into expressions that can be evaluated over individual fields
4,437✔
244
        match &self.partition_expr(expr.clone()) {
4,437✔
245
            Partitioned::Single(name, partition) => self
488✔
246
                .child(name)?
488✔
247
                .projection_evaluation(row_range, partition),
488✔
248
            Partitioned::Multi(partitioned) => Ok(Box::new(PartitionedArrayEvaluation::try_new(
3,949✔
249
                partitioned.clone(),
3,949✔
250
                |name, expr| self.child(name)?.projection_evaluation(row_range, expr),
13,202✔
251
            )?)),
3,949✔
252
        }
253
    }
4,437✔
254
}
255

256
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arcref::ArcRef;
    use futures::executor::block_on;
    use futures::{StreamExt, stream};
    use itertools::Itertools;
    use rstest::{fixture, rstest};
    use vortex_array::arrays::StructArray;
    use vortex_array::{Array, ArrayContext, IntoArray, ToCanonical};
    use vortex_buffer::buffer;
    use vortex_dtype::Nullability::NonNullable;
    use vortex_dtype::PType::I32;
    use vortex_dtype::{DType, StructFields};
    use vortex_expr::{eq, get_item, get_item_scope, gt, lit, or, pack, root};
    use vortex_mask::Mask;

    use crate::layouts::flat::writer::FlatLayoutStrategy;
    use crate::layouts::struct_::writer::StructStrategy;
    use crate::segments::{SegmentSource, SequenceWriter, TestSegments};
    use crate::sequence::SequenceId;
    use crate::{LayoutRef, LayoutStrategy, SequentialStreamAdapter, SequentialStreamExt as _};

    /// Create a struct layout with three non-nullable `i32` fields (`a`, `b`, `c`) and three
    /// rows, written in a single chunk with each field stored via the flat layout strategy.
    #[fixture]
    fn struct_layout() -> (Arc<dyn SegmentSource>, LayoutRef) {
        let ctx = ArrayContext::empty();
        let segments = TestSegments::default();
        let sequence_writer = SequenceWriter::new(Box::new(segments.clone()));
        let strategy =
            StructStrategy::new(ArcRef::new_arc(Arc::new(FlatLayoutStrategy::default())));
        let layout = block_on(
            strategy.write_stream(
                &ctx,
                sequence_writer,
                SequentialStreamAdapter::new(
                    DType::Struct(
                        StructFields::new(
                            vec!["a".into(), "b".into(), "c".into()].into(),
                            vec![I32.into(), I32.into(), I32.into()],
                        ),
                        NonNullable,
                    ),
                    stream::once(async {
                        Ok((
                            SequenceId::root().downgrade(),
                            StructArray::from_fields(
                                [
                                    ("a", buffer![7, 2, 3].into_array()),
                                    ("b", buffer![4, 5, 6].into_array()),
                                    ("c", buffer![4, 5, 6].into_array()),
                                ]
                                .as_slice(),
                            )
                            .unwrap()
                            .into_array(),
                        ))
                    }),
                )
                .sendable(),
            ),
        )
        .unwrap();

        (Arc::new(segments), layout)
    }

    #[rstest]
    fn test_struct_layout_or(
        #[from(struct_layout)] (segments, layout): (Arc<dyn SegmentSource>, LayoutRef),
    ) {
        let reader = layout.new_reader("".into(), segments).unwrap();
        let filt = or(
            eq(get_item_scope("a"), lit(7)),
            or(
                eq(get_item_scope("b"), lit(5)),
                eq(get_item_scope("a"), lit(3)),
            ),
        );
        let result = block_on(
            reader
                .filter_evaluation(&(0..3), &filt)
                .unwrap()
                .invoke(Mask::new_true(3)),
        )
        .unwrap();
        assert_eq!(
            vec![true, true, true],
            result.to_boolean_buffer().iter().collect_vec()
        );
    }

    #[rstest]
    fn test_struct_layout(
        #[from(struct_layout)] (segments, layout): (Arc<dyn SegmentSource>, LayoutRef),
    ) {
        let reader = layout.new_reader("".into(), segments).unwrap();
        let expr = gt(get_item("a", root()), get_item("b", root()));
        let result = block_on(
            reader
                .projection_evaluation(&(0..3), &expr)
                .unwrap()
                .invoke(Mask::new_true(3)),
        )
        .unwrap();
        assert_eq!(
            vec![true, false, false],
            result
                .to_bool()
                .unwrap()
                .boolean_buffer()
                .iter()
                .collect::<Vec<_>>()
        );
    }

    #[rstest]
    fn test_struct_layout_row_mask(
        #[from(struct_layout)] (segments, layout): (Arc<dyn SegmentSource>, LayoutRef),
    ) {
        let reader = layout.new_reader("".into(), segments).unwrap();
        let expr = gt(get_item("a", root()), get_item("b", root()));
        let result = block_on(
            reader
                .projection_evaluation(&(0..3), &expr)
                .unwrap()
                .invoke(Mask::from_iter([true, true, false])),
        )
        .unwrap();

        assert_eq!(result.len(), 2);

        assert_eq!(
            vec![true, false],
            result
                .to_bool()
                .unwrap()
                .boolean_buffer()
                .iter()
                .collect::<Vec<_>>()
        );
    }

    #[rstest]
    fn test_struct_layout_select(
        #[from(struct_layout)] (segments, layout): (Arc<dyn SegmentSource>, LayoutRef),
    ) {
        let reader = layout.new_reader("".into(), segments).unwrap();
        let expr = pack(
            [("a", get_item("a", root())), ("b", get_item("b", root()))],
            NonNullable,
        );
        let result = block_on(
            reader
                .projection_evaluation(&(0..3), &expr)
                .unwrap()
                // Take rows 0 and 1, skip row 2, and anything after that
                .invoke(Mask::from_iter([true, true, false])),
        )
        .unwrap();

        assert_eq!(result.len(), 2);

        assert_eq!(
            result
                .to_struct()
                .unwrap()
                .field_by_name("a")
                .unwrap()
                .to_primitive()
                .unwrap()
                .as_slice::<i32>(),
            [7, 2].as_slice()
        );

        assert_eq!(
            result
                .to_struct()
                .unwrap()
                .field_by_name("b")
                .unwrap()
                .to_primitive()
                .unwrap()
                .as_slice::<i32>(),
            [4, 5].as_slice()
        );
    }

    /// With an empty field mask there are no per-field streams to intersect, so the reader
    /// should yield a single mask selecting every row of the layout.
    #[rstest]
    fn test_struct_layout_row_masks_empty_field_mask(
        #[from(struct_layout)] (segments, layout): (Arc<dyn SegmentSource>, LayoutRef),
    ) {
        let reader = layout.new_reader("".into(), segments).unwrap();
        let masks = block_on(reader.row_masks(&[]).collect::<Vec<_>>());

        assert_eq!(masks.len(), 1);
        let mask = masks.into_iter().next().unwrap().unwrap();
        assert_eq!(mask.len(), 3);
        assert_eq!(mask.true_count(), 3);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc