• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5888798292

17 Aug 2023 08:51AM UTC coverage: 76.025% (-1.4%) from 77.415%
5888798292

push

github

web-flow
feat: implement graph on live ui (#1847)

* feat: implement progress

* feat: implement enable progress flag

* feat: implement progress in live

* chore: fix clippy

* chore: always use telemetry metrics

* fix: Only run build once

---------

Co-authored-by: sagar <sagar@getdozer.io>
Co-authored-by: chubei <914745487@qq.com>

536 of 536 new or added lines in 21 files covered. (100.0%)

46101 of 60639 relevant lines covered (76.03%)

40410.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.03
/dozer-core/src/executor/execution_dag.rs
1
use std::{
2
    borrow::BorrowMut,
3
    cell::RefCell,
4
    collections::{hash_map::Entry, HashMap},
5
    fmt::Debug,
6
    rc::Rc,
7
    sync::Arc,
8
};
9

10
use crate::{
11
    builder_dag::{BuilderDag, NodeKind, NodeType},
12
    epoch::EpochManager,
13
    error_manager::ErrorManager,
14
    errors::ExecutionError,
15
    executor_operation::ExecutorOperation,
16
    hash_map_to_vec::insert_vec_element,
17
    node::PortHandle,
18
    processor_record::ProcessorRecordStore,
19
    record_store::{create_record_writer, RecordWriter},
20
};
21
use crossbeam::channel::{bounded, Receiver, Sender};
22
use daggy::petgraph::{
23
    visit::{EdgeRef, IntoEdges, IntoEdgesDirected, IntoNodeIdentifiers},
24
    Direction,
25
};
26

27
/// A reference-counted, interior-mutable slot that may or may not hold a boxed [`RecordWriter`].
pub type SharedRecordWriter = Rc<RefCell<Option<Box<dyn RecordWriter>>>>;
28

29
/// Weight stored on each edge of the execution graph: the output and input port handles it connects,
/// the sending and receiving halves of its channel, and the record writer slot for its output port.
#[derive(Debug, Clone)]
×
30
pub struct EdgeType {
31
    /// Output port handle.
32
    pub output_port: PortHandle,
33
    /// The sender for data flowing downstream.
34
    pub sender: Sender<ExecutorOperation>,
35
    /// The record writer for persisting data for downstream queries, if persistency is needed. Different edges with the same output port share the same record writer.
36
    pub record_writer: SharedRecordWriter,
37
    /// Input port handle.
38
    pub input_port: PortHandle,
39
    /// The receiver for receiving data from upstream.
40
    pub receiver: Receiver<ExecutorOperation>,
41
}
42

43
/// A graph of optional nodes connected by [`EdgeType`] edges, bundled with the record store,
/// epoch manager and error manager that are handed out through the accessor methods.
#[derive(Debug)]
×
44
pub struct ExecutionDag {
45
    /// Nodes will be moved into execution threads.
46
    graph: daggy::Dag<Option<NodeType>, EdgeType>,
47
    /// Record store cloned from the `BuilderDag` this DAG was built from.
    record_store: Arc<ProcessorRecordStore>,
48
    /// Epoch manager created in `new` with the number of source nodes.
    epoch_manager: Arc<EpochManager>,
49
    /// Error manager; thresholded or unlimited depending on the `error_threshold` given to `new`.
    error_manager: Arc<ErrorManager>,
50
}
51

52
impl ExecutionDag {
×
53
    /// Builds an `ExecutionDag` from a `BuilderDag`.
    ///
    /// Counts the nodes whose kind is `NodeKind::Source`, creates one bounded channel of capacity
    /// `channel_buffer_sz` per edge, and calls `create_record_writer` at most once per
    /// (source node, output port) pair so that edges leaving the same output port share one slot.
    /// `error_threshold` selects `ErrorManager::new_threshold` when `Some`, and
    /// `ErrorManager::new_unlimited` when `None`.
    ///
    /// # Errors
    ///
    /// The body shown here only ever returns `Ok`; no error path is visible in this function.
    ///
    /// # Panics
    ///
    /// Panics with "We created all edges" if an edge slot has already been taken when the graph is mapped.
    pub fn new(
383✔
54
        builder_dag: BuilderDag,
383✔
55
        channel_buffer_sz: usize,
383✔
56
        error_threshold: Option<u32>,
383✔
57
    ) -> Result<Self, ExecutionError> {
383✔
58
        // Count number of sources.
383✔
59
        let num_sources = builder_dag
383✔
60
            .graph()
383✔
61
            .node_identifiers()
383✔
62
            .filter(|node_index| {
1,886✔
63
                matches!(
1,496✔
64
                    builder_dag.graph()[*node_index].kind,
1,886✔
65
                    NodeKind::Source(_, _)
66
                )
×
67
            })
1,886✔
68
            .count();
383✔
69

383✔
70
        // We only create the record writer once for every output port. Every `HashMap` in this `Vec` (one per node, indexed by node index) tracks which of that node's output ports already have a record writer created.
383✔
71
        let mut all_record_writers = vec![
383✔
72
            HashMap::<PortHandle, SharedRecordWriter>::new();
383✔
73
            builder_dag.graph().node_count()
383✔
74
        ];
383✔
75

383✔
76
        // Create new edges.
383✔
77
        let mut edges = vec![];
383✔
78
        for builder_dag_edge in builder_dag.graph().raw_edges().iter() {
1,695✔
79
            let source_node_index = builder_dag_edge.source();
1,695✔
80
            let edge = &builder_dag_edge.weight;
1,695✔
81
            let output_port = builder_dag_edge.weight.output_port;
1,695✔
82

83
            // Create or get record store.
84
            let record_writer =
1,695✔
85
                match all_record_writers[source_node_index.index()].entry(output_port) {
1,695✔
86
                    Entry::Vacant(entry) => {
1,681✔
87
                        let record_writer = create_record_writer(
1,681✔
88
                            output_port,
1,681✔
89
                            edge.output_port_type,
1,681✔
90
                            edge.schema.clone(),
1,681✔
91
                            builder_dag.record_store().clone(),
1,681✔
92
                        );
1,681✔
93
                        let record_writer = if let Some(record_writer) = record_writer {
1,681✔
94
                            Rc::new(RefCell::new(Some(record_writer)))
530✔
95
                        } else {
96
                            Rc::new(RefCell::new(None))
1,151✔
97
                        };
98
                        entry.insert(record_writer).clone()
1,681✔
99
                    }
100
                    Entry::Occupied(entry) => entry.get().clone(),
14✔
101
                };
102

103
            // Create channel.
104
            let (sender, receiver) = bounded(channel_buffer_sz);
1,695✔
105

1,695✔
106
            // Create edge.
1,695✔
107
            let edge = EdgeType {
1,695✔
108
                output_port,
1,695✔
109
                sender,
1,695✔
110
                record_writer,
1,695✔
111
                input_port: edge.input_port,
1,695✔
112
                receiver,
1,695✔
113
            };
1,695✔
114
            // Wrapped in `Some` so the edge can be moved out by index with `take` below.
            edges.push(Some(edge));
1,695✔
115
        }
116

117
        // Create new graph.
118
        let record_store = builder_dag.record_store().clone();
383✔
119
        let epoch_manager = Arc::new(EpochManager::new(
383✔
120
            num_sources,
383✔
121
            record_store.clone(),
383✔
122
            Default::default(),
383✔
123
        ));
383✔
124
        let graph = builder_dag.into_graph().map_owned(
383✔
125
            |_, node| Some(node),
1,886✔
126
            |edge_index, _| {
1,695✔
127
                edges[edge_index.index()]
1,695✔
128
                    .take()
1,695✔
129
                    .expect("We created all edges")
1,695✔
130
            },
1,695✔
131
        );
383✔
132
        Ok(ExecutionDag {
383✔
133
            graph,
383✔
134
            record_store,
383✔
135
            epoch_manager,
383✔
136
            error_manager: Arc::new(if let Some(threshold) = error_threshold {
383✔
137
                ErrorManager::new_threshold(threshold)
383✔
138
            } else {
139
                ErrorManager::new_unlimited()
×
140
            }),
×
141
        })
142
    }
383✔
143

×
144
    /// Returns a shared reference to the underlying graph.
    pub fn graph(&self) -> &daggy::Dag<Option<NodeType>, EdgeType> {
2,269✔
145
        &self.graph
2,269✔
146
    }
2,269✔
147

×
148
    /// Returns a mutable reference to the `Option<NodeType>` weight stored at `node_index`.
    pub fn node_weight_mut(&mut self, node_index: daggy::NodeIndex) -> &mut Option<NodeType> {
1,886✔
149
        &mut self.graph[node_index]
1,886✔
150
    }
1,886✔
151

×
152
    /// Returns a reference to the shared record store.
    pub fn record_store(&self) -> &Arc<ProcessorRecordStore> {
1,109✔
153
        &self.record_store
1,109✔
154
    }
1,109✔
155

×
156
    /// Returns a reference to the shared epoch manager.
    pub fn epoch_manager(&self) -> &Arc<EpochManager> {
777✔
157
        &self.epoch_manager
777✔
158
    }
777✔
159

×
160
    /// Returns a reference to the shared error manager.
    pub fn error_manager(&self) -> &Arc<ErrorManager> {
2,995✔
161
        &self.error_manager
2,995✔
162
    }
2,995✔
163

×
164
    /// Collects the senders and record writers of the edges yielded by `graph.edges(node_index)`
    /// (presumably the node's outgoing edges — confirm against petgraph's `IntoEdges`).
    ///
    /// Returns a map from output port to the cloned senders of every such edge on that port, and a
    /// map from output port to the record writer for that port. A record writer is moved out of its
    /// shared slot, so the slot is left empty afterwards; ports without a writer get no entry.
    ///
    /// # Panics
    ///
    /// Panics if an edge index collected at the start no longer resolves to an edge weight.
    // The tuple-of-maps return type is what the `type_complexity` allow below is for.
    #[allow(clippy::type_complexity)]
×
165
    pub fn collect_senders_and_record_writers(
1,499✔
166
        &mut self,
1,499✔
167
        node_index: daggy::NodeIndex,
1,499✔
168
    ) -> (
1,499✔
169
        HashMap<PortHandle, Vec<Sender<ExecutorOperation>>>,
1,499✔
170
        HashMap<PortHandle, Box<dyn RecordWriter>>,
1,499✔
171
    ) {
1,499✔
172
        // Edge ids are collected up front so the borrow of the graph ends before edge weights are mutably accessed.
        let edge_indexes = self
1,499✔
173
            .graph
1,499✔
174
            .edges(node_index)
1,499✔
175
            .map(|edge| edge.id())
1,695✔
176
            .collect::<Vec<_>>();
1,499✔
177

1,499✔
178
        let mut senders = HashMap::new();
1,499✔
179
        let mut record_writers = HashMap::new();
1,499✔
180
        for edge_index in edge_indexes {
3,194✔
181
            let edge = self
1,695✔
182
                .graph
1,695✔
183
                .edge_weight_mut(edge_index)
1,695✔
184
                .expect("We don't modify graph structure, only modify the edge weight");
1,695✔
185
            insert_vec_element(&mut senders, edge.output_port, edge.sender.clone());
1,695✔
186
            if let Entry::Vacant(entry) = record_writers.entry(edge.output_port) {
1,695✔
187
                // This interior mutability is to work around `Rc`. Other parts of this function are correctly marked `mut`.
                // NOTE(review): `std::borrow::BorrowMut` is imported in this file, so `borrow_mut()` here may resolve to the
                // trait method (yielding `&mut Rc<..>`) with `take()` then landing on `RefCell::take` — same result, but confirm.
×
188
                if let Some(record_writer) = edge.record_writer.borrow_mut().take() {
1,686✔
189
                    entry.insert(record_writer);
530✔
190
                }
1,156✔
191
            }
9✔
192
        }
×
193

×
194
        (senders, record_writers)
1,499✔
195
    }
1,499✔
196

×
197
    /// Collects the input port handle and a cloned receiver for every edge incoming to `node_index`.
    ///
    /// The two returned vectors are filled in the same loop, so element `i` of the port vector
    /// belongs to element `i` of the receiver vector.
    ///
    /// # Panics
    ///
    /// Panics if an edge index collected at the start no longer resolves to an edge weight.
    pub fn collect_receivers(
1,496✔
198
        &mut self,
1,496✔
199
        node_index: daggy::NodeIndex,
1,496✔
200
    ) -> (Vec<PortHandle>, Vec<Receiver<ExecutorOperation>>) {
1,496✔
201
        // Edge ids are collected up front so the borrow of the graph ends before edge weights are mutably accessed.
        let edge_indexes = self
1,496✔
202
            .graph
1,496✔
203
            .edges_directed(node_index, Direction::Incoming)
1,496✔
204
            .map(|edge| edge.id())
1,695✔
205
            .collect::<Vec<_>>();
1,496✔
206

1,496✔
207
        let mut input_ports = Vec::new();
1,496✔
208
        let mut receivers = Vec::new();
1,496✔
209
        for edge_index in edge_indexes {
3,191✔
210
            let edge = self
1,695✔
211
                .graph
1,695✔
212
                .edge_weight_mut(edge_index)
1,695✔
213
                .expect("We don't modify graph structure, only modify the edge weight");
1,695✔
214
            input_ports.push(edge.input_port);
1,695✔
215
            receivers.push(edge.receiver.clone());
1,695✔
216
        }
1,695✔
217
        (input_ports, receivers)
1,496✔
218
    }
1,496✔
219
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc