• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5923086724

21 Aug 2023 07:05AM UTC coverage: 74.763% (-1.2%) from 75.988%
5923086724

push

github

web-flow
chore: Remove short form of `enable_progress` because it's conflicting with `dozer cloud` (#1876)

46105 of 61668 relevant lines covered (74.76%)

39792.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.64
/dozer-cli/src/pipeline/source_builder.rs
1
use crate::pipeline::connector_source::ConnectorSourceFactory;
2
use crate::shutdown::ShutdownReceiver;
3
use crate::OrchestrationError;
4
use dozer_core::appsource::{AppSourceManager, AppSourceMappings};
5
use dozer_ingestion::connectors::TableInfo;
6
use dozer_sql::pipeline::builder::SchemaSQLContext;
7

8
use dozer_types::indicatif::MultiProgress;
9
use dozer_types::models::connection::Connection;
10
use dozer_types::models::source::Source;
11
use std::collections::HashMap;
12
use std::sync::Arc;
13
use tokio::runtime::Runtime;
14

15
pub struct SourceBuilder<'a> {
16
    grouped_connections: HashMap<Connection, Vec<Source>>,
17
    progress: Option<&'a MultiProgress>,
18
}
19

20
/// First port number assigned to a source. Both `get_ports` and
/// `build_source_manager` start counting here and add one per source.
const SOURCE_PORTS_RANGE_START: u16 = 1000;
21

22
impl<'a> SourceBuilder<'a> {
×
23
    pub fn new(
13✔
24
        grouped_connections: HashMap<Connection, Vec<Source>>,
13✔
25
        progress: Option<&'a MultiProgress>,
13✔
26
    ) -> Self {
13✔
27
        Self {
13✔
28
            grouped_connections,
13✔
29
            progress,
13✔
30
        }
13✔
31
    }
13✔
32

×
33
    pub fn get_ports(&self) -> HashMap<(&str, &str), u16> {
×
34
        let mut port: u16 = SOURCE_PORTS_RANGE_START;
×
35

×
36
        let mut ports = HashMap::new();
×
37
        for (conn, sources_group) in &self.grouped_connections {
×
38
            for source in sources_group {
×
39
                ports.insert((conn.name.as_str(), source.name.as_str()), port);
×
40
                port += 1;
×
41
            }
×
42
        }
×
43
        ports
×
44
    }
×
45

×
46
    pub async fn build_source_manager(
13✔
47
        &self,
13✔
48
        runtime: &Arc<Runtime>,
13✔
49
        shutdown: ShutdownReceiver,
13✔
50
    ) -> Result<AppSourceManager<SchemaSQLContext>, OrchestrationError> {
13✔
51
        let mut asm = AppSourceManager::new();
13✔
52

13✔
53
        let mut port: u16 = SOURCE_PORTS_RANGE_START;
13✔
54

×
55
        for (connection, sources_group) in &self.grouped_connections {
26✔
56
            let mut ports = HashMap::new();
13✔
57
            let mut table_and_ports = vec![];
13✔
58
            for source in sources_group {
35✔
59
                ports.insert(source.name.clone(), port);
22✔
60

22✔
61
                table_and_ports.push((
22✔
62
                    TableInfo {
22✔
63
                        schema: source.schema.clone(),
22✔
64
                        name: source.table_name.clone(),
22✔
65
                        column_names: source.columns.clone(),
22✔
66
                    },
22✔
67
                    port,
22✔
68
                ));
22✔
69

22✔
70
                port += 1;
22✔
71
            }
22✔
72

×
73
            let source_factory = ConnectorSourceFactory::new(
13✔
74
                table_and_ports,
13✔
75
                connection.clone(),
13✔
76
                runtime.clone(),
13✔
77
                self.progress.cloned(),
13✔
78
                shutdown.clone(),
13✔
79
            )
13✔
80
            .await?;
48✔
81

×
82
            asm.add(
13✔
83
                Box::new(source_factory),
13✔
84
                AppSourceMappings::new(connection.name.to_string(), ports),
13✔
85
            )?;
13✔
86
        }
×
87

88
        Ok(asm)
13✔
89
    }
13✔
90
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc