• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16970635821

14 Aug 2025 04:13PM UTC coverage: 85.882% (-1.8%) from 87.693%
16970635821

Pull #4215

github

web-flow
Merge 5182504a6 into f547cbca5
Pull Request #4215: Ji/vectors

80 of 1729 new or added lines in 38 files covered. (4.63%)

117 existing lines in 25 files now uncovered.

56994 of 66363 relevant lines covered (85.88%)

609331.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/vortex-array/src/pipeline/query/dag.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::hash::BuildHasher;
5

6
use vortex_error::VortexResult;
7
use vortex_utils::aliases::hash_map::{HashMap, RandomState};
8

9
use crate::pipeline::operators::Operator;
10
use crate::pipeline::query::Pipeline;
11

12
/// A node in our execution DAG.
///
/// Nodes are stored in a flat `Vec` and refer to one another by position in that vector
/// rather than by pointer.
#[derive(Clone, Debug)]
pub(in crate::pipeline) struct DagNode<'a> {
    /// Index of this node in the DAG (its position in the node vector).
    pub(in crate::pipeline) index: usize,
    /// The original plan node, borrowed from the operator tree the DAG was built from.
    pub(in crate::pipeline) plan_node: &'a dyn Operator,
    /// Indices of children in the DAG, in the order the plan node reports its children.
    pub(in crate::pipeline) children: Vec<usize>,
    /// Indices of parents in the DAG (for dependency tracking).
    pub(in crate::pipeline) parents: Vec<usize>,
    /// Hash of this subtree (for deduplication), as computed from `plan_node`.
    pub(in crate::pipeline) subtree_hash: u64,
    /// Output buffer assignment (if not writing to final output).
    ///
    /// `build_dag` leaves this as `None`; presumably a later planning step fills it in.
    pub(in crate::pipeline) output_buffer: Option<BufferSlot>,
}
28

29
/// A reusable buffer slot.
///
/// Both fields are private to this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(in crate::pipeline) struct BufferSlot {
    // NOTE(review): presumably identifies the slot within a buffer pool - confirm.
    index: usize,
    // NOTE(review): presumably the slot's size in bytes, going by the name - confirm.
    size_bytes: usize,
}
35

36
impl<'a> Pipeline<'a> {
37
    /// Build DAG from a tree, eliminating common sub-expressions
NEW
38
    pub(in crate::pipeline) fn build_dag(
×
NEW
39
        root: &'a dyn Operator,
×
NEW
40
    ) -> VortexResult<(usize, Vec<DagNode<'a>>)> {
×
NEW
41
        let mut dag = Vec::new();
×
NEW
42
        let mut hash_to_index = HashMap::new();
×
43

44
        // Recursive function to build DAG
NEW
45
        fn visit_node<'b>(
×
NEW
46
            node: &'b dyn Operator,
×
NEW
47
            dag: &mut Vec<DagNode<'b>>,
×
NEW
48
            hash_to_index: &mut HashMap<u64, usize>,
×
NEW
49
            random_state: &RandomState,
×
NEW
50
        ) -> usize {
×
51
            // Compute hash for this subtree
NEW
52
            let subtree_hash = random_state.hash_one(node);
×
53

54
            // Check if we've seen this subtree before (sub-expression elimination)
NEW
55
            if let Some(&existing_index) = hash_to_index.get(&subtree_hash) {
×
56
                // Reuse existing node
NEW
57
                return existing_index;
×
NEW
58
            }
×
59

60
            // Process children first (post-order traversal)
NEW
61
            let child_indices: Vec<usize> = node
×
NEW
62
                .children()
×
NEW
63
                .iter()
×
NEW
64
                .map(|child| visit_node(child.as_ref(), dag, hash_to_index, random_state))
×
NEW
65
                .collect();
×
66

67
            // Create new DAG node
NEW
68
            let index = dag.len();
×
NEW
69
            let dag_node = DagNode {
×
NEW
70
                index,
×
NEW
71
                plan_node: node,
×
NEW
72
                children: child_indices,
×
NEW
73
                parents: Vec::new(), // Will be filled in later
×
NEW
74
                subtree_hash,
×
NEW
75
                output_buffer: None, // Will be assigned later
×
NEW
76
            };
×
77

NEW
78
            dag.push(dag_node);
×
NEW
79
            hash_to_index.insert(subtree_hash, index);
×
80

81
            // Store the plan node (we need to clone or move it somehow)
82
            // This is tricky with the current design - we might need Arc
83
            // For now, assume we can store a reference or recreate it
84

NEW
85
            index
×
NEW
86
        }
×
87

88
        // Build the DAG
NEW
89
        let random_state = RandomState::default();
×
NEW
90
        let root_index = visit_node(root, &mut dag, &mut hash_to_index, &random_state);
×
91

92
        // Fill in parent relationships
NEW
93
        for i in 0..dag.len() {
×
NEW
94
            let children = dag[i].children.clone();
×
NEW
95
            for &child_idx in &children {
×
NEW
96
                dag[child_idx].parents.push(i);
×
NEW
97
            }
×
98
        }
99

NEW
100
        Ok((root_index, dag))
×
NEW
101
    }
×
102
}
103

104
#[cfg(test)]
mod tests {
    // TODO: `test_dag_construction` is a placeholder. Its body contains no statements, so
    // it always passes and does not exercise `build_dag`.
    #[test]
    fn test_dag_construction() {
        // Intended scenario: a tree with a common sub-expression `C`.
        //
        //       root
        //      /    \
        //     A      B
        //    / \    / \
        //   C   D  C   E
        //
        // After DAG construction, `C` should exist once and be shared by `A` and `B`:
        //
        //       root
        //      /    \
        //     A      B
        //    / \    / \
        //   D   \  /   E
        //        C
        //
        // Intended check, once a test operator tree can be built (the helpers named here
        // do not exist in this module yet):
        //
        //   let root = create_test_tree();
        //   let pipeline = Pipeline::new(root).unwrap();
        //   // The DAG must have fewer nodes than the tree.
        //   assert!(pipeline.dag.len() < count_tree_nodes(root));
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc