• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4112409903

pending completion
4112409903

Pull #818

github

GitHub
Merge 0e6d61bff into c160ec41f
Pull Request #818: chore: fix dag issues

212 of 212 new or added lines in 23 files covered. (100.0%)

23352 of 37718 relevant lines covered (61.91%)

31647.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.03
/dozer-orchestrator/src/pipeline/source_builder.rs
1
use crate::pipeline::connector_source::ConnectorSourceFactory;
2
use crate::OrchestrationError;
3
use dozer_core::dag::appsource::{AppSource, AppSourceManager};
4
use dozer_ingestion::connectors::TableInfo;
5
use dozer_ingestion::ingestion::{IngestionConfig, Ingestor};
6
use dozer_sql::pipeline::builder::SchemaSQLContext;
7
use dozer_types::models::source::Source;
8
use dozer_types::parking_lot::RwLock;
9
use std::collections::HashMap;
10
use std::sync::atomic::AtomicBool;
11
use std::sync::Arc;
12

13
pub struct SourceBuilder {
14
    used_sources: Vec<String>,
15
    grouped_connections: HashMap<String, Vec<Source>>,
16
}
17

18
pub type IngestorVec = Vec<Arc<RwLock<Ingestor>>>;
×
19

×
20
const SOURCE_PORTS_RANGE_START: u16 = 1000;
×
21

×
22
impl SourceBuilder {
×
23
    pub fn new(
2✔
24
        used_sources: Vec<String>,
2✔
25
        grouped_connections: HashMap<String, Vec<Source>>,
2✔
26
    ) -> Self {
2✔
27
        Self {
2✔
28
            used_sources,
2✔
29
            grouped_connections,
2✔
30
        }
2✔
31
    }
2✔
32

×
33
    pub fn get_ports(&self) -> HashMap<(String, String), u16> {
×
34
        let mut port: u16 = SOURCE_PORTS_RANGE_START;
×
35

×
36
        let mut ports = HashMap::new();
×
37
        for (conn, sources_group) in self.grouped_connections.clone() {
×
38
            for source in &sources_group {
×
39
                if self.used_sources.contains(&source.name) {
×
40
                    ports.insert((conn.clone(), source.name.clone()), port);
×
41
                    port += 1;
×
42
                }
×
43
            }
×
44
        }
×
45
        ports
×
46
    }
×
47

×
48
    pub fn build_source_manager(
2✔
49
        &self,
2✔
50
        running: Arc<AtomicBool>,
2✔
51
    ) -> Result<(AppSourceManager<SchemaSQLContext>, IngestorVec), OrchestrationError> {
2✔
52
        let mut asm = AppSourceManager::new();
2✔
53
        let mut ingestors = vec![];
2✔
54

2✔
55
        let mut port: u16 = SOURCE_PORTS_RANGE_START;
2✔
56

×
57
        for (conn, sources_group) in self.grouped_connections.clone() {
4✔
58
            let first_source = sources_group.get(0).unwrap();
4✔
59

×
60
            if let Some(connection) = &first_source.connection {
4✔
61
                let mut ports = HashMap::new();
4✔
62
                let mut tables = vec![];
4✔
63
                for source in &sources_group {
12✔
64
                    if self.used_sources.contains(&source.name) {
8✔
65
                        ports.insert(source.name.clone(), port);
5✔
66

5✔
67
                        tables.push(TableInfo {
5✔
68
                            name: source.name.clone(),
5✔
69
                            table_name: source.table_name.clone(),
5✔
70
                            id: port as u32,
5✔
71
                            columns: Some(source.columns.clone()),
5✔
72
                        });
5✔
73

5✔
74
                        port += 1;
5✔
75
                    }
5✔
76
                }
×
77

×
78
                let (ingestor, iterator) = Ingestor::initialize_channel(IngestionConfig::default());
4✔
79

4✔
80
                let source_factory = ConnectorSourceFactory::new(
4✔
81
                    Arc::clone(&ingestor),
4✔
82
                    Arc::clone(&iterator),
4✔
83
                    ports.clone(),
4✔
84
                    tables,
4✔
85
                    connection.clone(),
4✔
86
                    running.clone(),
4✔
87
                );
4✔
88

4✔
89
                asm.add(AppSource::new(
4✔
90
                    conn.clone(),
4✔
91
                    Arc::new(source_factory),
4✔
92
                    ports,
4✔
93
                ))?;
4✔
94

×
95
                ingestors.push(ingestor);
4✔
96
            }
×
97
        }
×
98

×
99
        Ok((asm, ingestors))
2✔
100
    }
2✔
101

×
102
    /// Groups `sources` by the name of their connection.
    ///
    /// Sources without a connection are dropped. Within each group the sources
    /// keep the relative order they had in `sources`.
    pub fn group_connections(sources: Vec<Source>) -> HashMap<String, Vec<Source>> {
        let mut grouped: HashMap<String, Vec<Source>> = HashMap::new();

        for source in sources {
            // Only the connection name is needed as the key, so clone just the
            // name rather than the whole connection.
            let conn_name = source.connection.as_ref().map(|conn| conn.name.clone());
            if let Some(conn_name) = conn_name {
                grouped.entry(conn_name).or_default().push(source);
            }
        }

        grouped
    }
2✔
113
}
×
114

×
115
#[cfg(test)]
×
116
mod tests {
×
117
    use crate::pipeline::source_builder::SourceBuilder;
×
118
    use dozer_types::models::app_config::Config;
×
119
    use std::sync::atomic::AtomicBool;
×
120
    use std::sync::Arc;
×
121

×
122
    use dozer_core::dag::appsource::{AppSourceId, AppSourceMappings};
×
123
    use dozer_sql::pipeline::builder::SchemaSQLContext;
×
124
    use dozer_types::models::connection::{
×
125
        Authentication, Connection, DBType, EventsAuthentication,
×
126
    };
×
127
    use dozer_types::models::source::Source;
×
128

×
129
    fn get_default_config() -> Config {
2✔
130
        let events1_conn = Connection {
2✔
131
            authentication: Some(Authentication::Events(EventsAuthentication {})),
2✔
132
            id: None,
2✔
133
            app_id: None,
2✔
134
            db_type: DBType::Postgres.into(),
2✔
135
            name: "pg_conn".to_string(),
2✔
136
        };
2✔
137

2✔
138
        let events2_conn = Connection {
2✔
139
            authentication: Some(Authentication::Events(EventsAuthentication {})),
2✔
140
            id: None,
2✔
141
            app_id: None,
2✔
142
            db_type: DBType::Snowflake.into(),
2✔
143
            name: "snow".to_string(),
2✔
144
        };
2✔
145

2✔
146
        Config {
2✔
147
            id: None,
2✔
148
            app_name: "multi".to_string(),
2✔
149
            api: Default::default(),
2✔
150
            flags: Default::default(),
2✔
151
            connections: vec![events1_conn.clone(), events2_conn.clone()],
2✔
152
            sources: vec![
2✔
153
                Source {
2✔
154
                    id: None,
2✔
155
                    name: "customers".to_string(),
2✔
156
                    table_name: "customers".to_string(),
2✔
157
                    columns: vec!["id".to_string()],
2✔
158
                    connection: Some(events1_conn.clone()),
2✔
159
                    refresh_config: None,
2✔
160
                    app_id: None,
2✔
161
                },
2✔
162
                Source {
2✔
163
                    id: None,
2✔
164
                    name: "addresses".to_string(),
2✔
165
                    table_name: "addresses".to_string(),
2✔
166
                    columns: vec!["id".to_string()],
2✔
167
                    connection: Some(events1_conn),
2✔
168
                    refresh_config: None,
2✔
169
                    app_id: None,
2✔
170
                },
2✔
171
                Source {
2✔
172
                    id: None,
2✔
173
                    name: "prices".to_string(),
2✔
174
                    table_name: "prices".to_string(),
2✔
175
                    columns: vec!["id".to_string()],
2✔
176
                    connection: Some(events2_conn.clone()),
2✔
177
                    refresh_config: None,
2✔
178
                    app_id: None,
2✔
179
                },
2✔
180
                Source {
2✔
181
                    id: None,
2✔
182
                    name: "prices_history".to_string(),
2✔
183
                    table_name: "prices_history".to_string(),
2✔
184
                    columns: vec!["id".to_string()],
2✔
185
                    connection: Some(events2_conn),
2✔
186
                    refresh_config: None,
2✔
187
                    app_id: None,
2✔
188
                },
2✔
189
            ],
2✔
190
            endpoints: vec![],
2✔
191
            sql: None,
2✔
192
            home_dir: "test".to_string(),
2✔
193
        }
2✔
194
    }
2✔
195

×
196
    /// With every source marked as used, sources from both connections can be
    /// looked up in the source manager.
    #[test]
    fn load_multi_sources() {
        let config = get_default_config();

        let used_tables: Vec<String> = config
            .sources
            .iter()
            .map(|source| source.table_name.clone())
            .collect();
        let grouped = SourceBuilder::group_connections(config.sources.clone());

        let (asm, _) = SourceBuilder::new(used_tables, grouped)
            .build_source_manager(Arc::new(AtomicBool::new(true)))
            .unwrap();

        let first_conn = config.connections[0].name.clone();
        let second_conn = config.connections[1].name.clone();

        // One source from each connection.
        let requested = vec![
            AppSourceId::new(config.sources[0].table_name.clone(), Some(first_conn)),
            AppSourceId::new(config.sources[2].table_name.clone(), Some(second_conn)),
        ];
        let pg_source_mapping: Vec<AppSourceMappings<SchemaSQLContext>> =
            asm.get(requested).unwrap();

        assert_eq!(1, pg_source_mapping[0].mappings.len());
    }
1✔
232

×
233
    /// With only the first source marked as used, that source can still be
    /// looked up in the source manager.
    #[test]
    fn load_only_used_sources() {
        let config = get_default_config();

        let first_table = config.sources[0].table_name.clone();
        let conn_name = config.connections[0].name.clone();

        let grouped = SourceBuilder::group_connections(config.sources.clone());
        let (asm, _) = SourceBuilder::new(vec![first_table.clone()], grouped)
            .build_source_manager(Arc::new(AtomicBool::new(true)))
            .unwrap();

        let pg_source_mapping: Vec<AppSourceMappings<SchemaSQLContext>> = asm
            .get(vec![AppSourceId::new(
                first_table.clone(),
                Some(conn_name.clone()),
            )])
            .unwrap();

        assert_eq!(1, pg_source_mapping[0].mappings.len());

        // A second lookup of the same source must succeed as well.
        let second_lookup = asm.get(vec![AppSourceId::new(first_table, Some(conn_name))]);
        assert!(second_lookup.is_ok());
    }
1✔
264
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc