vortex-data / vortex / 16970635821

14 Aug 2025 04:13PM UTC coverage: 85.882% (-1.8%) from 87.693%

Merge 5182504a6 into f547cbca5
Pull Request #4215: Ji/vectors

80 of 1729 new or added lines in 38 files covered. (4.63%)

117 existing lines in 25 files now uncovered.

56994 of 66363 relevant lines covered (85.88%)

609331.7 hits per line

Source File
0.0
/vortex-array/src/pipeline/mod.rs
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! This module contains experiments into pipelined data processing within Vortex.
//!
//! Arrays (and eventually Layouts) will be convertible into a [`Kernel`] that can then be
//! exported into a [`ViewMut`] one chunk of [`N`] elements at a time. This allows us to keep
//! compute largely within the L1 cache, as well as to write out canonical data into externally
//! provided buffers.
//!
//! Each chunk is represented in a canonical physical form, as determined by the logical
//! [`vortex_dtype::DType`] of the array. This provides a predictable base on which to perform
//! compute. Unlike DuckDB and other vectorized systems, we force a single canonical representation
//! instead of supporting multiple encodings because compute push-down is applied a priori to the
//! logical representation.
//!
//! It is a work-in-progress and is not yet used in production.

pub mod bits;
pub mod buffers;
pub mod canonical;
pub mod operators;
pub mod query;
pub mod selection;
pub mod types;
pub mod vector;
pub mod view;

/// The number of elements in each step of a Vortex evaluation pipeline.
pub const N: usize = 1024;

use std::ops::Range;
use std::task::Poll;

use vector::{VectorId, VectorRef};
use vortex_buffer::ByteBuffer;
use vortex_error::{VortexResult, vortex_err, vortex_panic};

use crate::pipeline::bits::BitView;
use crate::pipeline::buffers::BufferId;
use crate::pipeline::view::ViewMut;

/// A pipeline provides a push-based way to emit a stream of canonical data.
///
/// By passing multiple vector computations through the same pipeline, we can amortize
/// the setup costs (such as DType validation, stats short-circuiting, etc.) and make better
/// use of CPU caches by performing all operations while the data is hot.
///
/// By passing a mask into the `step` function, we give encodings visibility into the data that
/// will be read by their parents. Some encodings may choose to decode all `N` elements, and then
/// set the given selection mask on the output vector. Other encodings may choose to only unpack
/// the selected elements.
///
/// We are considering further adding a `defined` parameter that indicates which elements are
/// defined and will be interpreted by the parent. This differs from masking, in that undefined
/// elements should still live in the correct location, it just doesn't matter what their value
/// is. This will allow, e.g. a validity encoding to tell its children that the values in certain
/// positions are going to be masked out anyway, so don't bother doing any expensive compute.
pub trait Kernel {
    /// Seek the pipeline to a specific chunk offset.
    ///
    /// i.e. the resulting row offset is `idx * N`, where `N` is the number of elements in a chunk.
    ///
    /// The reason for a separate seek function (vs passing an offset directly to `step`) is that
    /// it allows the pipeline to optimize for sequential access patterns, which is common in
    /// many encodings. For example, a run-length encoding can efficiently seek to the start of a
    /// chunk without needing to perform a full binary search of the ends in each step.
    // TODO(ngates): should this be `skip(n)` instead? Depends if we want to support going
    //  backwards?
    fn seek(&mut self, chunk_idx: usize) -> VortexResult<()>;

    /// Attempts to perform a single step of the pipeline, writing data to the output vector.
    /// Returns `Poll::Ready` if the step is complete, or `Poll::Pending` if buffers are
    /// required to continue.
    ///
    /// The `selected` parameter defines which elements of the chunk should be exported, where
    /// `None` indicates that all elements are selected.
    ///
    // TODO(ngates): we could introduce a `defined` parameter to indicate which elements are
    //  defined and will be interpreted by the parent. This would allow us to skip writing
    //  elements that are not defined, for example if the parent is a dense null validity encoding.
    fn step(
        &mut self,
        ctx: &dyn KernelContext,
        selected: BitView,
        out: &mut ViewMut,
    ) -> Poll<VortexResult<()>>;
}
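
The `seek`/`step` split described in the `Kernel` docs can be illustrated with a toy run-end kernel. This is only a sketch under stated assumptions: `ToyKernel`, `RunEndKernel` and `CHUNK` are hypothetical names, a `&[bool]` selection and a `Vec<i32>` output stand in for `BitView` and `ViewMut`, there is no `KernelContext`, and the kernel emits only the selected elements (one of the two strategies the docs mention). It is not the Vortex implementation.

```rust
use std::task::Poll;

/// Chunk size for this sketch; the module above uses `N = 1024`.
const CHUNK: usize = 4;

/// Hypothetical, simplified stand-in for `Kernel`: a `&[bool]` selection and a
/// `Vec<i32>` output replace `BitView` and `ViewMut`, and there is no context.
trait ToyKernel {
    fn seek(&mut self, chunk_idx: usize) -> Result<(), String>;
    fn step(&mut self, selected: &[bool], out: &mut Vec<i32>) -> Poll<Result<(), String>>;
}

/// Run-end encoded data: `(value, exclusive run end)` pairs with increasing ends.
struct RunEndKernel {
    runs: Vec<(i32, usize)>,
    row: usize, // next row to emit
    run: usize, // index of the run containing `row`
}

impl RunEndKernel {
    fn new(runs: Vec<(i32, usize)>) -> Self {
        Self { runs, row: 0, run: 0 }
    }

    fn len(&self) -> usize {
        self.runs.last().map_or(0, |&(_, end)| end)
    }
}

impl ToyKernel for RunEndKernel {
    fn seek(&mut self, chunk_idx: usize) -> Result<(), String> {
        let row = chunk_idx * CHUNK;
        if row > self.len() {
            return Err(format!("chunk {chunk_idx} is out of bounds"));
        }
        // The only binary search: find the first run ending after `row`.
        self.run = self.runs.partition_point(|&(_, end)| end <= row);
        self.row = row;
        Ok(())
    }

    fn step(&mut self, selected: &[bool], out: &mut Vec<i32>) -> Poll<Result<(), String>> {
        let end = (self.row + CHUNK).min(self.len());
        for (i, row) in (self.row..end).enumerate() {
            // Sequential access: advance the run cursor instead of searching again.
            while self.runs[self.run].1 <= row {
                self.run += 1;
            }
            // This sketch unpacks only the selected elements.
            if selected.get(i).copied().unwrap_or(false) {
                out.push(self.runs[self.run].0);
            }
        }
        self.row = end;
        Poll::Ready(Ok(()))
    }
}
```

Seeking once and then stepping keeps the per-chunk cost proportional to the chunk length plus the runs crossed, which is the sequential-access benefit the `seek` docs describe.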

pub trait KernelExt: Kernel {
    /// Perform a single step of the pipeline, panics if the step returns [`Poll::Pending`].
    fn step_now(
        &mut self,
        ctx: &dyn KernelContext,
        selected: BitView,
        out: &mut ViewMut,
    ) -> VortexResult<()> {
        match self.step(ctx, selected, out) {
            Poll::Ready(r) => r,
            Poll::Pending => {
                vortex_panic!("Pipeline step is pending, but expected it to be ready.")
            }
        }
    }
}

impl<K: Kernel + ?Sized> KernelExt for K {}

pub trait KernelContext {
    /// Get a vector by its ID.
    fn vector(&self, vector_id: VectorId) -> VectorRef<'_>;

    /// Get a buffer by its ID.
    fn buffer(&self, buffer_id: BufferId) -> Poll<VortexResult<ByteBuffer>>;

    /// Pre-fetch buffers for future use (non-blocking hint).
    fn prefetch(&self, buffer_ids: &[BufferId]) {
        for &buffer_id in buffer_ids {
            let _ = self.buffer(buffer_id);
        }
    }

    /// Request a range of data from a buffer (for partial reads).
    fn buffer_range(
        &self,
        buffer_id: BufferId,
        range: Range<usize>,
    ) -> Poll<VortexResult<ByteBuffer>> {
        match self.buffer(buffer_id) {
            Poll::Ready(Ok(buffer)) => {
                let start = range.start;
                let end = range.end;
                if start < end && end <= buffer.len() {
                    Poll::Ready(Ok(buffer.slice(start..end)))
                } else {
                    Poll::Ready(Err(vortex_err!(
                        "Invalid range for buffer: {}..{}",
                        start,
                        end
                    )))
                }
            }
            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
            Poll::Pending => Poll::Pending,
        }
    }
}
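
The default `buffer_range` above forwards `Pending` and errors unchanged and only slices once the buffer is ready. The following standalone sketch mirrors that control flow so the range check can be exercised in isolation. It rests on assumptions: `slice_when_ready` is a hypothetical helper, and `Vec<u8>` and `String` stand in for `ByteBuffer` and the Vortex error type.

```rust
use std::ops::Range;
use std::task::Poll;

/// Hypothetical helper mirroring the default `buffer_range` logic, with
/// `Vec<u8>` standing in for `ByteBuffer` and `String` for the error type.
fn slice_when_ready(
    buffer: Poll<Result<Vec<u8>, String>>,
    range: Range<usize>,
) -> Poll<Result<Vec<u8>, String>> {
    match buffer {
        Poll::Ready(Ok(bytes)) => {
            // Same check as the source: non-empty and within bounds.
            if range.start < range.end && range.end <= bytes.len() {
                Poll::Ready(Ok(bytes[range.start..range.end].to_vec()))
            } else {
                Poll::Ready(Err(format!(
                    "Invalid range for buffer: {}..{}",
                    range.start, range.end
                )))
            }
        }
        // Errors and pending buffers pass through untouched.
        Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
        Poll::Pending => Poll::Pending,
    }
}
```

Note that, because the check is `start < end`, an empty range such as `2..2` is reported as invalid rather than returning an empty buffer.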

impl KernelContext for () {
    // Parameters are underscore-prefixed because these stubs do not use them yet.
    fn vector(&self, _vector_id: VectorId) -> VectorRef<'_> {
        todo!()
    }

    fn buffer(&self, _buffer_id: BufferId) -> Poll<VortexResult<ByteBuffer>> {
        todo!()
    }
}

#[cfg(test)]
mod tests {
    use arrow_buffer::BooleanBuffer;
    use rand::prelude::StdRng;
    use rand::{Rng, SeedableRng};
    use vortex_buffer::BufferMut;
    use vortex_dtype::{DType, Nullability};
    use vortex_mask::Mask;

    use crate::canonical::ToCanonical;
    use crate::compute::Operator;
    use crate::pipeline::canonical::export_canonical_pipeline_expr;
    use crate::pipeline::operators::compare::CompareOperator;
    use crate::{Array, IntoArray};

    #[test]
    fn test_pipeline_with_comparison() {
        // Create test data
        let mut rng = StdRng::seed_from_u64(42);
        let values = (0..1000)
            .map(|_| rng.random_range(0i32..100))
            .collect::<BufferMut<i32>>()
            .into_array()
            .to_primitive()
            .unwrap();

        // Create a mask that selects ~50% of elements
        let mask_bools: Vec<bool> = (0..1000).map(|_| rng.random_bool(0.5)).collect();
        let mask = Mask::from_buffer(BooleanBuffer::from_iter(mask_bools));

        // Create a pipeline with comparison: array > array (self-comparison)
        let expr1 = values.to_pipeline_plan().unwrap();
        let expr2 = values.to_pipeline_plan().unwrap();
        let compare_expr = CompareOperator::new(expr1, expr2, Operator::Gt);

        // Execute the pipeline
        let result = export_canonical_pipeline_expr(
            &DType::Bool(Nullability::NonNullable),
            values.len(),
            &compare_expr,
            &mask,
        )
        .unwrap();

        // Verify the result
        assert!(matches!(result, crate::Canonical::Bool(_)));
        if let crate::Canonical::Bool(bool_array) = result {
            // Since we're comparing array > array (same values), all results should be false
            let expected_len = mask.true_count();
            assert_eq!(bool_array.len(), expected_len);

            // All values should be false since we're comparing identical values
            let bool_buffer = bool_array.boolean_buffer();
            assert_eq!(
                bool_buffer.count_set_bits(),
                0,
                "All comparisons should be false"
            );
        }
    }

    #[test]
    fn test_pipeline_with_different_arrays_comparison() {
        // Create test data with known pattern
        let values1 = (0..1000)
            .map(|i| i % 100)
            .collect::<BufferMut<i32>>()
            .into_array()
            .to_primitive()
            .unwrap();
        let values2 = (0..1000)
            .map(|i| (i + 1) % 100)
            .collect::<BufferMut<i32>>()
            .into_array()
            .to_primitive()
            .unwrap();

        // Select all elements
        let mask = Mask::from_buffer(BooleanBuffer::new_set(1000));

        // Create pipeline: array1 < array2
        let expr1 = values1.to_pipeline_plan().unwrap();
        let expr2 = values2.to_pipeline_plan().unwrap();
        let compare_expr = CompareOperator::new(expr1, expr2, Operator::Lt);

        // Execute the pipeline
        let result = export_canonical_pipeline_expr(
            &DType::Bool(Nullability::NonNullable),
            1000,
            &compare_expr,
            &mask,
        )
        .unwrap();

        // Verify the result
        assert!(matches!(result, crate::Canonical::Bool(_)));
        if let crate::Canonical::Bool(bool_array) = result {
            assert_eq!(bool_array.len(), 1000);

            // Most comparisons should be true (except when values wrap around)
            let bool_buffer = bool_array.boolean_buffer();
            let true_count = bool_buffer.count_set_bits();

            // 990 comparisons should be true: the 10 positions where values1 is 99 and
            // values2 wraps around to 0 are false.
            assert!(
                true_count > 980,
                "Expected most comparisons to be true, got {}",
                true_count
            );
            assert!(
                true_count < 1000,
                "Expected some comparisons to be false due to wraparound"
            );
        }
    }
}
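
The expected counts in the second test can be checked independently of the pipeline. The sketch below recomputes the comparison `i % 100 < (i + 1) % 100` with plain iterators. `count_less_than` is a hypothetical helper used only for illustration and is not part of the crate.

```rust
/// Count positions `i` in `0..n` where `i % 100 < (i + 1) % 100`,
/// the same element-wise comparison the second test builds with two arrays.
fn count_less_than(n: i32) -> usize {
    (0..n).filter(|&i| i % 100 < (i + 1) % 100).count()
}

/// Count positions where a value is strictly greater than itself (always zero),
/// matching the self-comparison in the first test.
fn count_self_greater(values: &[i32]) -> usize {
    values.iter().filter(|&&v| v > v).count()
}
```

For `n = 1000` the comparison fails only at `i = 99, 199, ..., 999`, which is why the test's bounds of more than 980 and fewer than 1000 true values both hold.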