• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16262933935

14 Jul 2025 09:17AM UTC coverage: 81.432% (+0.3%) from 81.15%
16262933935

Pull #3844

github

web-flow
Merge a49b86c01 into 382a2c489
Pull Request #3844: feat[compressor]: wire up sequence array into the compressor

155 of 159 new or added lines in 8 files covered. (97.48%)

217 existing lines in 24 files now uncovered.

46015 of 56507 relevant lines covered (81.43%)

146742.82 hits per line

Source File
Press 'n' to go to the next uncovered line, or 'b' to go to the previous one.

96.28
/vortex-layout/src/layouts/flat/reader.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::collections::BTreeSet;
5
use std::ops::{BitAnd, Range};
6
use std::sync::{Arc, OnceLock};
7

8
use async_trait::async_trait;
9
use futures::FutureExt;
10
use vortex_array::compute::filter;
11
use vortex_array::serde::ArrayParts;
12
use vortex_array::stats::Precision;
13
use vortex_array::{Array, ArrayRef};
14
use vortex_dtype::{DType, FieldMask};
15
use vortex_error::{VortexExpect, VortexResult, VortexUnwrap as _};
16
use vortex_expr::{ExprRef, Scope, is_root};
17
use vortex_mask::Mask;
18

19
use crate::layouts::SharedArrayFuture;
20
use crate::layouts::flat::FlatLayout;
21
use crate::segments::SegmentSource;
22
use crate::{
23
    ArrayEvaluation, LayoutReader, MaskEvaluation, NoOpPruningEvaluation, PruningEvaluation,
24
};
25

26
/// The threshold of mask density below which we will evaluate the expression only over the
27
/// selected rows, and above which we evaluate the expression over all rows and then select
28
/// after.
29
// TODO(ngates): more experimentation is needed, and this should probably be dynamic based on the
30
//  actual expression? Perhaps all expressions are given a selection mask to decide for themselves?
31
const EXPR_EVAL_THRESHOLD: f64 = 0.2;
32

33
pub struct FlatReader {
34
    layout: FlatLayout,
35
    name: Arc<str>,
36
    segment_source: Arc<dyn SegmentSource>,
37
    array: OnceLock<SharedArrayFuture>,
38
}
39

40
impl FlatReader {
41
    pub(crate) fn new(
6,966✔
42
        layout: FlatLayout,
6,966✔
43
        name: Arc<str>,
6,966✔
44
        segment_source: Arc<dyn SegmentSource>,
6,966✔
45
    ) -> Self {
6,966✔
46
        Self {
6,966✔
47
            layout,
6,966✔
48
            name,
6,966✔
49
            segment_source,
6,966✔
50
            array: Default::default(),
6,966✔
51
        }
6,966✔
52
    }
6,966✔
53

54
    /// Returns a cached future that resolves this array.
55
    ///
56
    /// This method is idempotent, and returns a cached future on subsequent calls, all of which
57
    /// will use the original segment reader.
58
    // TODO(ngates): caching this and ignoring SegmentReaders may be a terrible idea... we may
59
    //  instead want to store all segment futures and race them, so if a layout requests a
60
    //  projection future before a pruning future, the pruning isn't blocked.
61
    fn array_future(&self) -> VortexResult<SharedArrayFuture> {
8,719✔
62
        let row_count = usize::try_from(self.layout.row_count()).vortex_unwrap();
8,719✔
63

8,719✔
64
        // We create the segment_fut here to ensure we give the segment reader visibility into
8,719✔
65
        // how to prioritize this segment, even if the `array` future has already been initialized.
8,719✔
66
        // This is gross... see the function's TODO for a maybe better solution?
8,719✔
67
        let segment_fut = self
8,719✔
68
            .segment_source
8,719✔
69
            .request(self.layout.segment_id(), &self.name);
8,719✔
70

8,719✔
71
        Ok(self
8,719✔
72
            .array
8,719✔
73
            .get_or_init(|| {
8,719✔
74
                let ctx = self.layout.ctx.clone();
4,839✔
75
                let dtype = self.layout.dtype().clone();
4,839✔
76
                async move {
4,839✔
77
                    let segment = segment_fut.await?;
4,839✔
78
                    ArrayParts::try_from(segment)?
4,839✔
79
                        .decode(&ctx, &dtype, row_count)
4,839✔
80
                        .map_err(Arc::new)
4,839✔
81
                }
4,839✔
82
                .boxed()
4,839✔
83
                .shared()
4,839✔
84
            })
8,719✔
85
            .clone())
8,719✔
86
    }
8,719✔
87
}
88

89
impl LayoutReader for FlatReader {
90
    fn name(&self) -> &Arc<str> {
21✔
91
        &self.name
21✔
92
    }
21✔
93

94
    fn dtype(&self) -> &DType {
327✔
95
        self.layout.dtype()
327✔
96
    }
327✔
97

98
    fn row_count(&self) -> Precision<u64> {
1✔
99
        Precision::Exact(self.layout.row_count())
1✔
100
    }
1✔
101

102
    fn register_splits(
11,949✔
103
        &self,
11,949✔
104
        _field_mask: &[FieldMask],
11,949✔
105
        row_offset: u64,
11,949✔
106
        splits: &mut BTreeSet<u64>,
11,949✔
107
    ) -> VortexResult<()> {
11,949✔
108
        splits.insert(row_offset + self.layout.row_count());
11,949✔
109
        Ok(())
11,949✔
110
    }
11,949✔
111

112
    fn pruning_evaluation(
1,901✔
113
        &self,
1,901✔
114
        _row_range: &Range<u64>,
1,901✔
115
        _expr: &ExprRef,
1,901✔
116
    ) -> VortexResult<Box<dyn PruningEvaluation>> {
1,901✔
117
        Ok(Box::new(NoOpPruningEvaluation))
1,901✔
118
    }
1,901✔
119

120
    fn filter_evaluation(
1,898✔
121
        &self,
1,898✔
122
        row_range: &Range<u64>,
1,898✔
123
        expr: &ExprRef,
1,898✔
124
    ) -> VortexResult<Box<dyn MaskEvaluation>> {
1,898✔
125
        let row_range = usize::try_from(row_range.start)
1,898✔
126
            .vortex_expect("Row range begin must fit within FlatLayout size")
1,898✔
127
            ..usize::try_from(row_range.end)
1,898✔
128
                .vortex_expect("Row range end must fit within FlatLayout size");
1,898✔
129

1,898✔
130
        Ok(Box::new(FlatEvaluation {
1,898✔
131
            name: self.name.clone(),
1,898✔
132
            array: self.array_future()?,
1,898✔
133
            row_range,
1,898✔
134
            expr: expr.clone(),
1,898✔
135
        }))
136
    }
1,898✔
137

138
    fn projection_evaluation(
6,821✔
139
        &self,
6,821✔
140
        row_range: &Range<u64>,
6,821✔
141
        expr: &ExprRef,
6,821✔
142
    ) -> VortexResult<Box<dyn ArrayEvaluation>> {
6,821✔
143
        let row_range = usize::try_from(row_range.start)
6,821✔
144
            .vortex_expect("Row range begin must fit within FlatLayout size")
6,821✔
145
            ..usize::try_from(row_range.end)
6,821✔
146
                .vortex_expect("Row range end must fit within FlatLayout size");
6,821✔
147
        Ok(Box::new(FlatEvaluation {
6,821✔
148
            name: self.name.clone(),
6,821✔
149
            array: self.array_future()?,
6,821✔
150
            row_range,
6,821✔
151
            expr: expr.clone(),
6,821✔
152
        }))
153
    }
6,821✔
154
}
155

156
struct FlatEvaluation {
157
    name: Arc<str>,
158
    array: SharedArrayFuture,
159
    row_range: Range<usize>,
160
    expr: ExprRef,
161
}
162

163
#[async_trait]
164
impl MaskEvaluation for FlatEvaluation {
165
    async fn invoke(&self, mask: Mask) -> VortexResult<Mask> {
1,898✔
166
        // TODO(ngates): if the mask density is low enough, or if the mask is dense within a range
167
        //  (as often happens with zone map pruning), then we could slice/filter the array prior
168
        //  to evaluating the expression.
169

170
        // Now we await the array .
171
        let mut array = self.array.clone().await?;
1,898✔
172

173
        // Slice the array based on the row mask.
174
        if self.row_range.start > 0 || self.row_range.end < array.len() {
1,898✔
175
            array = array.slice(self.row_range.start, self.row_range.end)?;
1,356✔
176
        }
542✔
177

178
        // TODO(ngates): the mask may actually be dense within a range, as is often the case when
179
        //  we have approximate mask results from a zone map. In which case we could look at
180
        //  the true_count between the mask's first and last true positions.
181
        // TODO(ngates): we could also track runtime statistics about whether it's worth selecting
182
        //   or not.
183
        let array_mask = if mask.density() < EXPR_EVAL_THRESHOLD {
1,898✔
184
            // Evaluate only the selected rows of the mask.
185
            array = filter(&array, &mask)?;
244✔
186
            let array_mask = Mask::try_from(self.expr.evaluate(&Scope::new(array))?.as_ref())?;
244✔
187
            mask.intersect_by_rank(&array_mask)
244✔
188
        } else {
189
            // Evaluate all rows, avoiding the more expensive rank intersection.
190
            array = self.expr.evaluate(&Scope::new(array))?;
1,654✔
191
            let array_mask = Mask::try_from(array.as_ref())?;
1,654✔
192
            mask.bitand(&array_mask)
1,654✔
193
        };
194

195
        log::debug!(
1,898✔
UNCOV
196
            "Flat mask evaluation {} - {} (mask = {}) => {}",
×
UNCOV
197
            self.name,
×
UNCOV
198
            self.expr,
×
199
            mask.density(),
×
200
            array_mask.density(),
×
201
        );
202

203
        Ok(array_mask)
1,898✔
204
    }
3,796✔
205
}
206

207
#[async_trait]
208
impl ArrayEvaluation for FlatEvaluation {
209
    async fn invoke(&self, mask: Mask) -> VortexResult<ArrayRef> {
6,821✔
210
        log::debug!(
6,821✔
UNCOV
211
            "Flat array evaluation {} - {} (mask = {})",
×
UNCOV
212
            self.name,
×
UNCOV
213
            self.expr,
×
214
            mask.density(),
×
215
        );
216

217
        // Now we await the array .
218
        let mut array = self.array.clone().await?;
6,821✔
219

220
        // Slice the array based on the row mask.
221
        if self.row_range.start > 0 || self.row_range.end < array.len() {
6,821✔
222
            array = array.slice(self.row_range.start, self.row_range.end)?;
3,489✔
223
        }
3,332✔
224

225
        // Filter the array based on the row mask.
226
        if !mask.all_true() {
6,821✔
227
            array = filter(&array, &mask)?;
3,152✔
228
        }
3,669✔
229

230
        // Evaluate the projection expression.
231
        if !is_root(&self.expr) {
6,821✔
232
            array = self.expr.evaluate(&Scope::new(array))?;
5,503✔
233
        }
1,318✔
234

235
        Ok(array)
6,821✔
236
    }
13,642✔
237
}
238

239
#[cfg(test)]
mod test {
    use std::ops::Range;
    use std::sync::Arc;

    use arrow_buffer::BooleanBuffer;
    use futures::executor::block_on;
    use futures::stream;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_array::{ArrayContext, ArrayRef, ToCanonical};
    use vortex_buffer::buffer;
    use vortex_expr::{ExprRef, gt, lit, root};
    use vortex_mask::Mask;

    use crate::layouts::flat::writer::FlatLayoutStrategy;
    use crate::segments::{SegmentSource, SequenceWriter, TestSegments};
    use crate::sequence::SequenceId;
    use crate::{LayoutStrategy as _, SequentialStreamAdapter, SequentialStreamExt};

    /// The five-element `i32` array that every test writes into a flat layout.
    fn test_array() -> ArrayRef {
        PrimitiveArray::new(buffer![1, 2, 3, 4, 5], Validity::AllValid).to_array()
    }

    /// Writes `array` as a single-chunk flat layout, then projects `expr` over `row_range`.
    ///
    /// When `row_range` is `None`, the projection covers every row of the layout. The mask
    /// passed to the evaluation is all-true and exactly as long as the evaluated range.
    async fn project(array: ArrayRef, row_range: Option<Range<u64>>, expr: &ExprRef) -> ArrayRef {
        let ctx = ArrayContext::empty();
        let segments = TestSegments::default();
        let sequence_writer = SequenceWriter::new(Box::new(segments.clone()));
        let dtype = array.dtype().clone();
        let layout = FlatLayoutStrategy::default()
            .write_stream(
                &ctx,
                sequence_writer.clone(),
                SequentialStreamAdapter::new(
                    dtype,
                    stream::once(async move { Ok((SequenceId::root().downgrade(), array)) }),
                )
                .sendable(),
            )
            .await
            .unwrap();
        let segments: Arc<dyn SegmentSource> = Arc::new(segments);

        let row_range = row_range.unwrap_or_else(|| 0..layout.row_count());
        let mask_len = usize::try_from(row_range.end - row_range.start).unwrap();

        layout
            .new_reader("".into(), segments)
            .unwrap()
            .projection_evaluation(&row_range, expr)
            .unwrap()
            .invoke(Mask::new_true(mask_len))
            .await
            .unwrap()
    }

    /// Projecting the root expression over every row returns the written values unchanged.
    #[test]
    fn flat_identity() {
        block_on(async {
            let array = test_array();
            let result = project(array.clone(), None, &root())
                .await
                .to_primitive()
                .unwrap();

            assert_eq!(
                array.to_primitive().unwrap().as_slice::<i32>(),
                result.as_slice::<i32>()
            );
        })
    }

    /// Projecting a comparison expression yields its boolean result for every row.
    #[test]
    fn flat_expr() {
        block_on(async {
            let result = project(test_array(), None, &gt(root(), lit(3i32)))
                .await
                .to_bool()
                .unwrap();

            assert_eq!(
                &BooleanBuffer::from_iter([false, false, false, true, true]),
                result.boolean_buffer()
            );
        })
    }

    /// A row range that does not start at zero slices out exactly the requested rows.
    #[test]
    fn flat_unaligned_row_mask() {
        block_on(async {
            let result = project(test_array(), Some(2..4), &root())
                .await
                .to_primitive()
                .unwrap();

            assert_eq!(result.as_slice::<i32>(), &[3, 4]);
        })
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc