• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4313132420

pending completion
4313132420

push

github

GitHub
chore: Don't run coverage on `pull_request_target` (#1116)

29185 of 39847 relevant lines covered (73.24%)

69188.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.78
/dozer-core/src/dag_impl.rs
1
use daggy::petgraph::dot;
2
use daggy::petgraph::visit::{Bfs, EdgeRef, IntoEdges};
3
use daggy::Walker;
4
use dozer_types::node::NodeHandle;
5

6
use crate::errors::ExecutionError;
7
use crate::node::{PortHandle, ProcessorFactory, SinkFactory, SourceFactory};
8
use std::collections::{HashMap, HashSet};
9
use std::fmt::{Debug, Display};
10
use std::sync::Arc;
11

12
/// The default port handle: the largest `u16` value (`0xffff`).
pub const DEFAULT_PORT_HANDLE: u16 = u16::MAX;
13

14
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
289✔
15
pub struct Endpoint {
16
    pub node: NodeHandle,
17
    pub port: PortHandle,
18
}
19

20
impl Endpoint {
21
    pub fn new(node: NodeHandle, port: PortHandle) -> Self {
9,578✔
22
        Self { node, port }
9,578✔
23
    }
9,578✔
24
}
25

26
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
21✔
27
pub struct Edge {
28
    pub from: Endpoint,
29
    pub to: Endpoint,
30
}
31

32
impl Edge {
33
    pub fn new(from: Endpoint, to: Endpoint) -> Self {
2,119✔
34
        Self { from, to }
2,119✔
35
    }
2,119✔
36
}
37

38
#[derive(Debug, Clone)]
3,254✔
39
/// A `SourceFactory`, `ProcessorFactory` or `SinkFactory`.
40
pub enum NodeKind<T> {
41
    Source(Arc<dyn SourceFactory<T>>),
42
    Processor(Arc<dyn ProcessorFactory<T>>),
43
    Sink(Arc<dyn SinkFactory<T>>),
44
}
45

46
#[derive(Debug, Clone)]
2,119✔
47
/// The node type of the description DAG.
48
pub struct NodeType<T> {
49
    /// The node handle, unique across the DAG.
50
    pub handle: NodeHandle,
51
    /// The node kind.
52
    pub kind: NodeKind<T>,
53
}
54

55
impl<T> Display for NodeType<T> {
56
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
57
        write!(f, "{:?}", self.handle)
×
58
    }
×
59
}
60

61
#[derive(Debug, Clone, Copy)]
×
62
/// The edge type of the description DAG.
63
pub struct EdgeType {
64
    pub from: PortHandle,
65
    pub to: PortHandle,
66
}
67

68
impl EdgeType {
69
    pub fn new(from: PortHandle, to: PortHandle) -> Self {
2,687✔
70
        Self { from, to }
2,687✔
71
    }
2,687✔
72
}
73
impl Display for EdgeType {
74
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
75
        write!(f, "{:?} -> {:?}", self.from, self.to)
×
76
    }
×
77
}
78

79
#[derive(Debug, Clone)]
×
80
pub struct Dag<T> {
81
    /// The underlying graph.
82
    graph: daggy::Dag<NodeType<T>, EdgeType>,
83
    /// Map from node handle to node index.
84
    node_lookup_table: HashMap<NodeHandle, daggy::NodeIndex>,
85
    /// All edge indexes.
86
    edge_indexes: HashSet<EdgeIndex>,
87
}
88

89
impl<T> Default for Dag<T> {
90
    fn default() -> Self {
×
91
        Self::new()
×
92
    }
×
93
}
94

95
impl<T> Display for Dag<T> {
96
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
97
        write!(f, "{}", dot::Dot::new(&self.graph))
×
98
    }
×
99
}
100

101
impl<T> Dag<T> {
102
    /// Creates an empty DAG.
103
    pub fn new() -> Self {
248✔
104
        Self {
248✔
105
            graph: daggy::Dag::new(),
248✔
106
            node_lookup_table: HashMap::new(),
248✔
107
            edge_indexes: HashSet::new(),
248✔
108
        }
248✔
109
    }
248✔
110

111
    /// Returns the underlying daggy graph.
112
    pub fn graph(&self) -> &daggy::Dag<NodeType<T>, EdgeType> {
4,690✔
113
        &self.graph
4,690✔
114
    }
4,690✔
115

116
    /// Adds a source. Panics if the `handle` exists in the `Dag`.
117
    pub fn add_source(
253✔
118
        &mut self,
253✔
119
        handle: NodeHandle,
253✔
120
        source: Arc<dyn SourceFactory<T>>,
253✔
121
    ) -> daggy::NodeIndex {
253✔
122
        self.add_node(handle, NodeKind::Source(source))
253✔
123
    }
253✔
124

125
    /// Adds a processor. Panics if the `handle` exists in the `Dag`.
126
    pub fn add_processor(
652✔
127
        &mut self,
652✔
128
        handle: NodeHandle,
652✔
129
        processor: Arc<dyn ProcessorFactory<T>>,
652✔
130
    ) -> daggy::NodeIndex {
652✔
131
        self.add_node(handle, NodeKind::Processor(processor))
652✔
132
    }
652✔
133

134
    /// Adds a sink. Panics if the `handle` exists in the `Dag`.
135
    pub fn add_sink(
240✔
136
        &mut self,
240✔
137
        handle: NodeHandle,
240✔
138
        sink: Arc<dyn SinkFactory<T>>,
240✔
139
    ) -> daggy::NodeIndex {
240✔
140
        self.add_node(handle, NodeKind::Sink(sink))
240✔
141
    }
240✔
142

143
    /// Adds an edge. Panics if there's already an edge from `from` to `to`.
144
    ///
145
    /// Returns an error if any of the port cannot be found or the edge would create a cycle.
146
    pub fn connect(&mut self, from: Endpoint, to: Endpoint) -> Result<(), ExecutionError> {
958✔
147
        let from_node_index = validate_endpoint(self, &from, PortDirection::Output)?;
958✔
148
        let to_node_index = validate_endpoint(self, &to, PortDirection::Input)?;
955✔
149
        self.connect_with_index(from_node_index, from.port, to_node_index, to.port)
954✔
150
    }
958✔
151

152
    /// Adds an edge. Panics if there's already an edge from `from` to `to`.
153
    ///
154
    /// Returns an error if any of the port cannot be found or the edge would create a cycle.
155
    pub fn connect_with_index(
969✔
156
        &mut self,
969✔
157
        from_node_index: daggy::NodeIndex,
969✔
158
        output_port: PortHandle,
969✔
159
        to_node_index: daggy::NodeIndex,
969✔
160
        input_port: PortHandle,
969✔
161
    ) -> Result<(), ExecutionError> {
969✔
162
        validate_port_with_index(self, from_node_index, output_port, PortDirection::Output)?;
969✔
163
        validate_port_with_index(self, to_node_index, input_port, PortDirection::Input)?;
969✔
164
        let edge_index = self.graph.add_edge(
969✔
165
            from_node_index,
969✔
166
            to_node_index,
969✔
167
            EdgeType::new(output_port, input_port),
969✔
168
        )?;
969✔
169

170
        if !self.edge_indexes.insert(EdgeIndex {
969✔
171
            from_node: from_node_index,
969✔
172
            output_port,
969✔
173
            to_node: to_node_index,
969✔
174
            input_port,
969✔
175
        }) {
969✔
176
            panic!("An edge {edge_index:?} has already been inserted using specified edge handle");
×
177
        }
969✔
178

969✔
179
        Ok(())
969✔
180
    }
969✔
181

182
    /// Adds another whole `Dag` to `self`. Optionally under a namespace `ns`.
183
    pub fn merge(&mut self, ns: Option<u16>, other: Dag<T>) {
4✔
184
        let (other_nodes, _) = other.graph.into_graph().into_nodes_edges();
4✔
185

4✔
186
        // Insert nodes.
4✔
187
        let mut other_node_index_to_self_node_index = vec![];
4✔
188
        for other_node in other_nodes.into_iter() {
8✔
189
            let other_node = other_node.weight;
8✔
190
            let self_node_handle =
8✔
191
                NodeHandle::new(ns.or(other_node.handle.ns), other_node.handle.id.clone());
8✔
192
            let self_node_index = self.add_node(self_node_handle.clone(), other_node.kind);
8✔
193
            other_node_index_to_self_node_index.push(self_node_index);
8✔
194
        }
8✔
195

196
        // Insert edges.
197
        for other_edge_index in other.edge_indexes.into_iter() {
4✔
198
            let self_from_node =
4✔
199
                other_node_index_to_self_node_index[other_edge_index.from_node.index()];
4✔
200
            let self_to_node =
4✔
201
                other_node_index_to_self_node_index[other_edge_index.to_node.index()];
4✔
202
            self.connect_with_index(
4✔
203
                self_from_node,
4✔
204
                other_edge_index.output_port,
4✔
205
                self_to_node,
4✔
206
                other_edge_index.input_port,
4✔
207
            )
4✔
208
            .expect("BUG in DAG");
4✔
209
        }
4✔
210
    }
4✔
211

212
    /// Returns an iterator over all node handles.
213
    pub fn node_handles(&self) -> impl Iterator<Item = &NodeHandle> {
×
214
        self.nodes().map(|node| &node.handle)
×
215
    }
×
216

217
    /// Returns an iterator over all nodes.
218
    pub fn nodes(&self) -> impl Iterator<Item = &NodeType<T>> {
×
219
        self.graph.raw_nodes().iter().map(|node| &node.weight)
×
220
    }
×
221

222
    /// Returns an iterator over source handles and sources.
223
    pub fn sources(&self) -> impl Iterator<Item = (&NodeHandle, &Arc<dyn SourceFactory<T>>)> {
×
224
        self.nodes().flat_map(|node| {
×
225
            if let NodeKind::Source(source) = &node.kind {
×
226
                Some((&node.handle, source))
×
227
            } else {
228
                None
×
229
            }
230
        })
×
231
    }
×
232

233
    /// Returns an iterator over processor handles and processors.
234
    pub fn processors(&self) -> impl Iterator<Item = (&NodeHandle, &Arc<dyn ProcessorFactory<T>>)> {
×
235
        self.nodes().flat_map(|node| {
×
236
            if let NodeKind::Processor(processor) = &node.kind {
×
237
                Some((&node.handle, processor))
×
238
            } else {
239
                None
×
240
            }
241
        })
×
242
    }
×
243

244
    /// Returns an iterator over sink handles and sinks.
245
    pub fn sinks(&self) -> impl Iterator<Item = (&NodeHandle, &Arc<dyn SinkFactory<T>>)> {
×
246
        self.nodes().flat_map(|node| {
×
247
            if let NodeKind::Sink(sink) = &node.kind {
×
248
                Some((&node.handle, sink))
×
249
            } else {
250
                None
×
251
            }
252
        })
×
253
    }
×
254

255
    /// Returns an iterator over all edge handles.
256
    pub fn edge_handles(&self) -> Vec<Edge> {
1✔
257
        let get_endpoint = |node_index: daggy::NodeIndex, port_handle| {
12✔
258
            let node = &self.graph[node_index];
12✔
259
            Endpoint {
12✔
260
                node: node.handle.clone(),
12✔
261
                port: port_handle,
12✔
262
            }
12✔
263
        };
12✔
264

265
        self.edge_indexes
1✔
266
            .iter()
1✔
267
            .map(|edge_index| {
6✔
268
                Edge::new(
6✔
269
                    get_endpoint(edge_index.from_node, edge_index.output_port),
6✔
270
                    get_endpoint(edge_index.to_node, edge_index.input_port),
6✔
271
                )
6✔
272
            })
6✔
273
            .collect()
1✔
274
    }
1✔
275

276
    /// Finds the node by its handle.
277
    pub fn node_kind_from_handle(&self, handle: &NodeHandle) -> &NodeKind<T> {
×
278
        &self.graph[self.node_index(handle)].kind
×
279
    }
×
280

281
    /// Returns an iterator over node handles that are connected to the given node handle.
282
    pub fn edges_from_handle(&self, handle: &NodeHandle) -> impl Iterator<Item = &NodeHandle> {
×
283
        let node_index = self.node_index(handle);
×
284
        self.graph
×
285
            .edges(node_index)
×
286
            .map(|edge| &self.graph[edge.target()].handle)
×
287
    }
×
288

289
    /// Returns an iterator over endpoints that are connected to the given endpoint.
290
    pub fn edges_from_endpoint<'a>(
×
291
        &'a self,
×
292
        node_handle: &'a NodeHandle,
×
293
        port_handle: PortHandle,
×
294
    ) -> impl Iterator<Item = (&NodeHandle, PortHandle)> {
×
295
        self.graph
×
296
            .edges(self.node_index(node_handle))
×
297
            .filter(move |edge| edge.weight().from == port_handle)
×
298
            .map(|edge| (&self.graph[edge.target()].handle, edge.weight().to))
×
299
    }
×
300

301
    /// Returns an iterator over all node handles reachable from `start` in a breadth-first search.
302
    pub fn bfs(&self, start: &NodeHandle) -> impl Iterator<Item = &NodeHandle> {
×
303
        let start = self.node_index(start);
×
304

×
305
        Bfs::new(self.graph.graph(), start)
×
306
            .iter(self.graph.graph())
×
307
            .map(|node_index| &self.graph[node_index].handle)
×
308
    }
×
309
}
310

311
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
1,537✔
312
struct EdgeIndex {
313
    from_node: daggy::NodeIndex,
314
    output_port: PortHandle,
315
    to_node: daggy::NodeIndex,
316
    input_port: PortHandle,
317
}
318

319
impl<T> Dag<T> {
320
    fn add_node(&mut self, handle: NodeHandle, kind: NodeKind<T>) -> daggy::NodeIndex {
1,153✔
321
        let node_index = self.graph.add_node(NodeType {
1,153✔
322
            handle: handle.clone(),
1,153✔
323
            kind,
1,153✔
324
        });
1,153✔
325
        if let Some(node_index) = self.node_lookup_table.insert(handle, node_index) {
1,153✔
326
            panic!("A node {node_index:?} has already been inserted using specified node handle");
×
327
        }
1,153✔
328
        node_index
1,153✔
329
    }
1,153✔
330

331
    fn node_index(&self, node_handle: &NodeHandle) -> daggy::NodeIndex {
1,913✔
332
        *self
1,913✔
333
            .node_lookup_table
1,913✔
334
            .get(node_handle)
1,913✔
335
            .unwrap_or_else(|| panic!("Node handle {node_handle:?} not found in dag"))
1,913✔
336
    }
1,913✔
337
}
338

339
/// Which side of a node a port is looked up on.
///
/// `Copy` is derived because the enum is fieldless and is passed by value to the port
/// validation helpers, matching the other small value types in this file.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum PortDirection {
    /// The port is checked against a node's input ports.
    Input,
    /// The port is checked against a node's output ports.
    Output,
}
344

345
fn validate_endpoint<T>(
1,913✔
346
    dag: &Dag<T>,
1,913✔
347
    endpoint: &Endpoint,
1,913✔
348
    direction: PortDirection,
1,913✔
349
) -> Result<daggy::NodeIndex, ExecutionError> {
1,913✔
350
    let node_index = dag.node_index(&endpoint.node);
1,913✔
351
    validate_port_with_index(dag, node_index, endpoint.port, direction)?;
1,913✔
352
    Ok(node_index)
1,909✔
353
}
1,913✔
354

355
fn validate_port_with_index<T>(
3,851✔
356
    dag: &Dag<T>,
3,851✔
357
    node_index: daggy::NodeIndex,
3,851✔
358
    port: PortHandle,
3,851✔
359
    direction: PortDirection,
3,851✔
360
) -> Result<(), ExecutionError> {
3,851✔
361
    let node = &dag.graph[node_index];
3,851✔
362
    if !contains_port(&node.kind, direction, port)? {
3,851✔
363
        return Err(ExecutionError::InvalidPortHandle(port));
4✔
364
    }
3,847✔
365
    Ok(())
3,847✔
366
}
3,851✔
367

368
fn contains_port<T>(
3,851✔
369
    node: &NodeKind<T>,
3,851✔
370
    direction: PortDirection,
3,851✔
371
    port: PortHandle,
3,851✔
372
) -> Result<bool, ExecutionError> {
3,851✔
373
    Ok(match node {
3,851✔
374
        NodeKind::Processor(p) => {
2,739✔
375
            if direction == PortDirection::Output {
2,739✔
376
                p.get_output_ports().iter().any(|e| e.handle == port)
1,290✔
377
            } else {
378
                p.get_input_ports().contains(&port)
1,449✔
379
            }
380
        }
381
        NodeKind::Sink(s) => {
475✔
382
            if direction == PortDirection::Output {
475✔
383
                false
×
384
            } else {
385
                s.get_input_ports().contains(&port)
475✔
386
            }
387
        }
388
        NodeKind::Source(s) => {
637✔
389
            if direction == PortDirection::Output {
637✔
390
                s.get_output_ports()?.iter().any(|e| e.handle == port)
919✔
391
            } else {
392
                false
×
393
            }
394
        }
395
    })
396
}
3,851✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc