• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16970635821

14 Aug 2025 04:13PM UTC coverage: 85.882% (-1.8%) from 87.693%
16970635821

Pull #4215

github

web-flow
Merge 5182504a6 into f547cbca5
Pull Request #4215: Ji/vectors

80 of 1729 new or added lines in 38 files covered. (4.63%)

117 existing lines in 25 files now uncovered.

56994 of 66363 relevant lines covered (85.88%)

609331.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/vortex-array/src/pipeline/query/buffers.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
//! Vector allocation strategy for pipelines
5

6
use std::cell::RefCell;
use std::collections::HashMap;

use vortex_error::{VortexExpect, VortexResult, vortex_err};

use crate::pipeline::query::Pipeline;
use crate::pipeline::query::dag::DagNode;
use crate::pipeline::types::VType;
use crate::pipeline::vector::{Vector, VectorId};
15

16
#[derive(Debug)]
17
pub(in crate::pipeline) struct VectorAllocationPlan {
18
    /// Where each node writes its output
19
    pub(in crate::pipeline) output_targets: Vec<OutputTarget>,
20
    /// The actual allocated vectors
21
    pub(in crate::pipeline) vectors: Vec<RefCell<Vector>>,
22
}
23

24
/// Tracks which vector a node outputs to
25
#[derive(Debug, Clone)]
26
pub(in crate::pipeline) enum OutputTarget {
27
    /// Node writes to the top-level provided output
28
    ExternalOutput,
29
    /// Node writes to an allocated intermediate vector
30
    IntermediateVector(usize), // vector idx
31
    /// Node mutates its input in-place (input node index, vector idx)
32
    InPlace(usize, usize),
33
}
34

35
impl OutputTarget {
NEW
36
    pub fn vector_id(&self) -> Option<VectorId> {
×
NEW
37
        match self {
×
NEW
38
            OutputTarget::IntermediateVector(idx) => Some(VectorId(*idx)),
×
NEW
39
            OutputTarget::InPlace(_, idx) => Some(VectorId(*idx)),
×
NEW
40
            OutputTarget::ExternalOutput => None,
×
41
        }
NEW
42
    }
×
43
}
44

45
/// Represents an allocated vector that can be reused
46
#[derive(Debug, Clone)]
47
struct VectorAllocation {
48
    /// Type of elements in this vector
49
    element_type: VType,
50
    /// When this vector becomes available for reuse (execution step)
51
    available_after: Option<usize>,
52
}
53

54
/// Lifetime and data-flow facts about one node's output.
#[derive(Debug, Clone)]
struct NodeLifetime {
    /// Position of the node in the execution order.
    earliest_execution: usize,
    /// Position of the last node that reads this node's output.
    last_use: usize,
    /// Indices of the nodes that consume this node's output.
    consumers: Vec<usize>,
    /// Whether the node may overwrite one of its inputs instead of allocating.
    can_operate_in_place: bool,
    /// Whether the node's output reaches the final pipeline output.
    flows_to_output: bool,
}
68

69
// ============================================================================
70
// Improved Pipeline with vector allocation
71
// ============================================================================
72

73
impl<'a> Pipeline<'a> {
74
    /// Allocate vectors with lifetime analysis and zero-copy optimization
NEW
75
    pub(in crate::pipeline) fn allocate_vectors(
×
NEW
76
        dag_root: usize,
×
NEW
77
        dag: &[DagNode<'a>],
×
NEW
78
        execution_order: &[usize],
×
NEW
79
    ) -> VortexResult<VectorAllocationPlan> {
×
80
        // Step 1: Analyze node lifetimes and data flow
NEW
81
        let lifetimes = Self::analyze_lifetimes(dag, execution_order)?;
×
82

83
        // Step 2: Identify which nodes flow directly to the final output
NEW
84
        let output_flow = Self::trace_output_flow(dag_root, dag)?;
×
85

86
        // Step 3: Determine output targets for each node
NEW
87
        let mut output_targets: Vec<Option<OutputTarget>> = vec![None; dag.len()];
×
NEW
88
        let mut allocations = Vec::new();
×
89

90
        // Process nodes in reverse execution order (top-down for output propagation)
NEW
91
        for &node_idx in execution_order.iter().rev() {
×
NEW
92
            let node = &dag[node_idx];
×
NEW
93
            let plan_node = node.plan_node;
×
NEW
94
            let lifetime = &lifetimes[&node_idx];
×
95

96
            // Determine output target
NEW
97
            let output_target = if node.parents.is_empty() {
×
98
                // Root node - always writes to external output
NEW
99
                OutputTarget::ExternalOutput
×
NEW
100
            } else if output_flow.contains(&node_idx)
×
NEW
101
                && Self::can_pass_through_output(dag, node_idx, &output_targets)
×
102
            {
103
                // This node's output flows to the final output and all parents can pass it through
NEW
104
                OutputTarget::ExternalOutput
×
NEW
105
            } else if lifetime.can_operate_in_place {
×
106
                // Check if we can operate in-place on one of our inputs
NEW
107
                if let Some((input_idx, input_alloc)) =
×
NEW
108
                    Self::find_in_place_candidate(node, &output_targets, &lifetimes)
×
109
                {
NEW
110
                    OutputTarget::InPlace(input_idx, input_alloc)
×
111
                } else {
112
                    // Need new allocation
NEW
113
                    let alloc_id = allocations.len();
×
NEW
114
                    allocations.push(VectorAllocation {
×
NEW
115
                        element_type: plan_node.vtype(),
×
NEW
116
                        available_after: Some(lifetime.last_use),
×
NEW
117
                    });
×
NEW
118
                    OutputTarget::IntermediateVector(alloc_id)
×
119
                }
120
            } else {
121
                // Need new allocation
NEW
122
                let alloc_id = allocations.len();
×
NEW
123
                allocations.push(VectorAllocation {
×
NEW
124
                    element_type: plan_node.vtype(),
×
NEW
125
                    available_after: Some(lifetime.last_use),
×
NEW
126
                });
×
NEW
127
                OutputTarget::IntermediateVector(alloc_id)
×
128
            };
129

NEW
130
            output_targets[node_idx] = Some(output_target);
×
131
        }
132

133
        // Step 4: Optimize allocations with graph coloring
134
        // let optimized_allocations = Self::optimize_allocations(allocations, &lifetimes)?;
135

136
        Ok(VectorAllocationPlan {
NEW
137
            output_targets: output_targets
×
NEW
138
                .into_iter()
×
NEW
139
                .map(|target| target.vortex_expect("missing target"))
×
NEW
140
                .collect(),
×
NEW
141
            vectors: allocations
×
NEW
142
                .into_iter()
×
NEW
143
                .map(|alloc| RefCell::new(Vector::new_with_vtype(alloc.element_type)))
×
NEW
144
                .collect(),
×
145
        })
NEW
146
    }
×
147

148
    /// Analyze the lifetimes of node outputs
NEW
149
    fn analyze_lifetimes(
×
NEW
150
        dag: &[DagNode],
×
NEW
151
        execution_order: &[usize],
×
NEW
152
    ) -> VortexResult<HashMap<usize, NodeLifetime>> {
×
NEW
153
        let mut lifetimes = HashMap::new();
×
154

155
        // Build execution position map
NEW
156
        let exec_pos: HashMap<usize, usize> = execution_order
×
NEW
157
            .iter()
×
NEW
158
            .enumerate()
×
NEW
159
            .map(|(pos, &idx)| (idx, pos))
×
NEW
160
            .collect();
×
161

NEW
162
        for (node_idx, node) in dag.iter().enumerate() {
×
NEW
163
            let earliest_execution = exec_pos[&node_idx];
×
164

165
            // Find when output is last used
NEW
166
            let last_use = if node.parents.is_empty() {
×
167
                // Root node - used at the very end
NEW
168
                execution_order.len()
×
169
            } else {
170
                // Last parent to execute
NEW
171
                node.parents
×
NEW
172
                    .iter()
×
NEW
173
                    .map(|&parent| exec_pos[&parent])
×
NEW
174
                    .max()
×
NEW
175
                    .unwrap_or(earliest_execution)
×
176
            };
177

178
            // Check if node can operate in-place
179
            // This would need to come from the plan node metadata
NEW
180
            let can_operate_in_place = false; // TODO: get from plan node
×
181

182
            // Check if flows to output
NEW
183
            let flows_to_output = node.parents.is_empty()
×
NEW
184
                || node.parents.iter().any(|&p| {
×
185
                    // Recursive check would go here
NEW
186
                    dag[p].parents.is_empty()
×
NEW
187
                });
×
188

NEW
189
            lifetimes.insert(
×
NEW
190
                node_idx,
×
NEW
191
                NodeLifetime {
×
NEW
192
                    earliest_execution,
×
NEW
193
                    last_use,
×
NEW
194
                    consumers: node.parents.clone(),
×
NEW
195
                    can_operate_in_place,
×
NEW
196
                    flows_to_output,
×
NEW
197
                },
×
198
            );
199
        }
200

NEW
201
        Ok(lifetimes)
×
NEW
202
    }
×
203

204
    /// Trace which nodes' outputs flow to the final output
205
    ///
206
    /// NOTE: we don't check for cycles here, assuming the DAG is acyclic.
NEW
207
    fn trace_output_flow(dag_root: usize, dag: &[DagNode]) -> VortexResult<Vec<usize>> {
×
NEW
208
        let mut flows_to_output = Vec::new();
×
NEW
209
        let mut current = dag_root;
×
210

211
        loop {
NEW
212
            let node = &dag[current];
×
213

214
            // Check if first child has matching type
NEW
215
            if let Some(&first_child_idx) = node.children.first() {
×
NEW
216
                let first_child = &dag[first_child_idx];
×
217

NEW
218
                if node.plan_node.vtype() == first_child.plan_node.vtype() {
×
NEW
219
                    // This node can pass through the output buffer
×
NEW
220
                    flows_to_output.push(current);
×
NEW
221
                    // Continue down the first child
×
NEW
222
                    current = first_child_idx;
×
NEW
223
                } else {
×
224
                    // Types don't match, can't pass through
NEW
225
                    break;
×
226
                }
227
            } else {
228
                // No children, we're done
NEW
229
                break;
×
230
            }
231
        }
232

NEW
233
        Ok(flows_to_output)
×
NEW
234
    }
×
235

236
    /// Check if we can pass the external output through this node
NEW
237
    fn can_pass_through_output(
×
NEW
238
        dag: &[DagNode],
×
NEW
239
        node_idx: usize,
×
NEW
240
        output_targets: &[Option<OutputTarget>],
×
NEW
241
    ) -> bool {
×
NEW
242
        let node = &dag[node_idx];
×
243

244
        // There must not be multiple parents, and it must:
245
        // 1. Already use external output, OR
246
        // 2. Be able to pass through its input
247
        // AND
248
        // 1. The input type must match the output type
NEW
249
        if node.parents.len() > 1 {
×
NEW
250
            return false; // Cannot pass through if multiple parents
×
NEW
251
        }
×
NEW
252
        node.parents.iter().all(|&parent| {
×
NEW
253
            if node.plan_node.vtype() != dag[parent].plan_node.vtype() {
×
NEW
254
                return false; // Type mismatch
×
NEW
255
            }
×
NEW
256
            match output_targets[parent] {
×
NEW
257
                Some(OutputTarget::ExternalOutput) => true,
×
NEW
258
                Some(OutputTarget::InPlace(..)) => true, // Can pass through
×
NEW
259
                _ => false,
×
260
            }
NEW
261
        })
×
NEW
262
    }
×
263

264
    /// Find a suitable input for in-place operation
NEW
265
    fn find_in_place_candidate(
×
NEW
266
        node: &DagNode,
×
NEW
267
        output_targets: &[Option<OutputTarget>],
×
NEW
268
        lifetimes: &HashMap<usize, NodeLifetime>,
×
NEW
269
    ) -> Option<(usize, usize)> {
×
270
        // Check each child
NEW
271
        for (input_idx, &child_node_idx) in node.children.iter().enumerate() {
×
NEW
272
            if let Some(target) = &output_targets[child_node_idx]
×
NEW
273
                && let OutputTarget::IntermediateVector(alloc_id) = target
×
274
            {
275
                // Check if this child's output is only used by us
NEW
276
                let child_lifetime = &lifetimes[&child_node_idx];
×
NEW
277
                if child_lifetime.consumers.len() == 1 && child_lifetime.consumers[0] == node.index
×
278
                {
279
                    // We're the only consumer - can reuse in-place
NEW
280
                    return Some((input_idx, *alloc_id));
×
NEW
281
                }
×
NEW
282
            }
×
283
        }
NEW
284
        None
×
NEW
285
    }
×
286

287
    // Optimize allocations using graph coloring
288
    // fn optimize_allocations(
289
    //     allocations: Vec<VectorAllocation>,
290
    //     lifetimes: &HashMap<usize, NodeLifetime>,
291
    // ) -> VortexResult<Vec<Vector>> {
292
    //     // Group allocations by type and size
293
    //     let mut allocation_groups: HashMap<(VType, usize), Vec<VectorAllocation>> = HashMap::new();
294
    //
295
    //     for alloc in allocations {
296
    //         let key = (alloc.element_type, alloc.size_bytes);
297
    //         allocation_groups.entry(key).or_default().push(alloc);
298
    //     }
299
    //
300
    //     // For each group, find minimum number of actual vectors needed
301
    //     let mut vectors = Vec::new();
302
    //
303
    //     for ((vtype, size), allocs) in allocation_groups {
304
    //         // Sort by availability time
305
    //         let mut sorted_allocs = allocs;
306
    //         sorted_allocs.sort_by_key(|a| a.available_after);
307
    //
308
    //         // Use interval scheduling to find minimum vectors
309
    //         let mut reuse_map = HashMap::new();
310
    //         let mut available_vectors: Vec<(usize, usize)> = Vec::new(); // (vector_id, available_after)
311
    //
312
    //         for alloc in sorted_allocs {
313
    //             // Find a vector that's available
314
    //             let vector_id = if let Some(pos) = available_vectors
315
    //                 .iter()
316
    //                 .position(|(_, avail)| *avail <= alloc.id)
317
    //             {
318
    //                 let (vid, _) = available_vectors.remove(pos);
319
    //                 vid
320
    //             } else {
321
    //                 // Need new vector
322
    //                 let vid = vectors.len();
323
    //                 vectors.push(Vector::new(vtype, 1024)?);
324
    //                 vid
325
    //             };
326
    //
327
    //             reuse_map.insert(alloc.id, vector_id);
328
    //
329
    //             if let Some(available_after) = alloc.available_after {
330
    //                 available_vectors.push((vector_id, available_after));
331
    //             }
332
    //         }
333
    //     }
334
    //
335
    //     Ok(vectors)
336
    // }
337
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc