• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16199748474

10 Jul 2025 03:46PM UTC coverage: 81.084% (+2.9%) from 78.188%
16199748474

Pull #3822

github

web-flow
Merge 6292db2a7 into 3ed9f3090
Pull Request #3822: chore: TPC-H CI/coverage improvements

45627 of 56271 relevant lines covered (81.08%)

145072.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.16
/vortex-layout/src/layouts/struct_/reader.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::collections::BTreeSet;
5
use std::ops::Range;
6
use std::sync::Arc;
7

8
use dashmap::DashMap;
9
use itertools::Itertools;
10
use vortex_array::ArrayContext;
11
use vortex_array::stats::Precision;
12
use vortex_dtype::{DType, FieldMask, FieldName, StructFields};
13
use vortex_error::{VortexExpect, VortexResult, vortex_err};
14
use vortex_expr::transform::immediate_access::annotate_scope_access;
15
use vortex_expr::transform::partition::{PartitionedExpr, partition};
16
use vortex_expr::transform::replace::{replace, replace_root_fields};
17
use vortex_expr::transform::simplify_typed::simplify_typed;
18
use vortex_expr::{ExactExpr, ExprRef, ScopeDType, col, root};
19
use vortex_utils::aliases::hash_map::HashMap;
20

21
use crate::layouts::partitioned::{PartitionedArrayEvaluation, PartitionedMaskEvaluation};
22
use crate::layouts::struct_::StructLayout;
23
use crate::segments::SegmentSource;
24
use crate::{
25
    ArrayEvaluation, LayoutReader, LayoutReaderRef, LazyReaderChildren, MaskEvaluation,
26
    NoOpPruningEvaluation, PruningEvaluation,
27
};
28

29
/// Reader for a [`StructLayout`].
///
/// Expressions are partitioned by the struct fields they touch (see `partition_expr`) and the
/// resulting per-field expressions are handed to the child reader of each field.
pub struct StructReader {
30
    // The struct layout this reader serves; source of dtype, row count and field metadata.
    layout: StructLayout,
31
    // Name of this reader; child readers are named `"{name}.{field}"`.
    name: Arc<str>,
32
    // Child readers, fetched by field index.
    lazy_children: LazyReaderChildren,
33

34
    /// A `pack` expression that holds each individual field of the root DType. This expansion
35
    /// ensures we can correctly partition expressions over the fields of the struct.
36
    expanded_root_expr: ExprRef,
37

38
    // Field name -> field index. Only populated when the struct has more than 80 fields;
    // otherwise `None` and names are resolved through `StructFields::find`.
    field_lookup: Option<HashMap<FieldName, usize>>,
39
    // Cache of partitioning results, keyed by the expression that was partitioned.
    partitioned_expr_cache: DashMap<ExactExpr, Partitioned>,
40
}
41

42
impl StructReader {
43
    /// Build a reader over `layout`.
    ///
    /// * `layout` - the struct layout to read; `layout.children` backs the per-field readers.
    /// * `name` - name of this reader; child readers are named `"{name}.{field}"`.
    /// * `segment_source`, `ctx` - cloned into [`LazyReaderChildren::new`] for the child readers.
    ///
    /// No step in this body uses `?`, so as written it always returns `Ok`.
    pub(super) fn try_new(
1,008✔
44
        layout: StructLayout,
1,008✔
45
        name: Arc<str>,
1,008✔
46
        segment_source: Arc<dyn SegmentSource>,
1,008✔
47
        ctx: ArrayContext,
1,008✔
48
    ) -> VortexResult<Self> {
1,008✔
49
        let struct_dt = layout.struct_fields();
1,008✔
50

1,008✔
51
        // NOTE: This number is arbitrary and likely depends on the longest common prefix of field names
        // Only structs with more than 80 fields get a name -> index map; for the rest this is
        // `None` and `child` falls back to `StructFields::find`.
1,008✔
52
        let field_lookup = (struct_dt.nfields() > 80).then(|| {
1,008✔
53
            struct_dt
×
54
                .names()
×
55
                .iter()
×
56
                .enumerate()
×
57
                .map(|(i, n)| (n.clone(), i))
×
58
                .collect()
×
59
        });
1,008✔
60

1,008✔
61
        let lazy_children =
1,008✔
62
            LazyReaderChildren::new(layout.children.clone(), segment_source.clone(), ctx.clone());
1,008✔
63

1,008✔
64
        // Create an expanded root expression that contains all fields of the struct.
1,008✔
65
        let expanded_root_expr = replace_root_fields(root(), struct_dt);
1,008✔
66

1,008✔
67
        // This is where we need to do some complex things with the scan in order to split it into
1,008✔
68
        // different scans for different fields.
1,008✔
69
        Ok(Self {
1,008✔
70
            layout,
1,008✔
71
            name,
1,008✔
72
            expanded_root_expr,
1,008✔
73
            lazy_children,
1,008✔
74
            field_lookup,
1,008✔
75
            // The partition cache starts empty and is filled on demand by `partition_expr`.
            partitioned_expr_cache: Default::default(),
1,008✔
76
        })
1,008✔
77
    }
1,008✔
78

79
    /// Return the [`StructFields`] of this layout.
    ///
    /// Thin delegate to `self.layout.struct_fields()`; the result borrows from `self.layout`.
80
    fn struct_fields(&self) -> &StructFields {
36,736✔
81
        self.layout.struct_fields()
36,736✔
82
    }
36,736✔
83

84
    /// Return the child reader for the field.
    ///
    /// The field index is resolved from `field_lookup` when that map exists and contains `name`;
    /// otherwise (no map, or a miss in the map) it falls back to `StructFields::find`.
    ///
    /// # Errors
    /// Returns an error if `name` cannot be resolved to an index, or if `child_by_idx` fails.
85
    fn child(&self, name: &FieldName) -> VortexResult<&LayoutReaderRef> {
10,104✔
86
        let idx = self
10,104✔
87
            .field_lookup
10,104✔
88
            .as_ref()
10,104✔
89
            .and_then(|lookup| lookup.get(name).copied())
10,104✔
90
            .or_else(|| self.struct_fields().find(name))
10,104✔
91
            .ok_or_else(|| vortex_err!("Field {} not found in struct layout", name))?;
10,104✔
92
        self.child_by_idx(idx)
10,104✔
93
    }
10,104✔
94

95
    /// Return the child reader for the field, by index.
    ///
    /// The child is requested from `lazy_children` with the field's dtype and the name
    /// `"{self.name}.{field name}"`.
    ///
    /// # Errors
    /// Returns "Missing field {idx}" when `field_by_index(idx)` yields nothing, or whatever error
    /// `lazy_children.get` produces.
96
    fn child_by_idx(&self, idx: usize) -> VortexResult<&LayoutReaderRef> {
13,316✔
97
        let field_dtype = self
13,316✔
98
            .struct_fields()
13,316✔
99
            .field_by_index(idx)
13,316✔
100
            .ok_or_else(|| vortex_err!("Missing field {idx}"))?;
13,316✔
101
        // NOTE(review): direct indexing; presumably in bounds because `field_by_index(idx)`
        // already succeeded for the same `idx` - confirm names and dtypes always have equal length.
        let name = &self.struct_fields().names()[idx];
13,316✔
102
        self.lazy_children
13,316✔
103
            .get(idx, &field_dtype, &format!("{}.{}", self.name, name).into())
13,316✔
104
    }
13,316✔
105

106
    /// Utility for partitioning an expression over the fields of a struct.
    ///
    /// Results are stored in `partitioned_expr_cache` keyed by `ExactExpr(expr)`: the closure
    /// passed to `or_insert_with` only runs when the entry is vacant, and the stored value is
    /// cloned out on every call.
    ///
    /// Returns [`Partitioned::Single`] when partitioning yields exactly one partition, otherwise
    /// [`Partitioned::Multi`]. In both cases `col(<field>)` is replaced by `root()` so the
    /// resulting expressions are relative to the field rather than to the struct.
    ///
    /// NOTE(review): simplification and partitioning failures go through `vortex_expect`, i.e.
    /// they are treated as invariant violations rather than returned to the caller.
107
    fn partition_expr(&self, expr: ExprRef) -> Partitioned {
6,744✔
108
        self.partitioned_expr_cache
6,744✔
109
            .entry(ExactExpr(expr.clone()))
6,744✔
110
            .or_insert_with(|| {
6,744✔
111
                // First, we expand the root scope into the fields of the struct to ensure
1,898✔
112
                // that partitioning works correctly.
1,898✔
113
                let expr = replace(expr.clone(), &root(), self.expanded_root_expr.clone());
1,898✔
114
                let expr = simplify_typed(expr, &ScopeDType::new(self.dtype().clone()))
1,898✔
115
                    .vortex_expect("We should not fail to simplify expression over struct fields");
1,898✔
116

1,898✔
117
                // Partition the expression into expressions that can be evaluated over individual fields
1,898✔
118
                let mut partitioned = partition(
1,898✔
119
                    expr.clone(),
1,898✔
120
                    self.dtype(),
1,898✔
121
                    annotate_scope_access(
1,898✔
122
                        self.dtype()
1,898✔
123
                            .as_struct()
1,898✔
124
                            .vortex_expect("We know it's a struct DType"),
1,898✔
125
                    ),
1,898✔
126
                )
1,898✔
127
                .vortex_expect("We should not fail to partition expression over struct fields");
1,898✔
128

1,898✔
129
                if partitioned.partitions.len() == 1 {
1,898✔
130
                    // If there's only one partition, we step into the field scope of the original
131
                    // expression by replacing any `$.a` with `$`.
                    // Note that this rewrites the whole simplified `expr`, not
                    // `partitioned.partitions[0]`.
132
                    return Partitioned::Single(
1,032✔
133
                        partitioned.partition_names[0].clone(),
1,032✔
134
                        replace(
1,032✔
135
                            expr.clone(),
1,032✔
136
                            &col(partitioned.partition_names[0].clone()),
1,032✔
137
                            root(),
1,032✔
138
                        ),
1,032✔
139
                    );
1,032✔
140
                }
866✔
141

866✔
142
                // We now need to process the partitioned expressions to rewrite the root scope
866✔
143
                // to be that of the field, rather than the struct. In other words, "stepping in"
866✔
144
                // to the field scope.
866✔
145
                partitioned.partitions = partitioned
866✔
146
                    .partitions
866✔
147
                    .iter()
866✔
148
                    .zip_eq(partitioned.partition_names.iter())
866✔
149
                    .map(|(e, name)| replace(e.clone(), &col(name.clone()), root()))
2,464✔
150
                    .collect();
866✔
151

866✔
152
                Partitioned::Multi(Arc::new(partitioned))
866✔
153
            })
6,744✔
154
            .clone()
6,744✔
155
    }
6,744✔
156
}
157

158
/// When partitioning an expression, in the case it only has a single partition we can avoid
159
/// some cost and just delegate to the child reader directly.
///
/// This is the value stored in `StructReader::partitioned_expr_cache`.
160
#[derive(Clone)]
161
enum Partitioned {
162
    /// Exactly one partition: the field name and the expression rewritten relative to that field.
    Single(FieldName, ExprRef),
163
    /// More than one partition: the full partitioning, shared behind an `Arc` so clones are cheap.
    Multi(Arc<PartitionedExpr<FieldName>>),
164
}
165

166
impl LayoutReader for StructReader {
167
    /// Name this reader was constructed with.
    fn name(&self) -> &Arc<str> {
1,284✔
168
        &self.name
1,284✔
169
    }
1,284✔
170

171
    /// DType of the underlying layout (delegates to `self.layout.dtype()`).
    fn dtype(&self) -> &DType {
9,220✔
172
        self.layout.dtype()
9,220✔
173
    }
9,220✔
174

175
    /// Scope dtype of the underlying layout (delegates to `self.layout.scope_dtype()`).
    fn scope_dtype(&self) -> &ScopeDType {
2,568✔
176
        self.layout.scope_dtype()
2,568✔
177
    }
2,568✔
178

179
    /// Row count of the layout, reported as `Precision::Exact`.
    fn row_count(&self) -> Precision<u64> {
×
180
        Precision::Exact(self.layout.row_count())
×
181
    }
×
182

183
    /// Register split points for this struct into `splits`.
    ///
    /// The end of this layout (`row_offset + row_count`) is always inserted. Then, for every
    /// `(mask, idx)` pair that `matching_fields` yields for `field_mask`, the child reader at
    /// `idx` registers its own splits with that single mask and the same `row_offset`.
    ///
    /// # Errors
    /// Propagates errors from `child_by_idx` and from the children's `register_splits`.
    fn register_splits(
1,284✔
184
        &self,
1,284✔
185
        field_mask: &[FieldMask],
1,284✔
186
        row_offset: u64,
1,284✔
187
        splits: &mut BTreeSet<u64>,
1,284✔
188
    ) -> VortexResult<()> {
1,284✔
189
        // In the case of an empty struct, we need to register the end split.
1,284✔
190
        splits.insert(row_offset + self.layout.row_count);
1,284✔
191

1,284✔
192
        self.layout.matching_fields(field_mask, |mask, idx| {
3,212✔
193
            self.child_by_idx(idx)?
3,212✔
194
                .register_splits(&[mask], row_offset, splits)
3,212✔
195
        })
3,212✔
196
    }
1,284✔
197

198
    /// Build a pruning evaluation for `expr` over `row_range`.
    ///
    /// If `expr` partitions to a single field, this delegates to that field's child reader with
    /// the field-relative expression. For multi-field expressions it returns a
    /// [`NoOpPruningEvaluation`] (see the TODO below).
    ///
    /// # Errors
    /// Propagates errors from `child` and from the child's `pruning_evaluation`.
    fn pruning_evaluation(
2,360✔
199
        &self,
2,360✔
200
        row_range: &Range<u64>,
2,360✔
201
        expr: &ExprRef,
2,360✔
202
    ) -> VortexResult<Box<dyn PruningEvaluation>> {
2,360✔
203
        // Partition the expression into expressions that can be evaluated over individual fields
2,360✔
204
        match &self.partition_expr(expr.clone()) {
2,360✔
205
            Partitioned::Single(name, partition) => {
2,136✔
206
                self.child(name)?.pruning_evaluation(row_range, partition)
2,136✔
207
            }
208
            Partitioned::Multi(_) => {
209
                // TODO(ngates): if all partitions are boolean, we can use a pruning evaluation. Otherwise
210
                //  there's not much we can do? Maybe... it's complicated...
211
                Ok(Box::new(NoOpPruningEvaluation))
224✔
212
            }
213
        }
214
    }
2,360✔
215

216
    /// Build a filter (mask) evaluation for `expr` over `row_range`.
    ///
    /// A single-field expression is delegated to that field's child reader. A multi-field
    /// expression is wrapped in a [`PartitionedMaskEvaluation`], which is given two factories:
    /// one that builds a child filter evaluation for a field and one that builds a child
    /// projection evaluation for a field.
    ///
    /// # Errors
    /// Propagates errors from `child`, the children's evaluations, and
    /// `PartitionedMaskEvaluation::try_new`.
    fn filter_evaluation(
2,361✔
217
        &self,
2,361✔
218
        row_range: &Range<u64>,
2,361✔
219
        expr: &ExprRef,
2,361✔
220
    ) -> VortexResult<Box<dyn MaskEvaluation>> {
2,361✔
221
        // Partition the expression into expressions that can be evaluated over individual fields
2,361✔
222
        match &self.partition_expr(expr.clone()) {
2,361✔
223
            Partitioned::Single(name, partition) => {
2,136✔
224
                self.child(name)?.filter_evaluation(row_range, partition)
2,136✔
225
            }
226
            Partitioned::Multi(partitioned) => Ok(Box::new(PartitionedMaskEvaluation::try_new(
225✔
227
                partitioned.clone(),
225✔
228
                |name, expr| self.child(name)?.filter_evaluation(row_range, expr),
225✔
229
                |name, expr| self.child(name)?.projection_evaluation(row_range, expr),
450✔
230
            )?)),
225✔
231
        }
232
    }
2,361✔
233

234
    /// Build a projection (array) evaluation for `expr` over `row_range`.
    ///
    /// A single-field expression is delegated to that field's child reader. A multi-field
    /// expression is wrapped in a [`PartitionedArrayEvaluation`], which is given a factory that
    /// builds a child projection evaluation per field.
    ///
    /// # Errors
    /// Propagates errors from `child`, the children's evaluations, and
    /// `PartitionedArrayEvaluation::try_new`.
    fn projection_evaluation(
2,023✔
235
        &self,
2,023✔
236
        row_range: &Range<u64>,
2,023✔
237
        expr: &ExprRef,
2,023✔
238
    ) -> VortexResult<Box<dyn ArrayEvaluation>> {
2,023✔
239
        // Partition the expression into expressions that can be evaluated over individual fields
2,023✔
240
        match &self.partition_expr(expr.clone()) {
2,023✔
241
            Partitioned::Single(name, partition) => self
440✔
242
                .child(name)?
440✔
243
                .projection_evaluation(row_range, partition),
440✔
244
            Partitioned::Multi(partitioned) => Ok(Box::new(PartitionedArrayEvaluation::try_new(
1,583✔
245
                partitioned.clone(),
1,583✔
246
                |name, expr| self.child(name)?.projection_evaluation(row_range, expr),
4,942✔
247
            )?)),
1,583✔
248
        }
249
    }
2,023✔
250
}
251

252
#[cfg(test)]
253
mod tests {
254
    use std::sync::Arc;
255

256
    use arcref::ArcRef;
257
    use futures::executor::block_on;
258
    use futures::stream;
259
    use itertools::Itertools;
260
    use rstest::{fixture, rstest};
261
    use vortex_array::arrays::StructArray;
262
    use vortex_array::{Array, ArrayContext, IntoArray, ToCanonical};
263
    use vortex_buffer::buffer;
264
    use vortex_dtype::Nullability::NonNullable;
265
    use vortex_dtype::PType::I32;
266
    use vortex_dtype::{DType, StructFields};
267
    use vortex_expr::{eq, get_item, get_item_scope, gt, lit, or, pack, root};
268
    use vortex_mask::Mask;
269

270
    use crate::layouts::flat::writer::FlatLayoutStrategy;
271
    use crate::layouts::struct_::writer::StructStrategy;
272
    use crate::segments::{SegmentSource, SequenceWriter, TestSegments};
273
    use crate::sequence::SequenceId;
274
    use crate::{LayoutRef, LayoutStrategy, SequentialStreamAdapter, SequentialStreamExt as _};
275

276
    #[fixture]
12✔
277
    /// Create a struct layout with three `I32` fields (`a`, `b`, `c`) of three rows each,
    /// written from a single-item stream using `StructStrategy` over `FlatLayoutStrategy`.
    ///
    /// Data: `a = [7, 2, 3]`, `b = [4, 5, 6]`, `c = [4, 5, 6]`.
    /// Returns the array context, the segments the layout was written to, and the layout itself.
278
    fn struct_layout() -> (ArrayContext, Arc<dyn SegmentSource>, LayoutRef) {
279
        let ctx = ArrayContext::empty();
280
        let segments = TestSegments::default();
281
        let sequence_writer = SequenceWriter::new(Box::new(segments.clone()));
282
        let strategy =
283
            StructStrategy::new(ArcRef::new_arc(Arc::new(FlatLayoutStrategy::default())));
284
        let layout = block_on(
285
            strategy.write_stream(
286
                &ctx,
287
                sequence_writer,
288
                SequentialStreamAdapter::new(
289
                    DType::Struct(
290
                        StructFields::new(
291
                            vec!["a".into(), "b".into(), "c".into()].into(),
292
                            vec![I32.into(), I32.into(), I32.into()],
293
                        ),
294
                        NonNullable,
295
                    ),
296
                    stream::once(async {
4✔
297
                        Ok((
4✔
298
                            SequenceId::root().downgrade(),
4✔
299
                            StructArray::from_fields(
4✔
300
                                [
4✔
301
                                    ("a", buffer![7, 2, 3].into_array()),
4✔
302
                                    ("b", buffer![4, 5, 6].into_array()),
4✔
303
                                    ("c", buffer![4, 5, 6].into_array()),
4✔
304
                                ]
4✔
305
                                .as_slice(),
4✔
306
                            )
4✔
307
                            .unwrap()
4✔
308
                            .into_array(),
4✔
309
                        ))
4✔
310
                    }),
4✔
311
                )
312
                .sendable(),
313
            ),
314
        )
315
        .unwrap();
316

317
        (ctx, Arc::new(segments), layout)
318
    }
319

320
    /// Filter spanning two fields: `a == 7 || (b == 5 || a == 3)`.
    ///
    /// With `a = [7, 2, 3]` and `b = [4, 5, 6]` every row satisfies one of the disjuncts, so the
    /// resulting mask is all-true.
    #[rstest]
321
    fn test_struct_layout_or(
322
        #[from(struct_layout)] (ctx, segments, layout): (
323
            ArrayContext,
324
            Arc<dyn SegmentSource>,
325
            LayoutRef,
326
        ),
327
    ) {
328
        let reader = layout.new_reader("".into(), segments, ctx).unwrap();
329
        let filt = or(
330
            eq(get_item_scope("a"), lit(7)),
331
            or(
332
                eq(get_item_scope("b"), lit(5)),
333
                eq(get_item_scope("a"), lit(3)),
334
            ),
335
        );
336
        let result = block_on(
337
            reader
338
                .filter_evaluation(&(0..3), &filt)
339
                .unwrap()
340
                .invoke(Mask::new_true(3)),
341
        )
342
        .unwrap();
343
        assert_eq!(
344
            vec![true, true, true],
345
            result.to_boolean_buffer().iter().collect_vec()
346
        );
347
    }
348

349
    /// Projection spanning two fields: `a > b` over all three rows.
    ///
    /// With `a = [7, 2, 3]` and `b = [4, 5, 6]` the expected result is `[true, false, false]`.
    #[rstest]
350
    fn test_struct_layout(
351
        #[from(struct_layout)] (ctx, segments, layout): (
352
            ArrayContext,
353
            Arc<dyn SegmentSource>,
354
            LayoutRef,
355
        ),
356
    ) {
357
        let reader = layout.new_reader("".into(), segments, ctx).unwrap();
358
        let expr = gt(get_item("a", root()), get_item("b", root()));
359
        let result = block_on(
360
            reader
361
                .projection_evaluation(&(0..3), &expr)
362
                .unwrap()
363
                .invoke(Mask::new_true(3)),
364
        )
365
        .unwrap();
366
        assert_eq!(
367
            vec![true, false, false],
368
            result
369
                .to_bool()
370
                .unwrap()
371
                .boolean_buffer()
372
                .iter()
373
                .collect::<Vec<_>>()
374
        );
375
    }
376

377
    /// Same `a > b` projection, but invoked with the mask `[true, true, false]`.
    ///
    /// Only the two selected rows come back, with values `[true, false]`.
    #[rstest]
378
    fn test_struct_layout_row_mask(
379
        #[from(struct_layout)] (ctx, segments, layout): (
380
            ArrayContext,
381
            Arc<dyn SegmentSource>,
382
            LayoutRef,
383
        ),
384
    ) {
385
        let reader = layout.new_reader("".into(), segments, ctx).unwrap();
386
        let expr = gt(get_item("a", root()), get_item("b", root()));
387
        let result = block_on(
388
            reader
389
                .projection_evaluation(&(0..3), &expr)
390
                .unwrap()
391
                .invoke(Mask::from_iter([true, true, false])),
392
        )
393
        .unwrap();
394

395
        assert_eq!(result.len(), 2);
396

397
        assert_eq!(
398
            vec![true, false],
399
            result
400
                .to_bool()
401
                .unwrap()
402
                .boolean_buffer()
403
                .iter()
404
                .collect::<Vec<_>>()
405
        );
406
    }
407

408
    /// Projection that packs fields `a` and `b` into a new struct, invoked with the mask
    /// `[true, true, false]`.
    ///
    /// The result has two rows, with `a = [7, 2]` and `b = [4, 5]`.
    #[rstest]
409
    fn test_struct_layout_select(
410
        #[from(struct_layout)] (ctx, segments, layout): (
411
            ArrayContext,
412
            Arc<dyn SegmentSource>,
413
            LayoutRef,
414
        ),
415
    ) {
416
        let reader = layout.new_reader("".into(), segments, ctx).unwrap();
417
        let expr = pack(
418
            [("a", get_item("a", root())), ("b", get_item("b", root()))],
419
            NonNullable,
420
        );
421
        let result = block_on(
422
            reader
423
                .projection_evaluation(&(0..3), &expr)
424
                .unwrap()
425
                // Take rows 0 and 1, skip row 2, and anything after that
426
                .invoke(Mask::from_iter([true, true, false])),
427
        )
428
        .unwrap();
429

430
        assert_eq!(result.len(), 2);
431

432
        assert_eq!(
433
            result
434
                .to_struct()
435
                .unwrap()
436
                .field_by_name("a")
437
                .unwrap()
438
                .to_primitive()
439
                .unwrap()
440
                .as_slice::<i32>(),
441
            [7, 2].as_slice()
442
        );
443

444
        assert_eq!(
445
            result
446
                .to_struct()
447
                .unwrap()
448
                .field_by_name("b")
449
                .unwrap()
450
                .to_primitive()
451
                .unwrap()
452
                .as_slice::<i32>(),
453
            [4, 5].as_slice()
454
        );
455
    }
456
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc