• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5888798292

17 Aug 2023 08:51AM UTC coverage: 76.025% (-1.4%) from 77.415%
5888798292

push

github

web-flow
feat: implement graph on live ui (#1847)

* feat: implement progress

* feat: implement enable progress flag

* feat: implement progress in live

* chore: fix clippy

* chore: always use telemetry metrics

* fix: Only run build once

---------

Co-authored-by: sagar <sagar@getdozer.io>
Co-authored-by: chubei <914745487@qq.com>

536 of 536 new or added lines in 21 files covered. (100.0%)

46101 of 60639 relevant lines covered (76.03%)

40410.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.97
/dozer-cli/src/pipeline/source_builder.rs
1
use crate::pipeline::connector_source::ConnectorSourceFactory;
2
use crate::OrchestrationError;
3
use dozer_core::appsource::{AppSourceManager, AppSourceMappings};
4
use dozer_ingestion::connectors::TableInfo;
5
use dozer_sql::pipeline::builder::SchemaSQLContext;
6

7
use dozer_types::indicatif::MultiProgress;
8
use dozer_types::models::connection::Connection;
9
use dozer_types::models::source::Source;
10
use std::collections::HashMap;
11
use std::sync::Arc;
12
use tokio::runtime::Runtime;
13

14
pub struct SourceBuilder<'a> {
15
    grouped_connections: HashMap<Connection, Vec<Source>>,
16
    progress: Option<&'a MultiProgress>,
17
}
18

19
const SOURCE_PORTS_RANGE_START: u16 = 1000;
20

21
impl<'a> SourceBuilder<'a> {
22
    pub fn new(
13✔
23
        grouped_connections: HashMap<Connection, Vec<Source>>,
13✔
24
        progress: Option<&'a MultiProgress>,
13✔
25
    ) -> Self {
13✔
26
        Self {
13✔
27
            grouped_connections,
13✔
28
            progress,
13✔
29
        }
13✔
30
    }
13✔
31

32
    pub fn get_ports(&self) -> HashMap<(&str, &str), u16> {
×
33
        let mut port: u16 = SOURCE_PORTS_RANGE_START;
×
34

×
35
        let mut ports = HashMap::new();
×
36
        for (conn, sources_group) in &self.grouped_connections {
×
37
            for source in sources_group {
×
38
                ports.insert((conn.name.as_str(), source.name.as_str()), port);
×
39
                port += 1;
×
40
            }
×
41
        }
42
        ports
×
43
    }
×
44

45
    pub async fn build_source_manager(
13✔
46
        &self,
13✔
47
        runtime: &Arc<Runtime>,
13✔
48
    ) -> Result<AppSourceManager<SchemaSQLContext>, OrchestrationError> {
13✔
49
        let mut asm = AppSourceManager::new();
13✔
50

13✔
51
        let mut port: u16 = SOURCE_PORTS_RANGE_START;
13✔
52

53
        for (connection, sources_group) in &self.grouped_connections {
26✔
54
            let mut ports = HashMap::new();
13✔
55
            let mut table_and_ports = vec![];
13✔
56
            for source in sources_group {
35✔
57
                ports.insert(source.name.clone(), port);
22✔
58

22✔
59
                table_and_ports.push((
22✔
60
                    TableInfo {
22✔
61
                        schema: source.schema.clone(),
22✔
62
                        name: source.table_name.clone(),
22✔
63
                        column_names: source.columns.clone(),
22✔
64
                    },
22✔
65
                    port,
22✔
66
                ));
22✔
67

22✔
68
                port += 1;
22✔
69
            }
22✔
70

71
            let source_factory = ConnectorSourceFactory::new(
13✔
72
                table_and_ports,
13✔
73
                connection.clone(),
13✔
74
                runtime.clone(),
13✔
75
                self.progress.cloned(),
13✔
76
            )
13✔
77
            .await?;
48✔
78

×
79
            asm.add(
13✔
80
                Box::new(source_factory),
13✔
81
                AppSourceMappings::new(connection.name.to_string(), ports),
13✔
82
            )?;
13✔
83
        }
84

×
85
        Ok(asm)
13✔
86
    }
13✔
87
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc