• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4384029966

pending completion
4384029966

push

github

GitHub
Prepare v0.1.11 (#1203)

28488 of 40876 relevant lines covered (69.69%)

39271.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.36
/dozer-core/src/dag_impl.rs
1
use daggy::petgraph::dot;
2
use daggy::petgraph::visit::{Bfs, EdgeRef, IntoEdges};
3
use daggy::Walker;
4
use dozer_types::node::NodeHandle;
5

6
use crate::errors::ExecutionError;
7
use crate::node::{PortHandle, ProcessorFactory, SinkFactory, SourceFactory};
8
use std::collections::{HashMap, HashSet};
9
use std::fmt::{Debug, Display};
10
use std::sync::Arc;
11

12
/// Port handle value `0xffff`, exported as the default port handle.
/// NOTE(review): its users are outside this file — confirm the intended meaning with callers.
pub const DEFAULT_PORT_HANDLE: u16 = 0xffff_u16;
13

14
/// One end of an edge: a node, identified by its handle, plus one of that node's ports.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
313✔
15
pub struct Endpoint {
16
    /// Handle of the node this endpoint belongs to.
    pub node: NodeHandle,
17
    /// Handle of the port on that node.
    pub port: PortHandle,
18
}
19

20
impl Endpoint {
21
    /// Creates an endpoint from a node handle and a port handle.
    pub fn new(node: NodeHandle, port: PortHandle) -> Self {
5,468✔
22
        Self { node, port }
5,468✔
23
    }
5,468✔
24
}
25

26
/// A directed connection between two endpoints, expressed with node handles.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
21✔
27
pub struct Edge {
28
    /// The endpoint the edge starts at (validated as an output port by `Dag::connect`).
    pub from: Endpoint,
29
    /// The endpoint the edge ends at (validated as an input port by `Dag::connect`).
    pub to: Endpoint,
30
}
31

32
impl Edge {
33
    /// Creates an edge going from `from` to `to`.
    pub fn new(from: Endpoint, to: Endpoint) -> Self {
1,193✔
34
        Self { from, to }
1,193✔
35
    }
1,193✔
36
}
37

38
#[derive(Debug, Clone)]
1,043✔
39
/// A `SourceFactory`, `ProcessorFactory` or `SinkFactory`.
///
/// Each variant holds its factory behind an `Arc`, so cloning a `NodeKind` clones the `Arc`,
/// not the factory.
40
pub enum NodeKind<T> {
41
    /// A source node; `contains_port` only ever matches output ports for it.
    Source(Arc<dyn SourceFactory<T>>),
42
    /// A processor node; it is checked against both its input and its output ports.
    Processor(Arc<dyn ProcessorFactory<T>>),
43
    /// A sink node; `contains_port` only ever matches input ports for it.
    Sink(Arc<dyn SinkFactory<T>>),
44
}
45

46
#[derive(Debug, Clone)]
1,043✔
47
/// The node type of the description DAG.
48
pub struct NodeType<T> {
49
    /// The node handle, unique across the DAG.
    /// (`Dag::add_node` panics when a handle is inserted twice.)
50
    pub handle: NodeHandle,
51
    /// The node kind.
52
    pub kind: NodeKind<T>,
53
}
54

55
/// Displays a node as its handle only, using the handle's `Debug` representation.
impl<T> Display for NodeType<T> {
56
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
57
        // The factory in `kind` is deliberately not part of the output.
        write!(f, "{:?}", self.handle)
×
58
    }
×
59
}
60

61
#[derive(Debug, Clone, Copy)]
907✔
62
/// The edge type of the description DAG.
///
/// Only the two port handles are stored; the nodes are given by the graph edge itself.
63
pub struct EdgeType {
64
    /// Output port on the node the edge starts at.
    pub from: PortHandle,
65
    /// Input port on the node the edge ends at.
    pub to: PortHandle,
66
}
67

68
impl EdgeType {
69
    /// Creates an edge weight from the output port `from` and the input port `to`.
    pub fn new(from: PortHandle, to: PortHandle) -> Self {
1,558✔
70
        Self { from, to }
1,558✔
71
    }
1,558✔
72
}
73
/// Displays the edge as `<from> -> <to>`, using the `Debug` representation of both ports.
impl Display for EdgeType {
74
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
75
        write!(f, "{:?} -> {:?}", self.from, self.to)
×
76
    }
×
77
}
78

79
/// A description DAG: nodes are source/processor/sink factories addressed by `NodeHandle`,
/// and edges connect an output port of one node to an input port of another.
#[derive(Debug, Clone)]
212✔
80
pub struct Dag<T> {
81
    /// The underlying graph.
82
    graph: daggy::Dag<NodeType<T>, EdgeType>,
83
    /// Map from node handle to node index.
84
    node_lookup_table: HashMap<NodeHandle, daggy::NodeIndex>,
85
    /// All edge indexes.
    /// Used to reject duplicate edges in `connect_with_index` and to enumerate edges in
    /// `edge_handles` and `merge`.
86
    edge_indexes: HashSet<EdgeIndex>,
87
}
88

89
/// The default `Dag` is the empty DAG produced by `Dag::new`.
impl<T> Default for Dag<T> {
90
    fn default() -> Self {
×
91
        Self::new()
×
92
    }
×
93
}
94

95
/// Displays the DAG by formatting the underlying graph through petgraph's `dot::Dot`
/// (Graphviz DOT text), which in turn uses the `Display` impls of `NodeType` and `EdgeType`.
impl<T> Display for Dag<T> {
96
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
97
        write!(f, "{}", dot::Dot::new(&self.graph))
×
98
    }
×
99
}
100

101
impl<T> Dag<T> {
102
    /// Creates an empty DAG.
    ///
    /// The graph, the handle lookup table and the edge index set all start out empty.
103
    pub fn new() -> Self {
252✔
104
        Self {
252✔
105
            graph: daggy::Dag::new(),
252✔
106
            node_lookup_table: HashMap::new(),
252✔
107
            edge_indexes: HashSet::new(),
252✔
108
        }
252✔
109
    }
252✔
110

111
    /// Returns a shared reference to the underlying daggy graph.
111
    pub fn graph(&self) -> &daggy::Dag<NodeType<T>, EdgeType> {
4,360✔
113
        &self.graph
4,360✔
114
    }
4,360✔
115

116
    /// Consumes the `Dag` and returns the underlying daggy graph by value.
    ///
    /// The handle lookup table and the edge index set are dropped.
116
    pub fn into_graph(self) -> daggy::Dag<NodeType<T>, EdgeType> {
444✔
118
        self.graph
444✔
119
    }
444✔
120

121
    /// Adds a source. Panics if the `handle` exists in the `Dag`.
    ///
    /// Returns the index of the newly added node in the underlying graph.
121
    pub fn add_source(
257✔
123
        &mut self,
257✔
124
        handle: NodeHandle,
257✔
125
        source: Arc<dyn SourceFactory<T>>,
257✔
126
    ) -> daggy::NodeIndex {
257✔
127
        self.add_node(handle, NodeKind::Source(source))
257✔
128
    }
257✔
129

130
    /// Adds a processor. Panics if the `handle` exists in the `Dag`.
    ///
    /// Returns the index of the newly added node in the underlying graph.
130
    pub fn add_processor(
663✔
132
        &mut self,
663✔
133
        handle: NodeHandle,
663✔
134
        processor: Arc<dyn ProcessorFactory<T>>,
663✔
135
    ) -> daggy::NodeIndex {
663✔
136
        self.add_node(handle, NodeKind::Processor(processor))
663✔
137
    }
663✔
138

139
    /// Adds a sink. Panics if the `handle` exists in the `Dag`.
    ///
    /// Returns the index of the newly added node in the underlying graph.
139
    pub fn add_sink(
244✔
141
        &mut self,
244✔
142
        handle: NodeHandle,
244✔
143
        sink: Arc<dyn SinkFactory<T>>,
244✔
144
    ) -> daggy::NodeIndex {
244✔
145
        self.add_node(handle, NodeKind::Sink(sink))
244✔
146
    }
244✔
147

148
    /// Adds an edge. Panics if there's already an edge from `from` to `to`.
    /// Also panics if the node handle of `from` or `to` is not present in the DAG.
149
    ///
150
    /// Returns an error if any of the port cannot be found or the edge would create a cycle.
    /// `from.port` must be an output port of `from.node`, and `to.port` an input port of
    /// `to.node`; otherwise `ExecutionError::InvalidPortHandle` is returned.
151
    pub fn connect(&mut self, from: Endpoint, to: Endpoint) -> Result<(), ExecutionError> {
989✔
152
        // Resolve each handle to a node index and check its port in the expected direction.
        let from_node_index = validate_endpoint(self, &from, PortDirection::Output)?;
989✔
153
        let to_node_index = validate_endpoint(self, &to, PortDirection::Input)?;
986✔
154
        // `connect_with_index` validates both ports again before inserting the edge.
        self.connect_with_index(from_node_index, from.port, to_node_index, to.port)
985✔
155
    }
989✔
156

157
    /// Adds an edge. Panics if there's already an edge from `from` to `to`.
    /// "Already an edge" means the same `(from_node_index, output_port, to_node_index,
    /// input_port)` combination was connected before; edges between the same two nodes on
    /// different ports are distinct.
    /// NOTE(review): the node indexes are used to index the graph directly, which presumably
    /// panics for an index that does not belong to this DAG — confirm.
157
    ///
159
    /// Returns an error if any of the port cannot be found or the edge would create a cycle.
160
    pub fn connect_with_index(
1,000✔
161
        &mut self,
1,000✔
162
        from_node_index: daggy::NodeIndex,
1,000✔
163
        output_port: PortHandle,
1,000✔
164
        to_node_index: daggy::NodeIndex,
1,000✔
165
        input_port: PortHandle,
1,000✔
166
    ) -> Result<(), ExecutionError> {
1,000✔
167
        // Both ports must exist on their node in the right direction.
        validate_port_with_index(self, from_node_index, output_port, PortDirection::Output)?;
1,000✔
168
        validate_port_with_index(self, to_node_index, input_port, PortDirection::Input)?;
1,000✔
169
        // The error returned by `add_edge` (the cycle case) is propagated with `?`.
        let edge_index = self.graph.add_edge(
1,000✔
170
            from_node_index,
1,000✔
171
            to_node_index,
1,000✔
172
            EdgeType::new(output_port, input_port),
1,000✔
173
        )?;
1,000✔
174

175
        // `insert` returns `false` when the same edge key is already recorded. Note that the
        // edge has already been added to the graph by the time this panic fires.
        if !self.edge_indexes.insert(EdgeIndex {
1,000✔
176
            from_node: from_node_index,
1,000✔
177
            output_port,
1,000✔
178
            to_node: to_node_index,
1,000✔
179
            input_port,
1,000✔
180
        }) {
1,000✔
181
            panic!("An edge {edge_index:?} has already been inserted using specified edge handle");
1✔
182
        }
999✔
183

999✔
184
        Ok(())
999✔
185
    }
999✔
186

187
    /// Adds another whole `Dag` to `self`. Optionally under a namespace `ns`.
188
    pub fn merge(&mut self, ns: Option<u16>, other: Dag<T>) {
4✔
189
        let (other_nodes, _) = other.graph.into_graph().into_nodes_edges();
4✔
190

4✔
191
        // Insert nodes.
4✔
192
        let mut other_node_index_to_self_node_index = vec![];
4✔
193
        for other_node in other_nodes.into_iter() {
8✔
194
            let other_node = other_node.weight;
8✔
195
            let self_node_handle =
8✔
196
                NodeHandle::new(ns.or(other_node.handle.ns), other_node.handle.id.clone());
8✔
197
            let self_node_index = self.add_node(self_node_handle.clone(), other_node.kind);
8✔
198
            other_node_index_to_self_node_index.push(self_node_index);
8✔
199
        }
8✔
200

201
        // Insert edges.
202
        for other_edge_index in other.edge_indexes.into_iter() {
4✔
203
            let self_from_node =
4✔
204
                other_node_index_to_self_node_index[other_edge_index.from_node.index()];
4✔
205
            let self_to_node =
4✔
206
                other_node_index_to_self_node_index[other_edge_index.to_node.index()];
4✔
207
            self.connect_with_index(
4✔
208
                self_from_node,
4✔
209
                other_edge_index.output_port,
4✔
210
                self_to_node,
4✔
211
                other_edge_index.input_port,
4✔
212
            )
4✔
213
            .expect("BUG in DAG");
4✔
214
        }
4✔
215
    }
4✔
216

217
    /// Returns an iterator over all node handles.
    /// Handles are yielded in the same order as `nodes()`.
218
    pub fn node_handles(&self) -> impl Iterator<Item = &NodeHandle> {
×
219
        self.nodes().map(|node| &node.handle)
×
220
    }
×
221

222
    /// Returns an iterator over all nodes.
    /// Nodes are yielded in the order of the underlying graph's raw node storage.
223
    pub fn nodes(&self) -> impl Iterator<Item = &NodeType<T>> {
×
224
        self.graph.raw_nodes().iter().map(|node| &node.weight)
×
225
    }
×
226

227
    /// Returns an iterator over source handles and sources.
    /// Nodes of any other kind are skipped.
228
    pub fn sources(&self) -> impl Iterator<Item = (&NodeHandle, &Arc<dyn SourceFactory<T>>)> {
×
229
        // `flat_map` over an `Option` keeps the `Some` items and drops the `None`s.
        self.nodes().flat_map(|node| {
×
230
            if let NodeKind::Source(source) = &node.kind {
×
231
                Some((&node.handle, source))
×
232
            } else {
233
                None
×
234
            }
235
        })
×
236
    }
×
237

238
    /// Returns an iterator over processor handles and processors.
    /// Nodes of any other kind are skipped.
239
    pub fn processors(&self) -> impl Iterator<Item = (&NodeHandle, &Arc<dyn ProcessorFactory<T>>)> {
×
240
        // `flat_map` over an `Option` keeps the `Some` items and drops the `None`s.
        self.nodes().flat_map(|node| {
×
241
            if let NodeKind::Processor(processor) = &node.kind {
×
242
                Some((&node.handle, processor))
×
243
            } else {
244
                None
×
245
            }
246
        })
×
247
    }
×
248

249
    /// Returns an iterator over sink handles and sinks.
    /// Nodes of any other kind are skipped.
250
    pub fn sinks(&self) -> impl Iterator<Item = (&NodeHandle, &Arc<dyn SinkFactory<T>>)> {
×
251
        // `flat_map` over an `Option` keeps the `Some` items and drops the `None`s.
        self.nodes().flat_map(|node| {
×
252
            if let NodeKind::Sink(sink) = &node.kind {
×
253
                Some((&node.handle, sink))
×
254
            } else {
255
                None
×
256
            }
257
        })
×
258
    }
×
259

260
    /// Returns all edges as a `Vec` of `Edge` values, with node indexes translated back into
    /// node handles.
    ///
    /// The edges come from iterating a `HashSet`, so their order is unspecified.
261
    pub fn edge_handles(&self) -> Vec<Edge> {
1✔
262
        // Turns a (node index, port) pair into an `Endpoint` carrying a clone of the node handle.
        let get_endpoint = |node_index: daggy::NodeIndex, port_handle| {
12✔
263
            let node = &self.graph[node_index];
12✔
264
            Endpoint {
12✔
265
                node: node.handle.clone(),
12✔
266
                port: port_handle,
12✔
267
            }
12✔
268
        };
12✔
269

270
        self.edge_indexes
1✔
271
            .iter()
1✔
272
            .map(|edge_index| {
6✔
273
                Edge::new(
6✔
274
                    get_endpoint(edge_index.from_node, edge_index.output_port),
6✔
275
                    get_endpoint(edge_index.to_node, edge_index.input_port),
6✔
276
                )
6✔
277
            })
6✔
278
            .collect()
1✔
279
    }
1✔
280

281
    /// Finds the node by its handle and returns its kind.
    ///
    /// Panics if `handle` is not present in the DAG.
282
    pub fn node_kind_from_handle(&self, handle: &NodeHandle) -> &NodeKind<T> {
×
283
        &self.graph[self.node_index(handle)].kind
×
284
    }
×
285

286
    /// Returns an iterator over node handles that are connected to the given node handle.
    /// Each yielded handle is the target node of one edge returned by `edges(node_index)`, so a
    /// node reached through several edges is yielded once per edge.
    ///
    /// Panics if `handle` is not present in the DAG.
287
    pub fn edges_from_handle(&self, handle: &NodeHandle) -> impl Iterator<Item = &NodeHandle> {
×
288
        let node_index = self.node_index(handle);
×
289
        self.graph
×
290
            .edges(node_index)
×
291
            .map(|edge| &self.graph[edge.target()].handle)
×
292
    }
×
293

294
    /// Returns an iterator over endpoints that are connected to the given endpoint.
    /// Only edges whose `from` port equals `port_handle` are kept; each item is the target
    /// node's handle together with the edge's `to` port.
    ///
    /// Panics if `node_handle` is not present in the DAG.
295
    pub fn edges_from_endpoint<'a>(
×
296
        &'a self,
×
297
        node_handle: &'a NodeHandle,
×
298
        port_handle: PortHandle,
×
299
    ) -> impl Iterator<Item = (&NodeHandle, PortHandle)> {
×
300
        self.graph
×
301
            .edges(self.node_index(node_handle))
×
302
            // `move` copies `port_handle` into the closure so the iterator can outlive this call.
            .filter(move |edge| edge.weight().from == port_handle)
×
303
            .map(|edge| (&self.graph[edge.target()].handle, edge.weight().to))
×
304
    }
×
305

306
    /// Returns an iterator over all node handles reachable from `start` in a breadth-first search.
    /// NOTE(review): petgraph's `Bfs` presumably yields `start` itself first — confirm if callers
    /// depend on it.
    ///
    /// Panics if `start` is not present in the DAG.
307
    pub fn bfs(&self, start: &NodeHandle) -> impl Iterator<Item = &NodeHandle> {
×
308
        let start = self.node_index(start);
×
309

×
310
        // `self.graph.graph()` exposes the petgraph graph wrapped by daggy, which `Bfs` walks.
        Bfs::new(self.graph.graph(), start)
×
311
            .iter(self.graph.graph())
×
312
            .map(|node_index| &self.graph[node_index].handle)
×
313
    }
×
314
}
315

316
/// Identifies an edge by the node indexes and ports at both ends.
///
/// Values of this type are the elements of `Dag::edge_indexes`; two edges are considered the
/// same when all four fields are equal.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
1,591✔
317
struct EdgeIndex {
318
    /// Index of the node the edge starts at.
    from_node: daggy::NodeIndex,
319
    /// Output port on `from_node`.
    output_port: PortHandle,
320
    /// Index of the node the edge ends at.
    to_node: daggy::NodeIndex,
321
    /// Input port on `to_node`.
    input_port: PortHandle,
322
}
323

324
impl<T> Dag<T> {
325
    fn add_node(&mut self, handle: NodeHandle, kind: NodeKind<T>) -> daggy::NodeIndex {
1,171✔
326
        let node_index = self.graph.add_node(NodeType {
1,171✔
327
            handle: handle.clone(),
1,171✔
328
            kind,
1,171✔
329
        });
1,171✔
330
        if let Some(node_index) = self.node_lookup_table.insert(handle, node_index) {
1,171✔
331
            panic!("A node {node_index:?} has already been inserted using specified node handle");
×
332
        }
1,171✔
333
        node_index
1,171✔
334
    }
1,171✔
335

336
    /// Looks up the graph index of the node registered under `node_handle`.
    ///
    /// Panics with a message naming the handle if it is not present in the lookup table.
    fn node_index(&self, node_handle: &NodeHandle) -> daggy::NodeIndex {
1,975✔
337
        *self
1,975✔
338
            .node_lookup_table
1,975✔
339
            .get(node_handle)
1,975✔
340
            .unwrap_or_else(|| panic!("Node handle {node_handle:?} not found in dag"))
1,975✔
341
    }
1,975✔
342
}
343

344
/// Which side of a node a port is looked up on when validating an edge.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
3,975✔
345
enum PortDirection {
346
    /// The port must be one of the node's input ports (the `to` end of an edge).
    Input,
347
    /// The port must be one of the node's output ports (the `from` end of an edge).
    Output,
348
}
349

350
/// Resolves `endpoint.node` to its node index and checks that `endpoint.port` exists on that
/// node in the given `direction`.
///
/// Returns the node index on success, or `ExecutionError::InvalidPortHandle` if the port is not
/// found. Panics if the node handle is not present in `dag`.
fn validate_endpoint<T>(
1,975✔
351
    dag: &Dag<T>,
1,975✔
352
    endpoint: &Endpoint,
1,975✔
353
    direction: PortDirection,
1,975✔
354
) -> Result<daggy::NodeIndex, ExecutionError> {
1,975✔
355
    let node_index = dag.node_index(&endpoint.node);
1,975✔
356
    validate_port_with_index(dag, node_index, endpoint.port, direction)?;
1,975✔
357
    Ok(node_index)
1,971✔
358
}
1,975✔
359

360
/// Checks that the node at `node_index` has `port` among its ports of the given `direction`.
///
/// Returns `ExecutionError::InvalidPortHandle(port)` if it does not.
/// NOTE(review): `dag.graph[node_index]` presumably panics for an index outside the graph —
/// confirm.
fn validate_port_with_index<T>(
3,975✔
361
    dag: &Dag<T>,
3,975✔
362
    node_index: daggy::NodeIndex,
3,975✔
363
    port: PortHandle,
3,975✔
364
    direction: PortDirection,
3,975✔
365
) -> Result<(), ExecutionError> {
3,975✔
366
    let node = &dag.graph[node_index];
3,975✔
367
    if !contains_port(&node.kind, direction, port)? {
3,975✔
368
        return Err(ExecutionError::InvalidPortHandle(port));
4✔
369
    }
3,971✔
370
    Ok(())
3,971✔
371
}
3,975✔
372

373
/// Reports whether `node` exposes `port` in the given `direction`.
///
/// Output ports are matched on the `handle` field of the entries returned by
/// `get_output_ports()`; input ports are matched directly against `get_input_ports()`.
/// Every path visible here returns `Ok`, so the `Result` wrapper never carries an error.
fn contains_port<T>(
3,975✔
374
    node: &NodeKind<T>,
3,975✔
375
    direction: PortDirection,
3,975✔
376
    port: PortHandle,
3,975✔
377
) -> Result<bool, ExecutionError> {
3,975✔
378
    Ok(match node {
3,975✔
379
        // Processors have both input and output ports.
        NodeKind::Processor(p) => {
2,815✔
380
            if direction == PortDirection::Output {
2,815✔
381
                p.get_output_ports().iter().any(|e| e.handle == port)
1,312✔
382
            } else {
383
                p.get_input_ports().contains(&port)
1,503✔
384
            }
385
        }
386
        // Sinks never match an output port.
        NodeKind::Sink(s) => {
483✔
387
            if direction == PortDirection::Output {
483✔
388
                false
×
389
            } else {
390
                s.get_input_ports().contains(&port)
483✔
391
            }
392
        }
393
        // Sources never match an input port.
        NodeKind::Source(s) => {
677✔
394
            if direction == PortDirection::Output {
677✔
395
                s.get_output_ports().iter().any(|e| e.handle == port)
887✔
396
            } else {
397
                false
×
398
            }
399
        }
400
    })
401
}
3,975✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc