• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4382580286

pending completion
4382580286

push

github

GitHub
feat: Separate cache operation log environment and index environments (#1199)

1370 of 1370 new or added lines in 33 files covered. (100.0%)

28671 of 41023 relevant lines covered (69.89%)

51121.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.54
/dozer-core/src/executor/execution_dag.rs
1
use std::{
2
    borrow::BorrowMut,
3
    cell::RefCell,
4
    collections::{hash_map::Entry, HashMap},
5
    fmt::Debug,
6
    rc::Rc,
7
    sync::Arc,
8
};
9

10
use crate::{
11
    builder_dag::{BuilderDag, NodeKind, NodeType},
12
    epoch::EpochManager,
13
    errors::ExecutionError,
14
    hash_map_to_vec::insert_vec_element,
15
    node::PortHandle,
16
    record_store::{create_record_store, RecordWriter},
17
};
18
use crossbeam::channel::{bounded, Receiver, Sender};
19
use daggy::petgraph::{
20
    visit::{EdgeRef, IntoEdges, IntoEdgesDirected, IntoNodeIdentifiers},
21
    Direction,
22
};
23

24
use super::ExecutorOperation;
25

26
pub type SharedRecordWriter = Rc<RefCell<Option<Box<dyn RecordWriter>>>>;
27

28
#[derive(Debug, Clone)]
×
29
pub struct EdgeType {
30
    /// Output port handle.
31
    pub output_port: PortHandle,
32
    /// The sender for data flowing downstream.
33
    pub sender: Sender<ExecutorOperation>,
34
    /// The record writer for persisting data for downstream queries, if persistency is needed. Different edges with the same output port share the same record writer.
35
    pub record_writer: SharedRecordWriter,
36
    /// Input port handle.
37
    pub input_port: PortHandle,
38
    /// The receiver from receiving data from upstream.
39
    pub receiver: Receiver<ExecutorOperation>,
40
}
41

42
#[derive(Debug)]
×
43
pub struct ExecutionDag {
44
    /// Nodes will be moved into execution threads.
45
    graph: daggy::Dag<Option<NodeType>, EdgeType>,
46
    epoch_manager: Arc<EpochManager>,
47
}
48

49
impl ExecutionDag {
50
    pub fn new(builder_dag: BuilderDag, channel_buffer_sz: usize) -> Result<Self, ExecutionError> {
334✔
51
        // Count number of sources.
334✔
52
        let num_sources = builder_dag
334✔
53
            .graph()
334✔
54
            .node_identifiers()
334✔
55
            .filter(|node_index| {
1,706✔
56
                matches!(
1,362✔
57
                    builder_dag.graph()[*node_index].kind,
1,706✔
58
                    NodeKind::Source(_, _)
59
                )
60
            })
1,706✔
61
            .count();
334✔
62

334✔
63
        // We only create record stored once for every output port. Every `HashMap` in this `Vec` tracks if a node's output ports already have the record store created.
334✔
64
        let mut all_record_stores = vec![
334✔
65
            HashMap::<PortHandle, SharedRecordWriter>::new();
334✔
66
            builder_dag.graph().node_count()
334✔
67
        ];
334✔
68

334✔
69
        // Create new edges.
334✔
70
        let mut edges = vec![];
334✔
71
        for builder_dag_edge in builder_dag.graph().raw_edges().iter() {
1,494✔
72
            let source_node_index = builder_dag_edge.source();
1,494✔
73
            let edge = &builder_dag_edge.weight;
1,494✔
74
            let output_port = builder_dag_edge.weight.output_port;
1,494✔
75

76
            // Create or get record store.
77
            let record_writer =
1,494✔
78
                match all_record_stores[source_node_index.index()].entry(output_port) {
1,494✔
79
                    Entry::Vacant(entry) => {
1,484✔
80
                        let record_store = create_record_store(
1,484✔
81
                            output_port,
1,484✔
82
                            edge.output_port_type,
1,484✔
83
                            edge.schema.clone(),
1,484✔
84
                        )?;
1,484✔
85
                        let record_writer = if let Some(record_writer) = record_store {
1,484✔
86
                            Rc::new(RefCell::new(Some(record_writer)))
424✔
87
                        } else {
88
                            Rc::new(RefCell::new(None))
1,060✔
89
                        };
90
                        entry.insert(record_writer).clone()
1,484✔
91
                    }
92
                    Entry::Occupied(entry) => entry.get().clone(),
10✔
93
                };
94

95
            // Create channel.
96
            let (sender, receiver) = bounded(channel_buffer_sz);
1,494✔
97

1,494✔
98
            // Create edge.
1,494✔
99
            let edge = EdgeType {
1,494✔
100
                output_port,
1,494✔
101
                sender,
1,494✔
102
                record_writer,
1,494✔
103
                input_port: edge.input_port,
1,494✔
104
                receiver,
1,494✔
105
            };
1,494✔
106
            edges.push(Some(edge));
1,494✔
107
        }
108

109
        // Create new graph.
110
        let graph = builder_dag.into_graph().map_owned(
334✔
111
            |_, node| Some(node),
1,706✔
112
            |edge_index, _| {
1,494✔
113
                edges[edge_index.index()]
1,494✔
114
                    .take()
1,494✔
115
                    .expect("We created all edges")
1,494✔
116
            },
1,494✔
117
        );
334✔
118
        Ok(ExecutionDag {
334✔
119
            graph,
334✔
120
            epoch_manager: Arc::new(EpochManager::new(num_sources)),
334✔
121
        })
334✔
122
    }
334✔
123

124
    pub fn graph(&self) -> &daggy::Dag<Option<NodeType>, EdgeType> {
2,040✔
125
        &self.graph
2,040✔
126
    }
2,040✔
127

128
    pub fn node_weight_mut(&mut self, node_index: daggy::NodeIndex) -> &mut Option<NodeType> {
1,706✔
129
        &mut self.graph[node_index]
1,706✔
130
    }
1,706✔
131

132
    pub fn epoch_manager(&self) -> &Arc<EpochManager> {
344✔
133
        &self.epoch_manager
344✔
134
    }
344✔
135

136
    #[allow(clippy::type_complexity)]
137
    pub fn collect_senders_and_record_writers(
1,368✔
138
        &mut self,
1,368✔
139
        node_index: daggy::NodeIndex,
1,368✔
140
    ) -> (
1,368✔
141
        HashMap<PortHandle, Vec<Sender<ExecutorOperation>>>,
1,368✔
142
        HashMap<PortHandle, Box<dyn RecordWriter>>,
1,368✔
143
    ) {
1,368✔
144
        let edge_indexes = self
1,368✔
145
            .graph
1,368✔
146
            .edges(node_index)
1,368✔
147
            .map(|edge| edge.id())
1,494✔
148
            .collect::<Vec<_>>();
1,368✔
149

1,368✔
150
        let mut senders = HashMap::new();
1,368✔
151
        let mut record_writers = HashMap::new();
1,368✔
152
        for edge_index in edge_indexes {
2,862✔
153
            let edge = self
1,494✔
154
                .graph
1,494✔
155
                .edge_weight_mut(edge_index)
1,494✔
156
                .expect("We don't modify graph structure, only modify the edge weight");
1,494✔
157
            insert_vec_element(&mut senders, edge.output_port, edge.sender.clone());
1,494✔
158
            if let Entry::Vacant(entry) = record_writers.entry(edge.output_port) {
1,494✔
159
                // This interior mutability is to word around `Rc`. Other parts of this function is correctly marked `mut`.
160
                if let Some(record_writer) = edge.record_writer.borrow_mut().take() {
1,490✔
161
                    entry.insert(record_writer);
424✔
162
                }
1,066✔
163
            }
4✔
164
        }
165

166
        (senders, record_writers)
1,368✔
167
    }
1,368✔
168

169
    pub fn collect_receivers(
1,362✔
170
        &mut self,
1,362✔
171
        node_index: daggy::NodeIndex,
1,362✔
172
    ) -> (Vec<PortHandle>, Vec<Receiver<ExecutorOperation>>) {
1,362✔
173
        let edge_indexes = self
1,362✔
174
            .graph
1,362✔
175
            .edges_directed(node_index, Direction::Incoming)
1,362✔
176
            .map(|edge| edge.id())
1,494✔
177
            .collect::<Vec<_>>();
1,362✔
178

1,362✔
179
        let mut input_ports = Vec::new();
1,362✔
180
        let mut receivers = Vec::new();
1,362✔
181
        for edge_index in edge_indexes {
2,856✔
182
            let edge = self
1,494✔
183
                .graph
1,494✔
184
                .edge_weight_mut(edge_index)
1,494✔
185
                .expect("We don't modify graph structure, only modify the edge weight");
1,494✔
186
            input_ports.push(edge.input_port);
1,494✔
187
            receivers.push(edge.receiver.clone());
1,494✔
188
        }
1,494✔
189
        (input_ports, receivers)
1,362✔
190
    }
1,362✔
191
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc