• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 17040114484

18 Aug 2025 12:07PM UTC coverage: 86.042% (-1.9%) from 87.913%
17040114484

Pull #4215

github

web-flow
Merge 4600ca5c4 into cb1a92920
Pull Request #4215: Ji/vectors

132 of 1817 new or added lines in 42 files covered. (7.26%)

125 existing lines in 26 files now uncovered.

56932 of 66168 relevant lines covered (86.04%)

611735.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/vortex-vector/src/query/mod.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
mod buffers;
5
mod dag;
6
mod operators;
7
mod toposort;
8

9
use std::ops::DerefMut;
10
use std::task::Poll;
11

12
use vortex_buffer::ByteBuffer;
13
use vortex_error::{VortexError, VortexResult};
14

15
use crate::bits::BitView;
16
use crate::buffers::BufferId;
17
use crate::operators::Operator;
18
use crate::query::buffers::{OutputTarget, VectorAllocationPlan};
19
use crate::query::dag::DagNode;
20
use crate::vector::{VectorId, VectorRef};
21
use crate::view::ViewMut;
22
use crate::{Kernel, KernelContext};
23

24
/// The idea of a pipeline is to orchestrate driving a set of operators to completion with
25
/// fully optimized resource usage.
26
///
27
/// During construction, the plan is analyzed to determine the optimal way to execute the nodes.
28
/// This includes:
29
/// - Sub-expression elimination: Identifying common sub-expressions and reusing them.
30
/// - Vector allocation: Determining how many intermediate vectors are needed.
31
/// - Buffer management: Managing the buffers that hold the data for each node.
32
pub struct Pipeline<'a> {
33
    /// Nodes in the DAG representing the execution plan with common sub-expressions eliminated.
34
    dag: Vec<DagNode<'a>>,
35
    /// The index into the `dag` of the root node (the entry point for execution).
36
    dag_root: usize,
37

38
    /// The topological order of `dag` nodes for execution.
39
    execution_order: Vec<usize>,
40
    /// The leaf nodes of the plan (nodes with no children).
41
    leaf_nodes: Vec<usize>,
42
    /// The operators bound to each node in the DAG.
43
    operators: Vec<Box<dyn Kernel>>,
44
    /// The allocation plan for vectors used by the pipeline.
45
    allocation_plan: VectorAllocationPlan,
46

47
    /// The current state of each node in the DAG, indexed by position in `dag`.
48
    node_states: Vec<NodeState>,
49

50
    // Pre-allocated work lists (sized to max possible nodes)
51
    /// The current stack of nodes to execute.
52
    work_stack: Vec<usize>,
53
    /// Nodes that returned pending during the last step.
54
    pending_nodes: Vec<usize>,
55
    /// A scratch list for pending nodes that we flip-flop with `pending_set` to avoid allocations.
56
    pending_nodes_next: Vec<usize>,
57
}
58

59
impl<'a> Pipeline<'a> {
60
    // TODO(ngates): can we pass the mask in here such that the plan can replace empty nodes?
NEW
61
    pub fn new(plan: &'a dyn Operator) -> VortexResult<Self> {
×
62
        // Step 1: Convert the plan tree to a DAG by eliminating common sub-expressions.
NEW
63
        let (dag_root, dag) = Self::build_dag(plan)?;
×
NEW
64
        let node_count = dag.len();
×
65

66
        // Step 2: Determine execution order (topological sort)
NEW
67
        let execution_order = Self::topological_sort(&dag)?;
×
NEW
68
        let leaf_nodes = Self::leaf_nodes(&dag);
×
69

70
        // Step 3: Allocate vectors
NEW
71
        let allocation_plan = Self::allocate_vectors(dag_root, &dag, &execution_order)?;
×
72

73
        // let (buffer_slots, buffers) = Self::allocate_buffers(&dag, &execution_order)?;
74

75
        // Construct the operators, binding their inputs using the allocation plan.
NEW
76
        let operators = Self::bind_operators(&dag, &allocation_plan)?;
×
77

NEW
78
        Ok(Self {
×
NEW
79
            dag,
×
NEW
80
            dag_root,
×
NEW
81
            execution_order,
×
NEW
82
            leaf_nodes,
×
NEW
83
            operators,
×
NEW
84
            allocation_plan,
×
NEW
85
            node_states: vec![NodeState::NotStarted; node_count],
×
NEW
86
            // Pre-allocate work arrays
×
NEW
87
            work_stack: Vec::with_capacity(node_count),
×
NEW
88
            pending_nodes: Vec::with_capacity(node_count),
×
NEW
89
            pending_nodes_next: Vec::with_capacity(node_count),
×
NEW
90
        })
×
NEW
91
    }
×
92

NEW
93
    pub fn seek(&mut self, chunk_idx: usize) -> VortexResult<()> {
×
NEW
94
        self.operators
×
NEW
95
            .iter_mut()
×
NEW
96
            .try_for_each(|op| op.seek(chunk_idx))
×
NEW
97
    }
×
98

99
    /// Step the pipeline forward
NEW
100
    pub fn step(&mut self, selected: BitView, out: &mut ViewMut) -> Poll<VortexResult<()>> {
×
NEW
101
        self.work_stack.clear();
×
NEW
102
        self.pending_nodes_next.clear();
×
103

104
        // Start with leaf nodes
NEW
105
        self.work_stack.extend(
×
NEW
106
            self.leaf_nodes
×
NEW
107
                .iter()
×
NEW
108
                .filter(|&&idx| self.node_states[idx] == NodeState::NotStarted)
×
NEW
109
                .copied(),
×
110
        );
111

112
        loop {
113
            // Retry pending nodes first
NEW
114
            while let Some(node_idx) = self.pending_nodes.pop() {
×
NEW
115
                match self.try_execute_node(node_idx, selected, out) {
×
NEW
116
                    ExecutionResult::Completed => {
×
NEW
117
                        // Add ready parents for cache locality
×
NEW
118
                        self.push_ready_parents(node_idx);
×
NEW
119
                    }
×
NEW
120
                    ExecutionResult::Pending => {
×
NEW
121
                        // Keep in pending set
×
NEW
122
                        self.pending_nodes_next.push(node_idx);
×
NEW
123
                    }
×
NEW
124
                    ExecutionResult::Error(e) => return Poll::Ready(Err(e)),
×
NEW
125
                    ExecutionResult::NotReady => {
×
NEW
126
                        // Dependencies not ready, skip
×
NEW
127
                    }
×
128
                }
129
            }
130

NEW
131
            std::mem::swap(&mut self.pending_nodes, &mut self.pending_nodes_next);
×
NEW
132
            self.pending_nodes_next.clear();
×
133

134
            // Process work stack
NEW
135
            if let Some(node_idx) = self.work_stack.pop() {
×
NEW
136
                match self.try_execute_node(node_idx, selected, out) {
×
NEW
137
                    ExecutionResult::Completed => {
×
NEW
138
                        // Execute entire parent chain for maximum cache locality
×
NEW
139
                        self.execute_parent_chain(node_idx, selected, out);
×
NEW
140
                    }
×
NEW
141
                    ExecutionResult::Pending => {
×
NEW
142
                        self.pending_nodes.push(node_idx);
×
NEW
143
                    }
×
NEW
144
                    ExecutionResult::Error(e) => return Poll::Ready(Err(e)),
×
NEW
145
                    ExecutionResult::NotReady => {}
×
146
                }
NEW
147
            } else if self.pending_nodes.is_empty() {
×
NEW
148
                break;
×
NEW
149
            }
×
150
        }
151

NEW
152
        if !self.pending_nodes.is_empty() {
×
NEW
153
            Poll::Pending
×
NEW
154
        } else if self.node_states[self.dag_root] == NodeState::Completed {
×
NEW
155
            self.reset_step();
×
NEW
156
            Poll::Ready(Ok(()))
×
157
        } else {
NEW
158
            Poll::Ready(Ok(()))
×
159
        }
NEW
160
    }
×
161

162
    /// Execute chain of ready parents while data is in cache
163
    #[inline]
NEW
164
    fn execute_parent_chain(&mut self, mut node_idx: usize, selected: BitView, out: &mut ViewMut) {
×
165
        loop {
166
            // Find a ready parent
NEW
167
            let ready_parent = self.find_ready_parent(node_idx);
×
168

NEW
169
            match ready_parent {
×
NEW
170
                Some(parent_idx) => {
×
NEW
171
                    match self.try_execute_node(parent_idx, selected, out) {
×
NEW
172
                        ExecutionResult::Completed => {
×
NEW
173
                            // Continue up the chain
×
NEW
174
                            node_idx = parent_idx;
×
NEW
175
                        }
×
176
                        ExecutionResult::Pending => {
NEW
177
                            self.pending_nodes.push(parent_idx);
×
NEW
178
                            break;
×
179
                        }
NEW
180
                        ExecutionResult::Error(_) | ExecutionResult::NotReady => break,
×
181
                    }
182
                }
183
                None => {
184
                    // No ready parent, check for other ready parents to queue
NEW
185
                    self.push_ready_parents(node_idx);
×
NEW
186
                    break;
×
187
                }
188
            }
189
        }
NEW
190
    }
×
191

192
    /// Find a single ready parent (for chain execution)
193
    #[inline]
NEW
194
    fn find_ready_parent(&self, node_idx: usize) -> Option<usize> {
×
NEW
195
        let node = &self.dag[node_idx];
×
196

NEW
197
        node.parents.iter().copied().find(|&parent_idx| {
×
NEW
198
            if self.node_states[parent_idx] != NodeState::NotStarted {
×
NEW
199
                return false;
×
NEW
200
            }
×
201

NEW
202
            let parent = &self.dag[parent_idx];
×
NEW
203
            parent
×
NEW
204
                .children
×
NEW
205
                .iter()
×
NEW
206
                .all(|&child| self.node_states[child] == NodeState::Completed)
×
NEW
207
        })
×
NEW
208
    }
×
209

210
    /// Push ready parents to work stack (no allocation - capacity pre-allocated)
211
    #[inline]
NEW
212
    fn push_ready_parents(&mut self, completed_node: usize) {
×
NEW
213
        let node = &self.dag[completed_node];
×
214

NEW
215
        for &parent_idx in &node.parents {
×
216
            // Skip if already processed
NEW
217
            if self.node_states[parent_idx] != NodeState::NotStarted {
×
NEW
218
                continue;
×
NEW
219
            }
×
220

221
            // Check if all children completed
NEW
222
            let parent = &self.dag[parent_idx];
×
NEW
223
            let all_children_done = parent
×
NEW
224
                .children
×
NEW
225
                .iter()
×
NEW
226
                .all(|&child| self.node_states[child] == NodeState::Completed);
×
227

NEW
228
            if all_children_done {
×
NEW
229
                // Push to work stack - won't allocate due to capacity
×
NEW
230
                self.work_stack.push(parent_idx);
×
NEW
231
            }
×
232
        }
NEW
233
    }
×
234

235
    /// Try to execute a node if ready
236
    #[inline]
NEW
237
    fn try_execute_node(
×
NEW
238
        &mut self,
×
NEW
239
        node_idx: usize,
×
NEW
240
        selected: BitView,
×
NEW
241
        out: &mut ViewMut,
×
NEW
242
    ) -> ExecutionResult {
×
243
        // Check current state
NEW
244
        match self.node_states[node_idx] {
×
NEW
245
            NodeState::Completed => return ExecutionResult::Completed,
×
NEW
246
            NodeState::Executing | NodeState::Pending => {
×
NEW
247
                // Try to continue execution
×
NEW
248
            }
×
249
            NodeState::NotStarted => {
250
                // Check if dependencies are ready
251
                // FIXME(ngates): is this ever not true?
NEW
252
                let node = &self.dag[node_idx];
×
NEW
253
                let ready = node
×
NEW
254
                    .children
×
NEW
255
                    .iter()
×
NEW
256
                    .all(|&child| self.node_states[child] == NodeState::Completed);
×
NEW
257
                if !ready {
×
NEW
258
                    return ExecutionResult::NotReady;
×
NEW
259
                }
×
260

NEW
261
                self.node_states[node_idx] = NodeState::Executing;
×
262
            }
263
        }
264

265
        // Execute the node
NEW
266
        match self.execute_single_node(node_idx, selected, out) {
×
267
            Poll::Ready(Ok(())) => {
NEW
268
                self.node_states[node_idx] = NodeState::Completed;
×
NEW
269
                ExecutionResult::Completed
×
270
            }
271
            Poll::Pending => {
NEW
272
                self.node_states[node_idx] = NodeState::Pending;
×
NEW
273
                ExecutionResult::Pending
×
274
            }
NEW
275
            Poll::Ready(Err(e)) => ExecutionResult::Error(e),
×
276
        }
NEW
277
    }
×
278

279
    /// Execute a single node
280
    #[inline]
NEW
281
    fn execute_single_node(
×
NEW
282
        &mut self,
×
NEW
283
        node_idx: usize,
×
NEW
284
        selected: BitView,
×
NEW
285
        external_out: &mut ViewMut,
×
NEW
286
    ) -> Poll<VortexResult<()>> {
×
NEW
287
        let operator = self.operators[node_idx].as_mut();
×
288

NEW
289
        let ctx = Context {
×
NEW
290
            allocation_plan: &self.allocation_plan,
×
NEW
291
        };
×
292

293
        // FIXME(ngates): should we reset the output vector selection?
294

NEW
295
        match self.allocation_plan.output_targets[node_idx] {
×
NEW
296
            OutputTarget::ExternalOutput => operator.step(&ctx, selected, external_out),
×
NEW
297
            OutputTarget::IntermediateVector(vector_idx) | OutputTarget::InPlace(_, vector_idx) => {
×
NEW
298
                let mut vector_ref = self.allocation_plan.vectors[vector_idx].borrow_mut();
×
NEW
299
                let result = {
×
NEW
300
                    let mut view = vector_ref.as_view_mut();
×
NEW
301
                    operator.step(&ctx, selected, &mut view)
×
302
                };
NEW
303
                vector_ref.deref_mut().set_len(selected.true_count());
×
NEW
304
                result
×
305
            }
306
        }
NEW
307
    }
×
308

309
    /// Reset state for next pipeline step
310
    #[inline]
NEW
311
    fn reset_step(&mut self) {
×
312
        // Reset all node states
NEW
313
        self.node_states.fill(NodeState::NotStarted);
×
314

315
        // Clear work lists (doesn't deallocate)
NEW
316
        self.work_stack.clear();
×
NEW
317
        self.pending_nodes.clear();
×
NEW
318
        self.pending_nodes_next.clear();
×
NEW
319
    }
×
320
}
321

322
/// Progress of a single DAG node within the current pipeline step.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum NodeState {
    /// The node has not run yet during the current step.
    NotStarted,
    /// The node passed its readiness check and its kernel is being stepped (may return
    /// `Poll::Pending`).
    Executing,
    /// The node's kernel returned `Poll::Pending`, e.g. while waiting for external resources,
    /// and must be stepped again.
    Pending,
    /// The node finished and its output is available to its parents.
    Completed,
}
334

335
enum ExecutionResult {
336
    Completed,
337
    Pending,
338
    NotReady,
339
    Error(VortexError),
340
}
341

342
/// FIXME(ngates): this is a hack for testing
343
impl Kernel for Pipeline<'_> {
NEW
344
    fn seek(&mut self, chunk_idx: usize) -> VortexResult<()> {
×
NEW
345
        Pipeline::seek(self, chunk_idx)
×
NEW
346
    }
×
347

NEW
348
    fn step(
×
NEW
349
        &mut self,
×
NEW
350
        ctx: &dyn KernelContext,
×
NEW
351
        selected: BitView,
×
NEW
352
        out: &mut ViewMut,
×
NEW
353
    ) -> Poll<VortexResult<()>> {
×
NEW
354
        Pipeline::step(self, selected, out)
×
NEW
355
    }
×
356
}
357

358
struct Context<'a> {
359
    allocation_plan: &'a VectorAllocationPlan,
360
}
361

362
impl<'a> KernelContext for Context<'a> {
NEW
363
    fn buffer(&self, _buffer_id: BufferId) -> Poll<VortexResult<ByteBuffer>> {
×
NEW
364
        todo!()
×
365
    }
366

NEW
367
    fn vector(&self, vector_id: VectorId) -> VectorRef<'_> {
×
NEW
368
        VectorRef::new(self.allocation_plan.vectors[*vector_id].borrow())
×
NEW
369
    }
×
370
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc