• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4315007357

pending completion
4315007357

push

github

GitHub
fix: Sink should only be built after all source checkpoints are checked (#1112)

280 of 280 new or added lines in 24 files covered. (100.0%)

28292 of 38914 relevant lines covered (72.7%)

64132.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.42
/dozer-core/src/executor/execution_dag.rs
1
use std::{
2
    borrow::BorrowMut,
3
    cell::RefCell,
4
    collections::{hash_map::Entry, HashMap},
5
    fmt::Debug,
6
    rc::Rc,
7
    sync::Arc,
8
};
9

10
use crate::{
11
    builder_dag::{BuilderDag, NodeKind, NodeType},
12
    epoch::EpochManager,
13
    errors::ExecutionError,
14
    hash_map_to_vec::insert_vec_element,
15
    node::PortHandle,
16
    record_store::{create_record_store, RecordReader, RecordWriter},
17
};
18
use crossbeam::channel::{bounded, Receiver, Sender};
19
use daggy::petgraph::{
20
    visit::{EdgeRef, IntoEdges, IntoEdgesDirected, IntoNodeIdentifiers},
21
    Direction,
22
};
23

24
use super::ExecutorOperation;
25

26
pub type SharedRecordWriter = Rc<RefCell<Option<Box<dyn RecordWriter>>>>;
27

28
#[derive(Debug, Clone)]
×
29
pub struct EdgeType {
30
    /// Output port handle.
31
    pub output_port: PortHandle,
32
    /// The sender for data flowing downstream.
33
    pub sender: Sender<ExecutorOperation>,
34
    /// The record writer for persisting data for downstream queries, if persistency is needed. Different edges with the same output port share the same record writer.
35
    pub record_writer: SharedRecordWriter,
36
    /// Input port handle.
37
    pub input_port: PortHandle,
38
    /// The receiver for receiving data from upstream.
39
    pub receiver: Receiver<ExecutorOperation>,
40
    /// The record reader for reading persisted data of the corresponding output port, if there is any.
41
    pub record_reader: Option<Box<dyn RecordReader>>,
42
}
43

44
#[derive(Debug)]
×
45
pub struct ExecutionDag {
46
    /// Nodes will be moved into execution threads.
47
    graph: daggy::Dag<Option<NodeType>, EdgeType>,
48
    epoch_manager: Arc<EpochManager>,
49
}
50

51
impl ExecutionDag {
×
52
    pub fn new(builder_dag: BuilderDag, channel_buffer_sz: usize) -> Result<Self, ExecutionError> {
332✔
53
        // Count number of sources.
332✔
54
        let num_sources = builder_dag
332✔
55
            .graph()
332✔
56
            .node_identifiers()
332✔
57
            .filter(|node_index| {
1,695✔
58
                matches!(
1,353✔
59
                    builder_dag.graph()[*node_index].kind,
1,695✔
60
                    NodeKind::Source(_, _)
×
61
                )
×
62
            })
1,695✔
63
            .count();
332✔
64

332✔
65
        // We only create the record store once for every output port. Every `HashMap` in this `Vec` tracks whether a node's output ports already have their record store created.
332✔
66
        let mut all_record_stores =
332✔
67
            vec![
332✔
68
                HashMap::<PortHandle, (SharedRecordWriter, Option<Box<dyn RecordReader>>)>::new();
332✔
69
                builder_dag.graph().node_count()
332✔
70
            ];
332✔
71

332✔
72
        // Create new edges.
332✔
73
        let mut edges = vec![];
332✔
74
        for builder_dag_edge in builder_dag.graph().raw_edges().iter() {
1,473✔
75
            let source_node_index = builder_dag_edge.source();
1,473✔
76
            let edge = &builder_dag_edge.weight;
1,473✔
77
            let output_port = builder_dag_edge.weight.output_port;
1,473✔
78

×
79
            // Create or get record store.
×
80
            let (record_writer, record_reader) =
1,473✔
81
                match all_record_stores[source_node_index.index()].entry(output_port) {
1,473✔
82
                    Entry::Vacant(entry) => {
1,469✔
83
                        let record_store = create_record_store(
1,469✔
84
                            &builder_dag.graph()[source_node_index].storage.master_txn,
1,469✔
85
                            output_port,
1,469✔
86
                            edge.output_port_type,
1,469✔
87
                            edge.schema.clone(),
1,469✔
88
                            channel_buffer_sz + 1,
1,469✔
89
                        )?;
1,469✔
90
                        let (record_writer, record_reader) =
1,469✔
91
                            if let Some((record_writer, record_reader)) = record_store {
1,469✔
92
                                (
434✔
93
                                    Rc::new(RefCell::new(Some(record_writer))),
434✔
94
                                    Some(record_reader),
434✔
95
                                )
434✔
96
                            } else {
×
97
                                (Rc::new(RefCell::new(None)), None)
1,035✔
98
                            };
×
99
                        entry.insert((record_writer, record_reader)).clone()
1,469✔
100
                    }
×
101
                    Entry::Occupied(entry) => entry.get().clone(),
4✔
102
                };
×
103

×
104
            // Create channel.
×
105
            let (sender, receiver) = bounded(channel_buffer_sz);
1,473✔
106

1,473✔
107
            // Create edge.
1,473✔
108
            let edge = EdgeType {
1,473✔
109
                output_port,
1,473✔
110
                sender,
1,473✔
111
                record_writer,
1,473✔
112
                input_port: edge.input_port,
1,473✔
113
                receiver,
1,473✔
114
                record_reader,
1,473✔
115
            };
1,473✔
116
            edges.push(Some(edge));
1,473✔
117
        }
×
118

119
        // Create new graph.
120
        let graph = builder_dag.into_graph().map_owned(
332✔
121
            |_, node| Some(node),
1,695✔
122
            |edge_index, _| {
1,473✔
123
                edges[edge_index.index()]
1,473✔
124
                    .take()
1,473✔
125
                    .expect("We created all edges")
1,473✔
126
            },
1,473✔
127
        );
332✔
128
        Ok(ExecutionDag {
332✔
129
            graph,
332✔
130
            epoch_manager: Arc::new(EpochManager::new(num_sources)),
332✔
131
        })
332✔
132
    }
332✔
133

134
    pub fn graph(&self) -> &daggy::Dag<Option<NodeType>, EdgeType> {
2,027✔
135
        &self.graph
2,027✔
136
    }
2,027✔
137

×
138
    pub fn node_weight_mut(&mut self, node_index: daggy::NodeIndex) -> &mut Option<NodeType> {
1,695✔
139
        &mut self.graph[node_index]
1,695✔
140
    }
1,695✔
141

×
142
    pub fn epoch_manager(&self) -> &Arc<EpochManager> {
342✔
143
        &self.epoch_manager
342✔
144
    }
342✔
145

×
146
    #[allow(clippy::type_complexity)]
×
147
    pub fn collect_senders_and_record_writers(
1,359✔
148
        &mut self,
1,359✔
149
        node_index: daggy::NodeIndex,
1,359✔
150
    ) -> (
1,359✔
151
        HashMap<PortHandle, Vec<Sender<ExecutorOperation>>>,
1,359✔
152
        HashMap<PortHandle, Box<dyn RecordWriter>>,
1,359✔
153
    ) {
1,359✔
154
        let edge_indexes = self
1,359✔
155
            .graph
1,359✔
156
            .edges(node_index)
1,359✔
157
            .map(|edge| edge.id())
1,473✔
158
            .collect::<Vec<_>>();
1,359✔
159

1,359✔
160
        let mut senders = HashMap::new();
1,359✔
161
        let mut record_writers = HashMap::new();
1,359✔
162
        for edge_index in edge_indexes {
2,832✔
163
            let edge = self
1,473✔
164
                .graph
1,473✔
165
                .edge_weight_mut(edge_index)
1,473✔
166
                .expect("We don't modify graph structure, only modify the edge weight");
1,473✔
167
            insert_vec_element(&mut senders, edge.output_port, edge.sender.clone());
1,473✔
168
            if let Entry::Vacant(entry) = record_writers.entry(edge.output_port) {
1,473✔
169
                // This interior mutability is to work around `Rc`. Other parts of this function are correctly marked `mut`.
×
170
                if let Some(record_writer) = edge.record_writer.borrow_mut().take() {
1,469✔
171
                    entry.insert(record_writer);
434✔
172
                }
1,035✔
173
            }
4✔
174
        }
×
175

×
176
        (senders, record_writers)
1,359✔
177
    }
1,359✔
178

×
179
    #[allow(clippy::type_complexity)]
×
180
    pub fn collect_receivers_and_record_readers(
1,353✔
181
        &mut self,
1,353✔
182
        node_index: daggy::NodeIndex,
1,353✔
183
    ) -> (
1,353✔
184
        Vec<PortHandle>,
1,353✔
185
        Vec<Receiver<ExecutorOperation>>,
1,353✔
186
        HashMap<PortHandle, Box<dyn RecordReader>>,
1,353✔
187
    ) {
1,353✔
188
        let edge_indexes = self
1,353✔
189
            .graph
1,353✔
190
            .edges_directed(node_index, Direction::Incoming)
1,353✔
191
            .map(|edge| edge.id())
1,473✔
192
            .collect::<Vec<_>>();
1,353✔
193

1,353✔
194
        let mut input_ports = Vec::new();
1,353✔
195
        let mut receivers = Vec::new();
1,353✔
196
        let mut record_readers = HashMap::new();
1,353✔
197
        for edge_index in edge_indexes {
2,826✔
198
            let edge = self
1,473✔
199
                .graph
1,473✔
200
                .edge_weight_mut(edge_index)
1,473✔
201
                .expect("We don't modify graph structure, only modify the edge weight");
1,473✔
202
            input_ports.push(edge.input_port);
1,473✔
203
            receivers.push(edge.receiver.clone());
1,473✔
204
            if let Some(record_reader) = edge.record_reader.take() {
1,473✔
205
                let insert_result = record_readers.insert(edge.input_port, record_reader);
438✔
206
                debug_assert!(
×
207
                    insert_result.is_none(),
438✔
208
                    "More than one output connect to a input port"
×
209
                );
×
210
            }
1,035✔
211
        }
×
212
        (input_ports, receivers, record_readers)
1,353✔
213
    }
1,353✔
214
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc