• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4280003486

pending completion
4280003486

push

github

GitHub
fix: Record readers not inserted in release builds (#1068)

2 of 2 new or added lines in 1 file covered. (100.0%)

27210 of 37316 relevant lines covered (72.92%)

37049.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.08
/dozer-core/src/executor/execution_dag.rs
1
use std::{
2
    borrow::BorrowMut,
3
    cell::RefCell,
4
    collections::{hash_map::Entry, HashMap},
5
    fmt::Debug,
6
    rc::Rc,
7
    sync::Arc,
8
};
9

10
use crate::{
11
    builder_dag::{BuilderDag, NodeKind, NodeType},
12
    epoch::EpochManager,
13
    errors::ExecutionError,
14
    hash_map_to_vec::insert_vec_element,
15
    node::PortHandle,
16
    record_store::{create_record_store, RecordReader, RecordWriter},
17
};
18
use crossbeam::channel::{bounded, Receiver, Sender};
19
use daggy::petgraph::{
20
    visit::{EdgeRef, IntoEdges, IntoEdgesDirected, IntoNodeIdentifiers},
21
    Direction,
22
};
23

24
use super::ExecutorOperation;
25

26
/// Shared slot holding the optional record writer of an output port.
///
/// The `Rc<RefCell<..>>` wrapper lets several edges point at the same slot, while the inner
/// `Option` is `None` when no writer exists for the port or once the writer has been taken out.
pub type SharedRecordWriter = Rc<RefCell<Option<Box<dyn RecordWriter>>>>;
27

28
#[derive(Debug, Clone)]
×
29
pub struct EdgeType {
30
    /// Output port handle.
31
    pub output_port: PortHandle,
32
    /// The sender for data flowing downstream.
33
    pub sender: Sender<ExecutorOperation>,
34
    /// The record writer for persisting data for downstream queries, if persistency is needed. Different edges with the same output port share the same record writer.
35
    pub record_writer: SharedRecordWriter,
36
    /// Input port handle.
37
    pub input_port: PortHandle,
38
    /// The receiver from receiving data from upstream.
39
    pub receiver: Receiver<ExecutorOperation>,
40
    /// The record reader for reading persisted data of corresponding output port, if there's some.
41
    pub record_reader: Option<Box<dyn RecordReader>>,
42
}
43

44
#[derive(Debug)]
×
45
pub struct ExecutionDag {
46
    graph: daggy::Dag<NodeType, EdgeType>,
47
    epoch_manager: Arc<EpochManager>,
48
}
49

50
impl ExecutionDag {
51
    pub fn new(
332✔
52
        mut builder_dag: BuilderDag,
332✔
53
        channel_buffer_sz: usize,
332✔
54
    ) -> Result<Self, ExecutionError> {
332✔
55
        // Create new nodes.
332✔
56
        let mut nodes = vec![];
332✔
57
        let mut num_sources = 0;
332✔
58
        let node_indexes = builder_dag.graph().node_identifiers().collect::<Vec<_>>();
332✔
59
        for node_index in node_indexes {
2,027✔
60
            let node = builder_dag
1,695✔
61
                .node_weight_mut(node_index)
1,695✔
62
                .take()
1,695✔
63
                .expect("Builder dag should have created all nodes");
1,695✔
64
            if matches!(
1,353✔
65
                node.kind
1,695✔
66
                    .as_ref()
1,695✔
67
                    .expect("Builder dag should have created all nodes"),
1,695✔
68
                NodeKind::Source(_, _)
69
            ) {
342✔
70
                num_sources += 1;
342✔
71
            }
1,353✔
72

73
            nodes.push(Some(node));
1,695✔
74
        }
75

76
        // We only create record stored once for every output port. Every `HashMap` in this `Vec` tracks if a node's output ports already have the record store created.
77
        let dag = builder_dag.graph();
332✔
78
        let mut all_record_stores =
332✔
79
            vec![
332✔
80
                HashMap::<PortHandle, (SharedRecordWriter, Option<Box<dyn RecordReader>>)>::new();
332✔
81
                dag.node_count()
332✔
82
            ];
332✔
83

332✔
84
        // Create new edges.
332✔
85
        let mut edges = vec![];
332✔
86
        for builder_dag_edge in dag.raw_edges().iter() {
1,473✔
87
            let source_node_index = builder_dag_edge.source().index();
1,473✔
88
            let edge = &builder_dag_edge.weight;
1,473✔
89
            let output_port = builder_dag_edge.weight.output_port;
1,473✔
90

91
            // Create or get record store.
92
            let (record_writer, record_reader) =
1,473✔
93
                match all_record_stores[source_node_index].entry(output_port) {
1,473✔
94
                    Entry::Vacant(entry) => {
1,469✔
95
                        let record_store = create_record_store(
1,469✔
96
                            &nodes[source_node_index]
1,469✔
97
                                .as_ref()
1,469✔
98
                                .expect("We created all nodes")
1,469✔
99
                                .storage
1,469✔
100
                                .master_txn,
1,469✔
101
                            output_port,
1,469✔
102
                            edge.output_port_type,
1,469✔
103
                            edge.schema.clone(),
1,469✔
104
                            channel_buffer_sz + 1,
1,469✔
105
                        )?;
1,469✔
106
                        let (record_writer, record_reader) =
1,469✔
107
                            if let Some((record_writer, record_reader)) = record_store {
1,469✔
108
                                (
434✔
109
                                    Rc::new(RefCell::new(Some(record_writer))),
434✔
110
                                    Some(record_reader),
434✔
111
                                )
434✔
112
                            } else {
113
                                (Rc::new(RefCell::new(None)), None)
1,035✔
114
                            };
115
                        entry.insert((record_writer, record_reader)).clone()
1,469✔
116
                    }
117
                    Entry::Occupied(entry) => entry.get().clone(),
4✔
118
                };
119

120
            // Create channel.
121
            let (sender, receiver) = bounded(channel_buffer_sz);
1,473✔
122

1,473✔
123
            // Create edge.
1,473✔
124
            let edge = EdgeType {
1,473✔
125
                output_port,
1,473✔
126
                sender,
1,473✔
127
                record_writer,
1,473✔
128
                input_port: edge.input_port,
1,473✔
129
                receiver,
1,473✔
130
                record_reader,
1,473✔
131
            };
1,473✔
132
            edges.push(Some(edge));
1,473✔
133
        }
134

135
        // Create new graph.
136
        let graph = dag.map(
332✔
137
            |node_index, _| {
1,695✔
138
                nodes[node_index.index()]
1,695✔
139
                    .take()
1,695✔
140
                    .expect("Builder dag should have all nodes")
1,695✔
141
            },
1,695✔
142
            |edge_index, _| {
1,473✔
143
                edges[edge_index.index()]
1,473✔
144
                    .take()
1,473✔
145
                    .expect("We created all edges")
1,473✔
146
            },
1,473✔
147
        );
332✔
148
        Ok(ExecutionDag {
332✔
149
            graph,
332✔
150
            epoch_manager: Arc::new(EpochManager::new(num_sources)),
332✔
151
        })
332✔
152
    }
332✔
153

154
    pub fn graph(&self) -> &daggy::Dag<NodeType, EdgeType> {
2,027✔
155
        &self.graph
2,027✔
156
    }
2,027✔
157

158
    pub fn node_weight_mut(&mut self, node_index: daggy::NodeIndex) -> &mut NodeType {
1,695✔
159
        &mut self.graph[node_index]
1,695✔
160
    }
1,695✔
161

162
    pub fn epoch_manager(&self) -> &Arc<EpochManager> {
342✔
163
        &self.epoch_manager
342✔
164
    }
342✔
165

166
    #[allow(clippy::type_complexity)]
167
    pub fn collect_senders_and_record_writers(
1,359✔
168
        &mut self,
1,359✔
169
        node_index: daggy::NodeIndex,
1,359✔
170
    ) -> (
1,359✔
171
        HashMap<PortHandle, Vec<Sender<ExecutorOperation>>>,
1,359✔
172
        HashMap<PortHandle, Box<dyn RecordWriter>>,
1,359✔
173
    ) {
1,359✔
174
        let edge_indexes = self
1,359✔
175
            .graph
1,359✔
176
            .edges(node_index)
1,359✔
177
            .map(|edge| edge.id())
1,473✔
178
            .collect::<Vec<_>>();
1,359✔
179

1,359✔
180
        let mut senders = HashMap::new();
1,359✔
181
        let mut record_writers = HashMap::new();
1,359✔
182
        for edge_index in edge_indexes {
2,832✔
183
            let edge = self
1,473✔
184
                .graph
1,473✔
185
                .edge_weight_mut(edge_index)
1,473✔
186
                .expect("We don't modify graph structure, only modify the edge weight");
1,473✔
187
            insert_vec_element(&mut senders, edge.output_port, edge.sender.clone());
1,473✔
188
            if let Entry::Vacant(entry) = record_writers.entry(edge.output_port) {
1,473✔
189
                // This interior mutability is to word around `Rc`. Other parts of this function is correctly marked `mut`.
190
                if let Some(record_writer) = edge.record_writer.borrow_mut().take() {
1,469✔
191
                    entry.insert(record_writer);
434✔
192
                }
1,035✔
193
            }
4✔
194
        }
195

196
        (senders, record_writers)
1,359✔
197
    }
1,359✔
198

199
    #[allow(clippy::type_complexity)]
200
    pub fn collect_receivers_and_record_readers(
1,353✔
201
        &mut self,
1,353✔
202
        node_index: daggy::NodeIndex,
1,353✔
203
    ) -> (
1,353✔
204
        Vec<PortHandle>,
1,353✔
205
        Vec<Receiver<ExecutorOperation>>,
1,353✔
206
        HashMap<PortHandle, Box<dyn RecordReader>>,
1,353✔
207
    ) {
1,353✔
208
        let edge_indexes = self
1,353✔
209
            .graph
1,353✔
210
            .edges_directed(node_index, Direction::Incoming)
1,353✔
211
            .map(|edge| edge.id())
1,473✔
212
            .collect::<Vec<_>>();
1,353✔
213

1,353✔
214
        let mut input_ports = Vec::new();
1,353✔
215
        let mut receivers = Vec::new();
1,353✔
216
        let mut record_readers = HashMap::new();
1,353✔
217
        for edge_index in edge_indexes {
2,826✔
218
            let edge = self
1,473✔
219
                .graph
1,473✔
220
                .edge_weight_mut(edge_index)
1,473✔
221
                .expect("We don't modify graph structure, only modify the edge weight");
1,473✔
222
            input_ports.push(edge.input_port);
1,473✔
223
            receivers.push(edge.receiver.clone());
1,473✔
224
            if let Some(record_reader) = edge.record_reader.take() {
1,473✔
225
                let insert_result = record_readers.insert(edge.input_port, record_reader);
438✔
226
                debug_assert!(
227
                    insert_result.is_none(),
438✔
228
                    "More than one output connect to a input port"
×
229
                );
230
            }
1,035✔
231
        }
232
        (input_ports, receivers, record_readers)
1,353✔
233
    }
1,353✔
234
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc