• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16969983197

14 Aug 2025 03:45PM UTC coverage: 85.882% (-1.8%) from 87.693%
16969983197

Pull #4215

github

web-flow
Merge 6636736da into f547cbca5
Pull Request #4215: Ji/vectors

80 of 1729 new or added lines in 38 files covered. (4.63%)

117 existing lines in 25 files now uncovered.

56994 of 66363 relevant lines covered (85.88%)

609331.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/vortex-array/src/pipeline/query/mod.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
mod buffers;
5
mod dag;
6
mod operators;
7
mod toposort;
8

9
use std::ops::DerefMut;
10
use std::task::Poll;
11

12
use vortex_buffer::ByteBuffer;
13
use vortex_error::{VortexError, VortexResult};
14

15
use crate::pipeline::bits::BitView;
16
use crate::pipeline::buffers::BufferId;
17
use crate::pipeline::operators::Operator;
18
use crate::pipeline::query::buffers::{OutputTarget, VectorAllocationPlan};
19
use crate::pipeline::query::dag::DagNode;
20
use crate::pipeline::vector::{VectorId, VectorRef};
21
use crate::pipeline::view::ViewMut;
22
use crate::pipeline::{Kernel, KernelContext};
23

24
/// The idea of a pipeline is to orchestrate driving a set of operators to completion with
25
/// fully optimized resource usage.
26
///
27
/// During construction, the plan is analyzed to determine the optimal way to execute the nodes.
28
/// This includes:
29
/// - Sub-expression elimination: Identifying common sub-expressions and reusing them.
30
/// - Vector allocation: Determining how many intermediate vectors are needed.
31
/// - Buffer management: Managing the buffers that hold the data for each node.
32
pub struct Pipeline<'a> {
33
    /// Nodes in the DAG representing the execution plan with common sub-expressions eliminated.
34
    dag: Vec<DagNode<'a>>,
35
    /// The index into the `dag` of the root node (the entry point for execution).
36
    dag_root: usize,
37

38
    /// The topological order of `dag` nodes for execution.
39
    execution_order: Vec<usize>,
40
    /// The leaf nodes of the plan (nodes with no children).
41
    leaf_nodes: Vec<usize>,
42
    /// The operators bound to each node in the DAG.
43
    operators: Vec<Box<dyn Kernel>>,
44
    /// The allocation plan for vectors used by the pipeline.
45
    allocation_plan: VectorAllocationPlan,
46

47
    /// The current state of each node in the DAG, indexed by position in `dag`.
48
    node_states: Vec<NodeState>,
49

50
    // Pre-allocated work lists (sized to max possible nodes)
51
    /// The current stack of nodes to execute.
52
    work_stack: Vec<usize>,
53
    /// Nodes that returned pending during the last step.
54
    pending_nodes: Vec<usize>,
55
    /// A scratch list for pending nodes that we flip-flop with `pending_set` to avoid allocations.
56
    pending_nodes_next: Vec<usize>,
57
}
58

59
impl<'a> Pipeline<'a> {
60
    // TODO(ngates): can we pass the mask in here such that the plan can replace empty nodes?
NEW
61
    pub fn new(plan: &'a dyn Operator) -> VortexResult<Self> {
×
62
        // Step 1: Convert the plan tree to a DAG by eliminating common sub-expressions.
NEW
63
        let (dag_root, dag) = Self::build_dag(plan)?;
×
NEW
64
        let node_count = dag.len();
×
65

66
        // Step 2: Determine execution order (topological sort)
NEW
67
        let execution_order = Self::topological_sort(&dag)?;
×
NEW
68
        let leaf_nodes = Self::leaf_nodes(&dag);
×
69

70
        // Step 3: Allocate vectors
NEW
71
        let allocation_plan = Self::allocate_vectors(dag_root, &dag, &execution_order)?;
×
72

73
        // let (buffer_slots, buffers) = Self::allocate_buffers(&dag, &execution_order)?;
74

75
        // Construct the operators, binding their inputs using the allocation plan.
NEW
76
        let operators = Self::bind_operators(&dag, &allocation_plan)?;
×
77

NEW
78
        Ok(Self {
×
NEW
79
            dag,
×
NEW
80
            dag_root,
×
NEW
81
            execution_order,
×
NEW
82
            leaf_nodes,
×
NEW
83
            operators,
×
NEW
84
            allocation_plan,
×
NEW
85
            node_states: vec![NodeState::NotStarted; node_count],
×
NEW
86
            // Pre-allocate work arrays
×
NEW
87
            work_stack: Vec::with_capacity(node_count),
×
NEW
88
            pending_nodes: Vec::with_capacity(node_count),
×
NEW
89
            pending_nodes_next: Vec::with_capacity(node_count),
×
NEW
90
        })
×
NEW
91
    }
×
92

93
    /// Step the pipeline forward
NEW
94
    pub fn step(&mut self, selected: BitView, out: &mut ViewMut) -> Poll<VortexResult<()>> {
×
NEW
95
        self.work_stack.clear();
×
NEW
96
        self.pending_nodes_next.clear();
×
97

98
        // Start with leaf nodes
NEW
99
        self.work_stack.extend(
×
NEW
100
            self.leaf_nodes
×
NEW
101
                .iter()
×
NEW
102
                .filter(|&&idx| self.node_states[idx] == NodeState::NotStarted)
×
NEW
103
                .copied(),
×
104
        );
105

106
        loop {
107
            // Retry pending nodes first
NEW
108
            while let Some(node_idx) = self.pending_nodes.pop() {
×
NEW
109
                match self.try_execute_node(node_idx, selected, out) {
×
NEW
110
                    ExecutionResult::Completed => {
×
NEW
111
                        // Add ready parents for cache locality
×
NEW
112
                        self.push_ready_parents(node_idx);
×
NEW
113
                    }
×
NEW
114
                    ExecutionResult::Pending => {
×
NEW
115
                        // Keep in pending set
×
NEW
116
                        self.pending_nodes_next.push(node_idx);
×
NEW
117
                    }
×
NEW
118
                    ExecutionResult::Error(e) => return Poll::Ready(Err(e)),
×
NEW
119
                    ExecutionResult::NotReady => {
×
NEW
120
                        // Dependencies not ready, skip
×
NEW
121
                    }
×
122
                }
123
            }
124

NEW
125
            std::mem::swap(&mut self.pending_nodes, &mut self.pending_nodes_next);
×
NEW
126
            self.pending_nodes_next.clear();
×
127

128
            // Process work stack
NEW
129
            if let Some(node_idx) = self.work_stack.pop() {
×
NEW
130
                match self.try_execute_node(node_idx, selected, out) {
×
NEW
131
                    ExecutionResult::Completed => {
×
NEW
132
                        // Execute entire parent chain for maximum cache locality
×
NEW
133
                        self.execute_parent_chain(node_idx, selected, out);
×
NEW
134
                    }
×
NEW
135
                    ExecutionResult::Pending => {
×
NEW
136
                        self.pending_nodes.push(node_idx);
×
NEW
137
                    }
×
NEW
138
                    ExecutionResult::Error(e) => return Poll::Ready(Err(e)),
×
NEW
139
                    ExecutionResult::NotReady => {}
×
140
                }
NEW
141
            } else if self.pending_nodes.is_empty() {
×
NEW
142
                break;
×
NEW
143
            }
×
144
        }
145

NEW
146
        if !self.pending_nodes.is_empty() {
×
NEW
147
            Poll::Pending
×
NEW
148
        } else if self.node_states[self.dag_root] == NodeState::Completed {
×
NEW
149
            self.reset_step();
×
NEW
150
            Poll::Ready(Ok(()))
×
151
        } else {
NEW
152
            Poll::Ready(Ok(()))
×
153
        }
NEW
154
    }
×
155

156
    /// Execute chain of ready parents while data is in cache
157
    #[inline]
NEW
158
    fn execute_parent_chain(&mut self, mut node_idx: usize, selected: BitView, out: &mut ViewMut) {
×
159
        loop {
160
            // Find a ready parent
NEW
161
            let ready_parent = self.find_ready_parent(node_idx);
×
162

NEW
163
            match ready_parent {
×
NEW
164
                Some(parent_idx) => {
×
NEW
165
                    match self.try_execute_node(parent_idx, selected, out) {
×
NEW
166
                        ExecutionResult::Completed => {
×
NEW
167
                            // Continue up the chain
×
NEW
168
                            node_idx = parent_idx;
×
NEW
169
                        }
×
170
                        ExecutionResult::Pending => {
NEW
171
                            self.pending_nodes.push(parent_idx);
×
NEW
172
                            break;
×
173
                        }
NEW
174
                        ExecutionResult::Error(_) | ExecutionResult::NotReady => break,
×
175
                    }
176
                }
177
                None => {
178
                    // No ready parent, check for other ready parents to queue
NEW
179
                    self.push_ready_parents(node_idx);
×
NEW
180
                    break;
×
181
                }
182
            }
183
        }
NEW
184
    }
×
185

186
    /// Find a single ready parent (for chain execution)
187
    #[inline]
NEW
188
    fn find_ready_parent(&self, node_idx: usize) -> Option<usize> {
×
NEW
189
        let node = &self.dag[node_idx];
×
190

NEW
191
        node.parents.iter().copied().find(|&parent_idx| {
×
NEW
192
            if self.node_states[parent_idx] != NodeState::NotStarted {
×
NEW
193
                return false;
×
NEW
194
            }
×
195

NEW
196
            let parent = &self.dag[parent_idx];
×
NEW
197
            parent
×
NEW
198
                .children
×
NEW
199
                .iter()
×
NEW
200
                .all(|&child| self.node_states[child] == NodeState::Completed)
×
NEW
201
        })
×
NEW
202
    }
×
203

204
    /// Push ready parents to work stack (no allocation - capacity pre-allocated)
205
    #[inline]
NEW
206
    fn push_ready_parents(&mut self, completed_node: usize) {
×
NEW
207
        let node = &self.dag[completed_node];
×
208

NEW
209
        for &parent_idx in &node.parents {
×
210
            // Skip if already processed
NEW
211
            if self.node_states[parent_idx] != NodeState::NotStarted {
×
NEW
212
                continue;
×
NEW
213
            }
×
214

215
            // Check if all children completed
NEW
216
            let parent = &self.dag[parent_idx];
×
NEW
217
            let all_children_done = parent
×
NEW
218
                .children
×
NEW
219
                .iter()
×
NEW
220
                .all(|&child| self.node_states[child] == NodeState::Completed);
×
221

NEW
222
            if all_children_done {
×
NEW
223
                // Push to work stack - won't allocate due to capacity
×
NEW
224
                self.work_stack.push(parent_idx);
×
NEW
225
            }
×
226
        }
NEW
227
    }
×
228

229
    /// Try to execute a node if ready
230
    #[inline]
NEW
231
    fn try_execute_node(
×
NEW
232
        &mut self,
×
NEW
233
        node_idx: usize,
×
NEW
234
        selected: BitView,
×
NEW
235
        out: &mut ViewMut,
×
NEW
236
    ) -> ExecutionResult {
×
237
        // Check current state
NEW
238
        match self.node_states[node_idx] {
×
NEW
239
            NodeState::Completed => return ExecutionResult::Completed,
×
NEW
240
            NodeState::Executing | NodeState::Pending => {
×
NEW
241
                // Try to continue execution
×
NEW
242
            }
×
243
            NodeState::NotStarted => {
244
                // Check if dependencies are ready
245
                // FIXME(ngates): is this ever not true?
NEW
246
                let node = &self.dag[node_idx];
×
NEW
247
                let ready = node
×
NEW
248
                    .children
×
NEW
249
                    .iter()
×
NEW
250
                    .all(|&child| self.node_states[child] == NodeState::Completed);
×
NEW
251
                if !ready {
×
NEW
252
                    return ExecutionResult::NotReady;
×
NEW
253
                }
×
254

NEW
255
                self.node_states[node_idx] = NodeState::Executing;
×
256
            }
257
        }
258

259
        // Execute the node
NEW
260
        match self.execute_single_node(node_idx, selected, out) {
×
261
            Poll::Ready(Ok(())) => {
NEW
262
                self.node_states[node_idx] = NodeState::Completed;
×
NEW
263
                ExecutionResult::Completed
×
264
            }
265
            Poll::Pending => {
NEW
266
                self.node_states[node_idx] = NodeState::Pending;
×
NEW
267
                ExecutionResult::Pending
×
268
            }
NEW
269
            Poll::Ready(Err(e)) => ExecutionResult::Error(e),
×
270
        }
NEW
271
    }
×
272

273
    /// Execute a single node
274
    #[inline]
NEW
275
    fn execute_single_node(
×
NEW
276
        &mut self,
×
NEW
277
        node_idx: usize,
×
NEW
278
        selected: BitView,
×
NEW
279
        external_out: &mut ViewMut,
×
NEW
280
    ) -> Poll<VortexResult<()>> {
×
NEW
281
        let operator = self.operators[node_idx].as_mut();
×
282

NEW
283
        let ctx = Context {
×
NEW
284
            allocation_plan: &self.allocation_plan,
×
NEW
285
        };
×
286

287
        // FIXME(ngates): should we reset the output vector selection?
288

NEW
289
        match self.allocation_plan.output_targets[node_idx] {
×
NEW
290
            OutputTarget::ExternalOutput => operator.step(&ctx, selected, external_out),
×
NEW
291
            OutputTarget::IntermediateVector(vector_idx) | OutputTarget::InPlace(_, vector_idx) => {
×
NEW
292
                let mut vector_ref = self.allocation_plan.vectors[vector_idx].borrow_mut();
×
NEW
293
                let result = {
×
NEW
294
                    let mut view = vector_ref.as_view_mut();
×
NEW
295
                    operator.step(&ctx, selected, &mut view)
×
296
                };
NEW
297
                vector_ref.deref_mut().set_len(selected.true_count());
×
NEW
298
                result
×
299
            }
300
        }
NEW
301
    }
×
302

303
    /// Reset state for next pipeline step
304
    #[inline]
NEW
305
    fn reset_step(&mut self) {
×
306
        // Reset all node states
NEW
307
        self.node_states.fill(NodeState::NotStarted);
×
308

309
        // Clear work lists (doesn't deallocate)
NEW
310
        self.work_stack.clear();
×
NEW
311
        self.pending_nodes.clear();
×
NEW
312
        self.pending_nodes_next.clear();
×
NEW
313
    }
×
314
}
315

316
/// Execution state for a node
317
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
318
pub(in crate::pipeline) enum NodeState {
319
    /// Node has not been executed yet
320
    NotStarted,
321
    /// Node is currently executing (may return Poll::Pending)
322
    Executing,
323
    /// Node is waiting for external resources (e.g. buffers) to become available
324
    Pending,
325
    /// Node has completed execution
326
    Completed,
327
}
328

329
enum ExecutionResult {
330
    Completed,
331
    Pending,
332
    NotReady,
333
    Error(VortexError),
334
}
335

336
/// FIXME(ngates): this is a hack for testing
337
impl Kernel for Pipeline<'_> {
NEW
338
    fn seek(&mut self, chunk_idx: usize) -> VortexResult<()> {
×
NEW
339
        todo!()
×
340
    }
341

NEW
342
    fn step(
×
NEW
343
        &mut self,
×
NEW
344
        ctx: &dyn KernelContext,
×
NEW
345
        selected: BitView,
×
NEW
346
        out: &mut ViewMut,
×
NEW
347
    ) -> Poll<VortexResult<()>> {
×
NEW
348
        Pipeline::step(self, selected, out)
×
NEW
349
    }
×
350
}
351

352
struct Context<'a> {
353
    allocation_plan: &'a VectorAllocationPlan,
354
}
355

356
impl<'a> KernelContext for Context<'a> {
NEW
357
    fn buffer(&self, _buffer_id: BufferId) -> Poll<VortexResult<ByteBuffer>> {
×
NEW
358
        todo!()
×
359
    }
360

NEW
361
    fn vector(&self, vector_id: VectorId) -> VectorRef<'_> {
×
NEW
362
        VectorRef::new(self.allocation_plan.vectors[*vector_id].borrow())
×
NEW
363
    }
×
364
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc