• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5725710489

pending completion
5725710489

push

github

web-flow
chore: Add `SourceFactory::get_output_port_name` to simplify ui graph generation (#1812)

140 of 140 new or added lines in 13 files covered. (100.0%)

45519 of 60083 relevant lines covered (75.76%)

39458.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.59
/dozer-cli/src/pipeline/source_builder.rs
1
use crate::pipeline::connector_source::ConnectorSourceFactory;
2
use crate::OrchestrationError;
3
use dozer_core::appsource::{AppSourceManager, AppSourceMappings};
4
use dozer_ingestion::connectors::TableInfo;
5
use dozer_sql::pipeline::builder::SchemaSQLContext;
6

7
use dozer_types::indicatif::MultiProgress;
8
use dozer_types::models::connection::Connection;
9
use dozer_types::models::source::Source;
10
use std::collections::HashMap;
11
use std::sync::Arc;
12
use tokio::runtime::Runtime;
13

14
/// Builds the application's source manager from sources grouped by connection.
///
/// Each source is assigned a port number, counting up from
/// `SOURCE_PORTS_RANGE_START`.
pub struct SourceBuilder<'a> {
15
    /// Sources keyed by the connection they are read from; one connector
    /// source factory is created per key in `build_source_manager`.
    grouped_connections: HashMap<Connection, Vec<Source>>,
16
    /// Optional progress handle; when present it is cloned and passed to every
    /// `ConnectorSourceFactory` that gets created.
    progress: Option<&'a MultiProgress>,
17
}
18

19
/// First port number handed out to a source; every following source receives
/// the previous port plus one.
const SOURCE_PORTS_RANGE_START: u16 = 1000;
20

21
impl<'a> SourceBuilder<'a> {
22
    pub fn new(
13✔
23
        grouped_connections: HashMap<Connection, Vec<Source>>,
13✔
24
        progress: Option<&'a MultiProgress>,
13✔
25
    ) -> Self {
13✔
26
        Self {
13✔
27
            grouped_connections,
13✔
28
            progress,
13✔
29
        }
13✔
30
    }
13✔
31

32
    pub fn get_ports(&self) -> HashMap<(&str, &str), u16> {
×
33
        let mut port: u16 = SOURCE_PORTS_RANGE_START;
×
34

×
35
        let mut ports = HashMap::new();
×
36
        for (conn, sources_group) in &self.grouped_connections {
×
37
            for source in sources_group {
×
38
                ports.insert((conn.name.as_str(), source.name.as_str()), port);
×
39
                port += 1;
×
40
            }
×
41
        }
42
        ports
×
43
    }
×
44

45
    pub fn build_source_manager(
13✔
46
        &self,
13✔
47
        runtime: Arc<Runtime>,
13✔
48
    ) -> Result<AppSourceManager<SchemaSQLContext>, OrchestrationError> {
13✔
49
        let mut asm = AppSourceManager::new();
13✔
50

13✔
51
        let mut port: u16 = SOURCE_PORTS_RANGE_START;
13✔
52

53
        for (connection, sources_group) in &self.grouped_connections {
26✔
54
            let mut ports = HashMap::new();
13✔
55
            let mut table_and_ports = vec![];
13✔
56
            for source in sources_group {
35✔
57
                ports.insert(source.name.clone(), port);
22✔
58

22✔
59
                table_and_ports.push((
22✔
60
                    TableInfo {
22✔
61
                        schema: source.schema.clone(),
22✔
62
                        name: source.table_name.clone(),
22✔
63
                        column_names: source.columns.clone(),
22✔
64
                    },
22✔
65
                    port,
22✔
66
                ));
22✔
67

22✔
68
                port += 1;
22✔
69
            }
22✔
70

71
            let source_factory = runtime.block_on(ConnectorSourceFactory::new(
13✔
72
                table_and_ports,
13✔
73
                connection.clone(),
13✔
74
                runtime.clone(),
13✔
75
                self.progress.cloned(),
13✔
76
            ))?;
13✔
77

78
            asm.add(
13✔
79
                Box::new(source_factory),
13✔
80
                AppSourceMappings::new(connection.name.to_string(), ports),
13✔
81
            )?;
13✔
82
        }
×
83

84
        Ok(asm)
13✔
85
    }
13✔
86
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc