• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5888798292

17 Aug 2023 08:51AM UTC coverage: 76.025% (-1.4%) from 77.415%
5888798292

push

github

web-flow
feat: implement graph on live ui (#1847)

* feat: implement progress

* feat: implement enable progress flag

* feat: implement progress in live

* chore: fix clippy

* chore: always use telemetry metrics

* fix: Only run build once

---------

Co-authored-by: sagar <sagar@getdozer.io>
Co-authored-by: chubei <914745487@qq.com>

536 of 536 new or added lines in 21 files covered. (100.0%)

46101 of 60639 relevant lines covered (76.03%)

40410.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.07
/dozer-core/src/dag_impl.rs
1
use daggy::petgraph::visit::{Bfs, EdgeRef, IntoEdges};
2
use daggy::Walker;
3
use dozer_types::node::NodeHandle;
4

5
use crate::errors::ExecutionError;
6
use crate::node::{PortHandle, ProcessorFactory, SinkFactory, SourceFactory};
7
use std::collections::{HashMap, HashSet};
8
use std::fmt::{Debug, Display};
9

10
/// Port handle constant with the value `0xffff`.
///
/// NOTE(review): presumably the handle used when a node exposes a single, default port —
/// confirm against the callers, which are not visible from this file.
pub const DEFAULT_PORT_HANDLE: u16 = 0xffff_u16;
11

12
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
28✔
13
pub struct Endpoint {
×
14
    pub node: NodeHandle,
15
    pub port: PortHandle,
16
}
17

18
impl Endpoint {
19
    pub fn new(node: NodeHandle, port: PortHandle) -> Self {
6,305✔
20
        Self { node, port }
6,305✔
21
    }
6,305✔
22
}
×
23

24
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
21✔
25
pub struct Edge {
×
26
    pub from: Endpoint,
27
    pub to: Endpoint,
28
}
29

30
impl Edge {
31
    pub fn new(from: Endpoint, to: Endpoint) -> Self {
1,384✔
32
        Self { from, to }
1,384✔
33
    }
1,384✔
34
}
×
35

36
#[derive(Debug)]
×
37
/// A `SourceFactory`, `ProcessorFactory` or `SinkFactory`.
×
38
pub enum NodeKind<T> {
39
    Source(Box<dyn SourceFactory<T>>),
40
    Processor(Box<dyn ProcessorFactory<T>>),
41
    Sink(Box<dyn SinkFactory<T>>),
42
}
43

44
#[derive(Debug)]
×
45
/// The node type of the description DAG.
×
46
pub struct NodeType<T> {
47
    /// The node handle, unique across the DAG.
48
    pub handle: NodeHandle,
49
    /// The node kind.
50
    pub kind: NodeKind<T>,
51
}
52

53
impl<T> Display for NodeType<T> {
54
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
55
        write!(f, "{}", self.handle.id)
×
56
    }
×
57
}
×
58

59
#[derive(Debug, Clone, Copy)]
×
60
/// The edge type of the description DAG.
×
61
pub struct EdgeType {
62
    pub from: PortHandle,
63
    pub to: PortHandle,
64
}
65

66
impl EdgeType {
67
    pub fn new(from: PortHandle, to: PortHandle) -> Self {
1,778✔
68
        Self { from, to }
1,778✔
69
    }
1,778✔
70
}
×
71

72
pub trait EdgeHavePorts {
73
    fn output_port(&self) -> PortHandle;
×
74
    fn input_port(&self) -> PortHandle;
×
75
}
×
76

77
impl EdgeHavePorts for EdgeType {
78
    fn output_port(&self) -> PortHandle {
×
79
        self.from
×
80
    }
×
81

82
    fn input_port(&self) -> PortHandle {
×
83
        self.to
×
84
    }
×
85
}
86

87
#[derive(Debug)]
×
88
pub struct Dag<T> {
89
    /// The underlying graph.
×
90
    graph: daggy::Dag<NodeType<T>, EdgeType>,
×
91
    /// Map from node handle to node index.
×
92
    node_lookup_table: HashMap<NodeHandle, daggy::NodeIndex>,
93
    /// All edge indexes.
94
    edge_indexes: HashSet<EdgeIndex>,
95
}
×
96

×
97
impl<T> Default for Dag<T> {
×
98
    fn default() -> Self {
×
99
        Self::new()
×
100
    }
×
101
}
102

×
103
impl<T> Dag<T> {
×
104
    /// Creates an empty DAG.
×
105
    pub fn new() -> Self {
121✔
106
        Self {
121✔
107
            graph: daggy::Dag::new(),
121✔
108
            node_lookup_table: HashMap::new(),
121✔
109
            edge_indexes: HashSet::new(),
121✔
110
        }
121✔
111
    }
121✔
112

×
113
    /// Returns the underlying daggy graph.
×
114
    pub fn graph(&self) -> &daggy::Dag<NodeType<T>, EdgeType> {
980✔
115
        &self.graph
980✔
116
    }
980✔
117

×
118
    /// Returns the underlying daggy graph.
×
119
    pub fn into_graph(self) -> daggy::Dag<NodeType<T>, EdgeType> {
101✔
120
        self.graph
101✔
121
    }
101✔
122

×
123
    /// Adds a source. Panics if the `handle` exists in the `Dag`.
×
124
    pub fn add_source(
123✔
125
        &mut self,
123✔
126
        handle: NodeHandle,
123✔
127
        source: Box<dyn SourceFactory<T>>,
123✔
128
    ) -> daggy::NodeIndex {
123✔
129
        self.add_node(handle, NodeKind::Source(source))
123✔
130
    }
123✔
131

×
132
    /// Adds a processor. Panics if the `handle` exists in the `Dag`.
×
133
    pub fn add_processor(
268✔
134
        &mut self,
268✔
135
        handle: NodeHandle,
268✔
136
        processor: Box<dyn ProcessorFactory<T>>,
268✔
137
    ) -> daggy::NodeIndex {
268✔
138
        self.add_node(handle, NodeKind::Processor(processor))
268✔
139
    }
268✔
140

×
141
    /// Adds a sink. Panics if the `handle` exists in the `Dag`.
×
142
    pub fn add_sink(
113✔
143
        &mut self,
113✔
144
        handle: NodeHandle,
113✔
145
        sink: Box<dyn SinkFactory<T>>,
113✔
146
    ) -> daggy::NodeIndex {
113✔
147
        self.add_node(handle, NodeKind::Sink(sink))
113✔
148
    }
113✔
149

×
150
    /// Adds an edge. Panics if there's already an edge from `from` to `to`.
×
151
    ///
152
    /// Returns an error if any of the port cannot be found or the edge would create a cycle.
153
    pub fn connect(&mut self, from: Endpoint, to: Endpoint) -> Result<(), ExecutionError> {
425✔
154
        let from_node_index = validate_endpoint(self, &from, PortDirection::Output)?;
425✔
155
        let to_node_index = validate_endpoint(self, &to, PortDirection::Input)?;
422✔
156
        self.connect_with_index(from_node_index, from.port, to_node_index, to.port)
421✔
157
    }
425✔
158

×
159
    /// Adds an edge. Panics if there's already an edge from `from` to `to`.
×
160
    ///
161
    /// Returns an error if any of the port cannot be found or the edge would create a cycle.
162
    pub fn connect_with_index(
436✔
163
        &mut self,
436✔
164
        from_node_index: daggy::NodeIndex,
436✔
165
        output_port: PortHandle,
436✔
166
        to_node_index: daggy::NodeIndex,
436✔
167
        input_port: PortHandle,
436✔
168
    ) -> Result<(), ExecutionError> {
436✔
169
        validate_port_with_index(self, from_node_index, output_port, PortDirection::Output)?;
436✔
170
        validate_port_with_index(self, to_node_index, input_port, PortDirection::Input)?;
436✔
171
        let edge_index = self.graph.add_edge(
436✔
172
            from_node_index,
436✔
173
            to_node_index,
436✔
174
            EdgeType::new(output_port, input_port),
436✔
175
        )?;
436✔
176

×
177
        if !self.edge_indexes.insert(EdgeIndex {
436✔
178
            from_node: from_node_index,
436✔
179
            output_port,
436✔
180
            to_node: to_node_index,
436✔
181
            input_port,
436✔
182
        }) {
436✔
183
            panic!("An edge {edge_index:?} has already been inserted using specified edge handle");
×
184
        }
436✔
185

436✔
186
        Ok(())
436✔
187
    }
436✔
188

×
189
    /// Adds another whole `Dag` to `self`. Optionally under a namespace `ns`.
×
190
    pub fn merge(&mut self, ns: Option<u16>, other: Dag<T>) {
4✔
191
        let (other_nodes, _) = other.graph.into_graph().into_nodes_edges();
4✔
192

4✔
193
        // Insert nodes.
4✔
194
        let mut other_node_index_to_self_node_index = vec![];
4✔
195
        for other_node in other_nodes.into_iter() {
8✔
196
            let other_node = other_node.weight;
8✔
197
            let self_node_handle =
8✔
198
                NodeHandle::new(ns.or(other_node.handle.ns), other_node.handle.id.clone());
8✔
199
            let self_node_index = self.add_node(self_node_handle.clone(), other_node.kind);
8✔
200
            other_node_index_to_self_node_index.push(self_node_index);
8✔
201
        }
8✔
202

×
203
        // Insert edges.
×
204
        for other_edge_index in other.edge_indexes.into_iter() {
4✔
205
            let self_from_node =
4✔
206
                other_node_index_to_self_node_index[other_edge_index.from_node.index()];
4✔
207
            let self_to_node =
4✔
208
                other_node_index_to_self_node_index[other_edge_index.to_node.index()];
4✔
209
            self.connect_with_index(
4✔
210
                self_from_node,
4✔
211
                other_edge_index.output_port,
4✔
212
                self_to_node,
4✔
213
                other_edge_index.input_port,
4✔
214
            )
4✔
215
            .expect("BUG in DAG");
4✔
216
        }
4✔
217
    }
4✔
218

×
219
    /// Returns an iterator over all node handles.
×
220
    pub fn node_handles(&self) -> impl Iterator<Item = &NodeHandle> {
×
221
        self.nodes().map(|node| &node.handle)
×
222
    }
×
223

×
224
    /// Returns an iterator over all nodes.
×
225
    pub fn nodes(&self) -> impl Iterator<Item = &NodeType<T>> {
×
226
        self.graph.raw_nodes().iter().map(|node| &node.weight)
×
227
    }
×
228

×
229
    /// Returns an iterator over source handles and sources.
×
230
    pub fn sources(&self) -> impl Iterator<Item = (&NodeHandle, &dyn SourceFactory<T>)> {
×
231
        self.nodes().flat_map(|node| {
×
232
            if let NodeKind::Source(source) = &node.kind {
×
233
                Some((&node.handle, &**source))
×
234
            } else {
×
235
                None
×
236
            }
237
        })
×
238
    }
×
239

×
240
    /// Returns an iterator over processor handles and processors.
×
241
    pub fn processors(&self) -> impl Iterator<Item = (&NodeHandle, &dyn ProcessorFactory<T>)> {
×
242
        self.nodes().flat_map(|node| {
×
243
            if let NodeKind::Processor(processor) = &node.kind {
×
244
                Some((&node.handle, &**processor))
×
245
            } else {
×
246
                None
×
247
            }
248
        })
×
249
    }
×
250

×
251
    /// Returns an iterator over sink handles and sinks.
×
252
    pub fn sinks(&self) -> impl Iterator<Item = (&NodeHandle, &dyn SinkFactory<T>)> {
×
253
        self.nodes().flat_map(|node| {
×
254
            if let NodeKind::Sink(sink) = &node.kind {
×
255
                Some((&node.handle, &**sink))
×
256
            } else {
×
257
                None
×
258
            }
259
        })
×
260
    }
×
261

×
262
    /// Returns an iterator over all edge handles.
×
263
    pub fn edge_handles(&self) -> Vec<Edge> {
1✔
264
        let get_endpoint = |node_index: daggy::NodeIndex, port_handle| {
12✔
265
            let node = &self.graph[node_index];
12✔
266
            Endpoint {
12✔
267
                node: node.handle.clone(),
12✔
268
                port: port_handle,
12✔
269
            }
12✔
270
        };
12✔
271

×
272
        self.edge_indexes
1✔
273
            .iter()
1✔
274
            .map(|edge_index| {
6✔
275
                Edge::new(
6✔
276
                    get_endpoint(edge_index.from_node, edge_index.output_port),
6✔
277
                    get_endpoint(edge_index.to_node, edge_index.input_port),
6✔
278
                )
6✔
279
            })
6✔
280
            .collect()
1✔
281
    }
1✔
282

×
283
    /// Finds the node by its handle.
×
284
    pub fn node_kind_from_handle(&self, handle: &NodeHandle) -> &NodeKind<T> {
×
285
        &self.graph[self.node_index(handle)].kind
×
286
    }
×
287

×
288
    /// Returns an iterator over node handles that are connected to the given node handle.
×
289
    pub fn edges_from_handle(&self, handle: &NodeHandle) -> impl Iterator<Item = &NodeHandle> {
×
290
        let node_index = self.node_index(handle);
×
291
        self.graph
×
292
            .edges(node_index)
×
293
            .map(|edge| &self.graph[edge.target()].handle)
×
294
    }
×
295

×
296
    /// Returns an iterator over endpoints that are connected to the given endpoint.
×
297
    pub fn edges_from_endpoint<'a>(
×
298
        &'a self,
×
299
        node_handle: &'a NodeHandle,
×
300
        port_handle: PortHandle,
×
301
    ) -> impl Iterator<Item = (&NodeHandle, PortHandle)> {
×
302
        self.graph
×
303
            .edges(self.node_index(node_handle))
×
304
            .filter(move |edge| edge.weight().from == port_handle)
×
305
            .map(|edge| (&self.graph[edge.target()].handle, edge.weight().to))
×
306
    }
×
307

×
308
    /// Returns an iterator over all node handles reachable from `start` in a breadth-first search.
×
309
    pub fn bfs(&self, start: &NodeHandle) -> impl Iterator<Item = &NodeHandle> {
×
310
        let start = self.node_index(start);
×
311

×
312
        Bfs::new(self.graph.graph(), start)
×
313
            .iter(self.graph.graph())
×
314
            .map(|node_index| &self.graph[node_index].handle)
×
315
    }
×
316
}
×
317

×
318
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
657✔
319
struct EdgeIndex {
320
    from_node: daggy::NodeIndex,
×
321
    output_port: PortHandle,
322
    to_node: daggy::NodeIndex,
323
    input_port: PortHandle,
324
}
325

326
impl<T> Dag<T> {
327
    fn add_node(&mut self, handle: NodeHandle, kind: NodeKind<T>) -> daggy::NodeIndex {
512✔
328
        let node_index = self.graph.add_node(NodeType {
512✔
329
            handle: handle.clone(),
512✔
330
            kind,
512✔
331
        });
512✔
332
        if let Some(node_index) = self.node_lookup_table.insert(handle, node_index) {
512✔
333
            panic!("A node {node_index:?} has already been inserted using specified node handle");
×
334
        }
512✔
335
        node_index
512✔
336
    }
512✔
337

×
338
    fn node_index(&self, node_handle: &NodeHandle) -> daggy::NodeIndex {
847✔
339
        *self
847✔
340
            .node_lookup_table
847✔
341
            .get(node_handle)
847✔
342
            .unwrap_or_else(|| panic!("Node handle {node_handle:?} not found in dag"))
847✔
343
    }
847✔
344
}
×
345

×
346
/// Which side of a node a port belongs to when it is validated.
///
/// Derives `Copy` because it is a field-less enum that is always passed by value.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum PortDirection {
    /// The port is checked against the node's input ports.
    Input,
    /// The port is checked against the node's output ports.
    Output,
}
351

352
fn validate_endpoint<T>(
847✔
353
    dag: &Dag<T>,
847✔
354
    endpoint: &Endpoint,
847✔
355
    direction: PortDirection,
847✔
356
) -> Result<daggy::NodeIndex, ExecutionError> {
847✔
357
    let node_index = dag.node_index(&endpoint.node);
847✔
358
    validate_port_with_index(dag, node_index, endpoint.port, direction)?;
847✔
359
    Ok(node_index)
843✔
360
}
847✔
361

×
362
fn validate_port_with_index<T>(
1,719✔
363
    dag: &Dag<T>,
1,719✔
364
    node_index: daggy::NodeIndex,
1,719✔
365
    port: PortHandle,
1,719✔
366
    direction: PortDirection,
1,719✔
367
) -> Result<(), ExecutionError> {
1,719✔
368
    let node = &dag.graph[node_index];
1,719✔
369
    if !contains_port(&node.kind, direction, port)? {
1,719✔
370
        return Err(ExecutionError::InvalidPortHandle(port));
4✔
371
    }
1,715✔
372
    Ok(())
1,715✔
373
}
1,719✔
374

×
375
fn contains_port<T>(
1,719✔
376
    node: &NodeKind<T>,
1,719✔
377
    direction: PortDirection,
1,719✔
378
    port: PortHandle,
1,719✔
379
) -> Result<bool, ExecutionError> {
1,719✔
380
    Ok(match node {
1,719✔
381
        NodeKind::Processor(p) => {
1,159✔
382
            if direction == PortDirection::Output {
1,159✔
383
                p.get_output_ports().iter().any(|e| e.handle == port)
522✔
384
            } else {
×
385
                p.get_input_ports().contains(&port)
637✔
386
            }
387
        }
×
388
        NodeKind::Sink(s) => {
221✔
389
            if direction == PortDirection::Output {
221✔
390
                false
×
391
            } else {
×
392
                s.get_input_ports().contains(&port)
221✔
393
            }
394
        }
×
395
        NodeKind::Source(s) => {
339✔
396
            if direction == PortDirection::Output {
339✔
397
                s.get_output_ports().iter().any(|e| e.handle == port)
451✔
398
            } else {
×
399
                false
×
400
            }
401
        }
×
402
    })
403
}
1,719✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc