• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5693561935

pending completion
5693561935

push

github

web-flow
chore: Remove `AppSourceId` which is no longer used (#1803)

* chore: Remove `AppSourceId` which is no longer used

* chore: Remove `AppSource` and simplify source endpoint finding process

* chore: Use `Box` instead of `Arc` for the factories

* chore: Remove unused parameter in `AppPipeline::connect_nodes`

* chore: Remove an unused `Option`

443 of 443 new or added lines in 22 files covered. (100.0%)

45511 of 58843 relevant lines covered (77.34%)

39550.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.74
/dozer-core/src/appsource.rs
1
use dozer_types::node::NodeHandle;
2

3
use crate::errors::ExecutionError;
4
use crate::errors::ExecutionError::{
5
    AmbiguousSourceIdentifier, AppSourceConnectionAlreadyExists, InvalidSourceIdentifier,
6
};
7
use crate::node::{PortHandle, SourceFactory};
8
use crate::Endpoint;
9
use std::collections::HashMap;
10

11
#[derive(Debug)]
×
12
pub struct AppSourceMappings {
13
    pub connection: String,
14
    /// From source name to output port handle.
15
    pub mappings: HashMap<String, PortHandle>,
16
}
17

18
impl AppSourceMappings {
19
    pub fn new(connection: String, mappings: HashMap<String, PortHandle>) -> Self {
397✔
20
        Self {
397✔
21
            connection,
397✔
22
            mappings,
397✔
23
        }
397✔
24
    }
397✔
25
}
26

27
#[derive(Debug)]
×
28
pub struct AppSourceManager<T> {
29
    pub(crate) sources: Vec<Box<dyn SourceFactory<T>>>,
30
    pub(crate) mappings: Vec<AppSourceMappings>,
31
}
32

33
impl<T> Default for AppSourceManager<T> {
34
    fn default() -> Self {
88✔
35
        Self {
88✔
36
            sources: vec![],
88✔
37
            mappings: vec![],
88✔
38
        }
88✔
39
    }
88✔
40
}
41

42
impl<T> AppSourceManager<T> {
43
    pub fn add(
91✔
44
        &mut self,
91✔
45
        source: Box<dyn SourceFactory<T>>,
91✔
46
        mapping: AppSourceMappings,
91✔
47
    ) -> Result<(), ExecutionError> {
91✔
48
        if self
91✔
49
            .mappings
91✔
50
            .iter()
91✔
51
            .any(|existing_mapping| existing_mapping.connection == mapping.connection)
91✔
52
        {
53
            return Err(AppSourceConnectionAlreadyExists(mapping.connection));
1✔
54
        }
90✔
55

90✔
56
        self.sources.push(source);
90✔
57
        self.mappings.push(mapping);
90✔
58
        Ok(())
90✔
59
    }
91✔
60

61
    pub fn get_endpoint(&self, source_name: &str) -> Result<Endpoint, ExecutionError> {
9✔
62
        get_endpoint_from_mappings(&self.mappings, source_name)
9✔
63
    }
9✔
64

65
    pub fn new() -> Self {
88✔
66
        Self::default()
88✔
67
    }
88✔
68
}
69

70
pub fn get_endpoint_from_mappings(
611✔
71
    mappings: &[AppSourceMappings],
611✔
72
    source_name: &str,
611✔
73
) -> Result<Endpoint, ExecutionError> {
611✔
74
    let mut found: Vec<Endpoint> = mappings
611✔
75
        .iter()
611✔
76
        .filter_map(|mapping| {
618✔
77
            mapping.mappings.get(source_name).map(|output_port| {
618✔
78
                Endpoint::new(
609✔
79
                    NodeHandle::new(None, mapping.connection.clone()),
609✔
80
                    *output_port,
609✔
81
                )
609✔
82
            })
618✔
83
        })
618✔
84
        .collect();
611✔
85

611✔
86
    match found.len() {
611✔
87
        0 => Err(InvalidSourceIdentifier(source_name.to_string())),
2✔
88
        1 => Ok(found.remove(0)),
609✔
89
        _ => Err(AmbiguousSourceIdentifier(source_name.to_string())),
×
90
    }
91
}
611✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc