• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6009657516

29 Aug 2023 08:13AM UTC coverage: 76.652% (-1.4%) from 78.07%
6009657516

push

github

web-flow
chore: Create unit tests workflow (#1910)

* chore: Update for Rust 1.72.0

Rust 1.72.0 has introduced a bunch of new lints. Here we fix them all.

`let ... else` finally gets formatted.

* chire: Create unit tests workflow

* Rename and remove useless steps

* remove env vars

* Add concurrency group

* Test unit workflow on 4 cores

* Add mysql service to unit tests

---------

Co-authored-by: chubei <914745487@qq.com>

48982 of 63902 relevant lines covered (76.65%)

48394.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.29
/dozer-core/src/dag_impl.rs
1
use daggy::petgraph::visit::{Bfs, EdgeRef, IntoEdges};
2
use daggy::Walker;
3
use dozer_types::node::NodeHandle;
4

5
use crate::errors::ExecutionError;
6
use crate::node::{PortHandle, ProcessorFactory, SinkFactory, SourceFactory};
7
use std::collections::{HashMap, HashSet};
8
use std::fmt::{Debug, Display};
9

10
pub const DEFAULT_PORT_HANDLE: u16 = 0xffff_u16;
11

12
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
28✔
13
pub struct Endpoint {
14
    pub node: NodeHandle,
15
    pub port: PortHandle,
16
}
17

18
impl Endpoint {
19
    pub fn new(node: NodeHandle, port: PortHandle) -> Self {
10,079✔
20
        Self { node, port }
10,079✔
21
    }
10,079✔
22
}
23

24
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
21✔
25
pub struct Edge {
26
    pub from: Endpoint,
27
    pub to: Endpoint,
28
}
29

30
impl Edge {
31
    pub fn new(from: Endpoint, to: Endpoint) -> Self {
2,197✔
32
        Self { from, to }
2,197✔
33
    }
2,197✔
34
}
35

36
#[derive(Debug)]
×
37
/// A `SourceFactory`, `ProcessorFactory` or `SinkFactory`.
38
pub enum NodeKind {
39
    Source(Box<dyn SourceFactory>),
40
    Processor(Box<dyn ProcessorFactory>),
41
    Sink(Box<dyn SinkFactory>),
42
}
43

44
#[derive(Debug)]
×
45
/// The node type of the description DAG.
46
pub struct NodeType {
47
    /// The node handle, unique across the DAG.
48
    pub handle: NodeHandle,
49
    /// The node kind.
50
    pub kind: NodeKind,
51
}
52

53
impl Display for NodeType {
54
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
55
        write!(f, "{}", self.handle.id)
×
56
    }
×
57
}
58

59
#[derive(Debug, Clone, Copy)]
×
60
/// The edge type of the description DAG.
61
pub struct EdgeType {
62
    pub from: PortHandle,
63
    pub to: PortHandle,
64
}
65

66
impl EdgeType {
67
    pub fn new(from: PortHandle, to: PortHandle) -> Self {
2,849✔
68
        Self { from, to }
2,849✔
69
    }
2,849✔
70
}
71

72
pub trait EdgeHavePorts {
73
    fn output_port(&self) -> PortHandle;
74
    fn input_port(&self) -> PortHandle;
75
}
76

77
impl EdgeHavePorts for EdgeType {
78
    fn output_port(&self) -> PortHandle {
×
79
        self.from
×
80
    }
×
81

82
    fn input_port(&self) -> PortHandle {
×
83
        self.to
×
84
    }
×
85
}
86

87
#[derive(Debug)]
×
88
pub struct Dag {
89
    /// The underlying graph.
90
    graph: daggy::Dag<NodeType, EdgeType>,
91
    /// Map from node handle to node index.
92
    node_lookup_table: HashMap<NodeHandle, daggy::NodeIndex>,
93
    /// All edge indexes.
94
    edge_indexes: HashSet<EdgeIndex>,
95
}
96

97
impl Default for Dag {
98
    fn default() -> Self {
×
99
        Self::new()
×
100
    }
×
101
}
102

103
impl Dag {
104
    /// Creates an empty DAG.
105
    pub fn new() -> Self {
702✔
106
        Self {
702✔
107
            graph: daggy::Dag::new(),
702✔
108
            node_lookup_table: HashMap::new(),
702✔
109
            edge_indexes: HashSet::new(),
702✔
110
        }
702✔
111
    }
702✔
112

113
    /// Returns the underlying daggy graph.
114
    pub fn graph(&self) -> &daggy::Dag<NodeType, EdgeType> {
6,490✔
115
        &self.graph
6,490✔
116
    }
6,490✔
117

118
    /// Returns the underlying daggy graph.
119
    pub fn into_graph(self) -> daggy::Dag<NodeType, EdgeType> {
682✔
120
        self.graph
682✔
121
    }
682✔
122

123
    /// Adds a source. Panics if the `handle` exists in the `Dag`.
124
    pub fn add_source(
704✔
125
        &mut self,
704✔
126
        handle: NodeHandle,
704✔
127
        source: Box<dyn SourceFactory>,
704✔
128
    ) -> daggy::NodeIndex {
704✔
129
        self.add_node(handle, NodeKind::Source(source))
704✔
130
    }
704✔
131

132
    /// Adds a processor. Panics if the `handle` exists in the `Dag`.
133
    pub fn add_processor(
1,819✔
134
        &mut self,
1,819✔
135
        handle: NodeHandle,
1,819✔
136
        processor: Box<dyn ProcessorFactory>,
1,819✔
137
    ) -> daggy::NodeIndex {
1,819✔
138
        self.add_node(handle, NodeKind::Processor(processor))
1,819✔
139
    }
1,819✔
140

141
    /// Adds a sink. Panics if the `handle` exists in the `Dag`.
142
    pub fn add_sink(&mut self, handle: NodeHandle, sink: Box<dyn SinkFactory>) -> daggy::NodeIndex {
694✔
143
        self.add_node(handle, NodeKind::Sink(sink))
694✔
144
    }
694✔
145

×
146
    /// Adds an edge. Panics if there's already an edge from `from` to `to`.
×
147
    ///
×
148
    /// Returns an error if any of the port cannot be found or the edge would create a cycle.
×
149
    pub fn connect(&mut self, from: Endpoint, to: Endpoint) -> Result<(), ExecutionError> {
2,838✔
150
        let from_node_index = validate_endpoint(self, &from, PortDirection::Output)?;
2,838✔
151
        let to_node_index = validate_endpoint(self, &to, PortDirection::Input)?;
2,835✔
152
        self.connect_with_index(from_node_index, from.port, to_node_index, to.port)
2,834✔
153
    }
2,838✔
154

×
155
    /// Adds an edge. Panics if there's already an edge from `from` to `to`.
×
156
    ///
×
157
    /// Returns an error if any of the port cannot be found or the edge would create a cycle.
×
158
    pub fn connect_with_index(
2,849✔
159
        &mut self,
2,849✔
160
        from_node_index: daggy::NodeIndex,
2,849✔
161
        output_port: PortHandle,
2,849✔
162
        to_node_index: daggy::NodeIndex,
2,849✔
163
        input_port: PortHandle,
2,849✔
164
    ) -> Result<(), ExecutionError> {
2,849✔
165
        validate_port_with_index(self, from_node_index, output_port, PortDirection::Output)?;
2,849✔
166
        validate_port_with_index(self, to_node_index, input_port, PortDirection::Input)?;
2,849✔
167
        let edge_index = self.graph.add_edge(
2,849✔
168
            from_node_index,
2,849✔
169
            to_node_index,
2,849✔
170
            EdgeType::new(output_port, input_port),
2,849✔
171
        )?;
2,849✔
172

×
173
        if !self.edge_indexes.insert(EdgeIndex {
2,849✔
174
            from_node: from_node_index,
2,849✔
175
            output_port,
2,849✔
176
            to_node: to_node_index,
2,849✔
177
            input_port,
2,849✔
178
        }) {
2,849✔
179
            panic!("An edge {edge_index:?} has already been inserted using specified edge handle");
×
180
        }
2,849✔
181

2,849✔
182
        Ok(())
2,849✔
183
    }
2,849✔
184

×
185
    /// Adds another whole `Dag` to `self`. Optionally under a namespace `ns`.
×
186
    pub fn merge(&mut self, ns: Option<u16>, other: Dag) {
4✔
187
        let (other_nodes, _) = other.graph.into_graph().into_nodes_edges();
4✔
188

4✔
189
        // Insert nodes.
4✔
190
        let mut other_node_index_to_self_node_index = vec![];
4✔
191
        for other_node in other_nodes.into_iter() {
8✔
192
            let other_node = other_node.weight;
8✔
193
            let self_node_handle =
8✔
194
                NodeHandle::new(ns.or(other_node.handle.ns), other_node.handle.id.clone());
8✔
195
            let self_node_index = self.add_node(self_node_handle.clone(), other_node.kind);
8✔
196
            other_node_index_to_self_node_index.push(self_node_index);
8✔
197
        }
8✔
198

×
199
        // Insert edges.
×
200
        for other_edge_index in other.edge_indexes.into_iter() {
4✔
201
            let self_from_node =
4✔
202
                other_node_index_to_self_node_index[other_edge_index.from_node.index()];
4✔
203
            let self_to_node =
4✔
204
                other_node_index_to_self_node_index[other_edge_index.to_node.index()];
4✔
205
            self.connect_with_index(
4✔
206
                self_from_node,
4✔
207
                other_edge_index.output_port,
4✔
208
                self_to_node,
4✔
209
                other_edge_index.input_port,
4✔
210
            )
4✔
211
            .expect("BUG in DAG");
4✔
212
        }
4✔
213
    }
4✔
214

×
215
    /// Returns an iterator over all node handles.
×
216
    pub fn node_handles(&self) -> impl Iterator<Item = &NodeHandle> {
×
217
        self.nodes().map(|node| &node.handle)
×
218
    }
×
219

220
    /// Returns an iterator over all nodes.
×
221
    pub fn nodes(&self) -> impl Iterator<Item = &NodeType> {
×
222
        self.graph.raw_nodes().iter().map(|node| &node.weight)
×
223
    }
×
224

225
    /// Returns an iterator over source handles and sources.
×
226
    pub fn sources(&self) -> impl Iterator<Item = (&NodeHandle, &dyn SourceFactory)> {
×
227
        self.nodes().flat_map(|node| {
×
228
            if let NodeKind::Source(source) = &node.kind {
×
229
                Some((&node.handle, &**source))
×
230
            } else {
×
231
                None
×
232
            }
×
233
        })
×
234
    }
×
235

×
236
    /// Returns an iterator over processor handles and processors.
237
    pub fn processors(&self) -> impl Iterator<Item = (&NodeHandle, &dyn ProcessorFactory)> {
×
238
        self.nodes().flat_map(|node| {
×
239
            if let NodeKind::Processor(processor) = &node.kind {
×
240
                Some((&node.handle, &**processor))
×
241
            } else {
×
242
                None
×
243
            }
×
244
        })
×
245
    }
×
246

×
247
    /// Returns an iterator over sink handles and sinks.
248
    pub fn sinks(&self) -> impl Iterator<Item = (&NodeHandle, &dyn SinkFactory)> {
×
249
        self.nodes().flat_map(|node| {
×
250
            if let NodeKind::Sink(sink) = &node.kind {
×
251
                Some((&node.handle, &**sink))
×
252
            } else {
×
253
                None
×
254
            }
×
255
        })
×
256
    }
×
257

×
258
    /// Returns an iterator over all edge handles.
259
    pub fn edge_handles(&self) -> Vec<Edge> {
1✔
260
        let get_endpoint = |node_index: daggy::NodeIndex, port_handle| {
12✔
261
            let node = &self.graph[node_index];
12✔
262
            Endpoint {
12✔
263
                node: node.handle.clone(),
12✔
264
                port: port_handle,
12✔
265
            }
12✔
266
        };
12✔
267

×
268
        self.edge_indexes
1✔
269
            .iter()
1✔
270
            .map(|edge_index| {
6✔
271
                Edge::new(
6✔
272
                    get_endpoint(edge_index.from_node, edge_index.output_port),
6✔
273
                    get_endpoint(edge_index.to_node, edge_index.input_port),
6✔
274
                )
6✔
275
            })
6✔
276
            .collect()
1✔
277
    }
1✔
278

×
279
    /// Finds the node by its handle.
×
280
    pub fn node_kind_from_handle(&self, handle: &NodeHandle) -> &NodeKind {
×
281
        &self.graph[self.node_index(handle)].kind
×
282
    }
×
283

284
    /// Returns an iterator over node handles that are connected to the given node handle.
×
285
    pub fn edges_from_handle(&self, handle: &NodeHandle) -> impl Iterator<Item = &NodeHandle> {
×
286
        let node_index = self.node_index(handle);
×
287
        self.graph
×
288
            .edges(node_index)
×
289
            .map(|edge| &self.graph[edge.target()].handle)
×
290
    }
×
291

×
292
    /// Returns an iterator over endpoints that are connected to the given endpoint.
×
293
    pub fn edges_from_endpoint<'a>(
×
294
        &'a self,
×
295
        node_handle: &'a NodeHandle,
×
296
        port_handle: PortHandle,
×
297
    ) -> impl Iterator<Item = (&NodeHandle, PortHandle)> {
×
298
        self.graph
×
299
            .edges(self.node_index(node_handle))
×
300
            .filter(move |edge| edge.weight().from == port_handle)
×
301
            .map(|edge| (&self.graph[edge.target()].handle, edge.weight().to))
×
302
    }
×
303

×
304
    /// Returns an iterator over all node handles reachable from `start` in a breadth-first search.
×
305
    pub fn bfs(&self, start: &NodeHandle) -> impl Iterator<Item = &NodeHandle> {
×
306
        let start = self.node_index(start);
×
307

×
308
        Bfs::new(self.graph.graph(), start)
×
309
            .iter(self.graph.graph())
×
310
            .map(|node_index| &self.graph[node_index].handle)
×
311
    }
×
312
}
×
313

×
314
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
4,430✔
315
struct EdgeIndex {
×
316
    from_node: daggy::NodeIndex,
317
    output_port: PortHandle,
318
    to_node: daggy::NodeIndex,
×
319
    input_port: PortHandle,
320
}
321

322
impl Dag {
323
    fn add_node(&mut self, handle: NodeHandle, kind: NodeKind) -> daggy::NodeIndex {
3,225✔
324
        let node_index = self.graph.add_node(NodeType {
3,225✔
325
            handle: handle.clone(),
3,225✔
326
            kind,
3,225✔
327
        });
3,225✔
328
        if let Some(node_index) = self.node_lookup_table.insert(handle, node_index) {
3,225✔
329
            panic!("A node {node_index:?} has already been inserted using specified node handle");
×
330
        }
3,225✔
331
        node_index
3,225✔
332
    }
3,225✔
333

×
334
    fn node_index(&self, node_handle: &NodeHandle) -> daggy::NodeIndex {
5,673✔
335
        *self
5,673✔
336
            .node_lookup_table
5,673✔
337
            .get(node_handle)
5,673✔
338
            .unwrap_or_else(|| panic!("Node handle {node_handle:?} not found in dag"))
5,673✔
339
    }
5,673✔
340
}
×
341

×
342
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
11,371✔
343
enum PortDirection {
×
344
    Input,
345
    Output,
346
}
×
347

348
fn validate_endpoint(
5,673✔
349
    dag: &Dag,
5,673✔
350
    endpoint: &Endpoint,
5,673✔
351
    direction: PortDirection,
5,673✔
352
) -> Result<daggy::NodeIndex, ExecutionError> {
5,673✔
353
    let node_index = dag.node_index(&endpoint.node);
5,673✔
354
    validate_port_with_index(dag, node_index, endpoint.port, direction)?;
5,673✔
355
    Ok(node_index)
5,669✔
356
}
5,673✔
357

×
358
fn validate_port_with_index(
11,371✔
359
    dag: &Dag,
11,371✔
360
    node_index: daggy::NodeIndex,
11,371✔
361
    port: PortHandle,
11,371✔
362
    direction: PortDirection,
11,371✔
363
) -> Result<(), ExecutionError> {
11,371✔
364
    let node = &dag.graph[node_index];
11,371✔
365
    if !contains_port(&node.kind, direction, port)? {
11,371✔
366
        return Err(ExecutionError::InvalidPortHandle(port));
4✔
367
    }
11,367✔
368
    Ok(())
11,367✔
369
}
11,371✔
370

×
371
fn contains_port(
11,371✔
372
    node: &NodeKind,
11,371✔
373
    direction: PortDirection,
11,371✔
374
    port: PortHandle,
11,371✔
375
) -> Result<bool, ExecutionError> {
11,371✔
376
    Ok(match node {
11,371✔
377
        NodeKind::Processor(p) => {
7,925✔
378
            if direction == PortDirection::Output {
7,925✔
379
                p.get_output_ports().iter().any(|e| e.handle == port)
3,624✔
380
            } else {
×
381
                p.get_input_ports().contains(&port)
4,301✔
382
            }
×
383
        }
×
384
        NodeKind::Sink(s) => {
1,383✔
385
            if direction == PortDirection::Output {
1,383✔
386
                false
×
387
            } else {
388
                s.get_input_ports().contains(&port)
1,383✔
389
            }
×
390
        }
×
391
        NodeKind::Source(s) => {
2,063✔
392
            if direction == PortDirection::Output {
2,063✔
393
                s.get_output_ports().iter().any(|e| e.handle == port)
2,821✔
394
            } else {
395
                false
×
396
            }
×
397
        }
×
398
    })
399
}
11,371✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc