• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4112409903

pending completion
4112409903

Pull #818

github

GitHub
Merge 0e6d61bff into c160ec41f
Pull Request #818: chore: fix dag issues

212 of 212 new or added lines in 23 files covered. (100.0%)

23352 of 37718 relevant lines covered (61.91%)

31647.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.36
/dozer-core/src/dag/tests/app.rs
1
use crate::chk;
2
use crate::dag::app::{App, AppPipeline, PipelineEntryPoint};
3
use crate::dag::appsource::{AppSource, AppSourceId, AppSourceManager};
4
use crate::dag::errors::ExecutionError;
5
use crate::dag::executor::{DagExecutor, ExecutorOptions};
6
use crate::dag::node::{NodeHandle, OutputPortDef, PortHandle, Source, SourceFactory};
7
use crate::dag::tests::dag_base_run::{
8
    NoopJoinProcessorFactory, NOOP_JOIN_LEFT_INPUT_PORT, NOOP_JOIN_RIGHT_INPUT_PORT,
9
};
10
use crate::dag::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
11
use crate::dag::tests::sources::{
12
    DualPortGeneratorSourceFactory, GeneratorSourceFactory,
13
    DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_1, DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_2,
14
    GENERATOR_SOURCE_OUTPUT_PORT,
15
};
16
use crate::dag::{Edge, Endpoint, DEFAULT_PORT_HANDLE};
17
use dozer_types::types::Schema;
18

19
use std::collections::HashMap;
20
use std::sync::atomic::AtomicBool;
21
use std::sync::Arc;
22

23
use tempdir::TempDir;
24

25
/// Field-less placeholder type used in this test module as the context type
/// parameter of `SourceFactory` (see `impl SourceFactory<NoneContext>` below).
/// It carries no data.
#[derive(Debug, Clone)]
452✔
26
pub(crate) struct NoneContext {}
27

28
/// Stateless stub source factory. The tests in this file wrap it in an `Arc`
/// and hand it to `AppSource::new` when only the `AppSourceManager`
/// registration/lookup bookkeeping is under test.
#[derive(Debug)]
×
29
struct NoneSourceFactory {}
30
// Stub `SourceFactory` implementation: every method except `prepare` is a
// `todo!()` placeholder and will panic if it is ever invoked.
impl SourceFactory<NoneContext> for NoneSourceFactory {
31
    /// Not implemented: panics via `todo!()` when called. `_port` is ignored.
    fn get_output_schema(
×
32
        &self,
×
33
        _port: &PortHandle,
×
34
    ) -> Result<(Schema, NoneContext), ExecutionError> {
×
35
        todo!()
×
36
    }
×
37

38
    /// Not implemented: panics via `todo!()` when called.
    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
×
39
        todo!()
×
40
    }
×
41

42
    /// No-op: ignores `_output_schemas` and always returns `Ok(())`.
    fn prepare(
×
43
        &self,
×
44
        _output_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
45
    ) -> Result<(), ExecutionError> {
×
46
        Ok(())
×
47
    }
×
48

49
    /// Not implemented: panics via `todo!()` when called, so this factory can
    /// never actually produce a `Source`.
    fn build(
×
50
        &self,
×
51
        _output_schemas: HashMap<PortHandle, Schema>,
×
52
    ) -> Result<Box<dyn Source>, ExecutionError> {
×
53
        todo!()
×
54
    }
×
55
}
56

57
/// Adds two `AppSource`s that share the connection name "conn1" (exposing
/// "table1" and "table2" respectively, both mapped to port 1) and asserts that
/// the second `AppSourceManager::add` call returns an error.
///
/// Presumably the error is raised because the connection name is already
/// registered; the test only checks `is_err()`, not the error variant.
// NOTE(review): the function name looks like a typo of
// `test_app_source_manager_connection_exists`.
#[test]
1✔
58
fn test_apps_sorce_smanager_connection_exists() {
1✔
59
    let mut asm = AppSourceManager::new();
1✔
60
    let app_src = AppSource::new(
1✔
61
        "conn1".to_string(),
1✔
62
        Arc::new(NoneSourceFactory {}),
1✔
63
        vec![("table1".to_string(), 1_u16)].into_iter().collect(),
1✔
64
    );
1✔
65
    // NOTE(review): the result of this first `add` is discarded. The test
    // presumably relies on it succeeding; consider unwrapping it so a failure
    // here cannot be masked.
    let _r = asm.add(app_src);
1✔
66
    // Second source: same connection name, different table name.
    let app_src = AppSource::new(
1✔
67
        "conn1".to_string(),
1✔
68
        Arc::new(NoneSourceFactory {}),
1✔
69
        vec![("table2".to_string(), 1_u16)].into_iter().collect(),
1✔
70
    );
1✔
71
    let r = asm.add(app_src);
1✔
72
    assert!(r.is_err());
1✔
73
}
1✔
74

75
/// Exercises `AppSourceManager::get` with connection-qualified and unqualified
/// `AppSourceId`s:
///
/// 1. With only "conn1" registered ("table1" -> port 1), "table1" can be looked
///    up both without a connection and with `Some("conn1")`; a lookup with an
///    unregistered connection name returns an error.
/// 2. After "conn2" is registered with the same table name ("table1" -> port 2),
///    the unqualified lookup returns an error, while a lookup of both qualified
///    ids returns one entry per connection with the matching port mapping.
// NOTE(review): the function name looks like a typo of
// `test_app_source_manager_lookup`.
#[test]
1✔
76
fn test_apps_sorce_smanager_lookup() {
1✔
77
    let mut asm = AppSourceManager::new();
1✔
78
    let app_src = AppSource::new(
1✔
79
        "conn1".to_string(),
1✔
80
        Arc::new(NoneSourceFactory {}),
1✔
81
        vec![("table1".to_string(), 1_u16)].into_iter().collect(),
1✔
82
    );
1✔
83
    asm.add(app_src).unwrap();
1✔
84

1✔
85
    // Unqualified lookup: only one connection exposes "table1" at this point.
    let r = asm
1✔
86
        .get(vec![AppSourceId::new("table1".to_string(), None)])
1✔
87
        .unwrap();
1✔
88
    assert_eq!(r[0].source.connection, "conn1");
1✔
89
    // The returned mapping is keyed by the id exactly as it was requested
    // (here: without a connection).
    assert_eq!(
1✔
90
        r[0].mappings
1✔
91
            .get(&AppSourceId::new("table1".to_string(), None))
1✔
92
            .unwrap(),
1✔
93
        &1_u16
1✔
94
    );
1✔
95

96
    // Lookup qualified with a connection name that was never registered.
    let r = asm.get(vec![AppSourceId::new(
1✔
97
        "table1".to_string(),
1✔
98
        Some("No connection".to_string()),
1✔
99
    )]);
1✔
100
    assert!(r.is_err());
1✔
101

102
    // Lookup qualified with the registered connection name.
    let r = asm
1✔
103
        .get(vec![AppSourceId::new(
1✔
104
            "table1".to_string(),
1✔
105
            Some("conn1".to_string()),
1✔
106
        )])
1✔
107
        .unwrap();
1✔
108
    assert_eq!(r[0].source.connection, "conn1");
1✔
109
    assert_eq!(
1✔
110
        r[0].mappings
1✔
111
            .get(&AppSourceId::new(
1✔
112
                "table1".to_string(),
1✔
113
                Some("conn1".to_string())
1✔
114
            ))
1✔
115
            .unwrap(),
1✔
116
        &1_u16
1✔
117
    );
1✔
118

119
    // Register a second connection ("conn2") that exposes the same table name,
    // this time mapped to port 2.
    let app_src = AppSource::new(
1✔
120
        "conn2".to_string(),
1✔
121
        Arc::new(NoneSourceFactory {}),
1✔
122
        vec![("table1".to_string(), 2_u16)].into_iter().collect(),
1✔
123
    );
1✔
124
    asm.add(app_src).unwrap();
1✔
125

1✔
126
    // The unqualified lookup that succeeded above is now expected to fail
    // (presumably because "table1" is ambiguous between the two connections).
    let r = asm.get(vec![AppSourceId::new("table1".to_string(), None)]);
1✔
127
    assert!(r.is_err());
1✔
128

129
    // Qualified lookups for both connections in a single call.
    let r = asm
1✔
130
        .get(vec![
1✔
131
            AppSourceId::new("table1".to_string(), Some("conn1".to_string())),
1✔
132
            AppSourceId::new("table1".to_string(), Some("conn2".to_string())),
1✔
133
        ])
1✔
134
        .unwrap();
1✔
135

1✔
136
    // Entries are located by connection name rather than by index, so the test
    // does not depend on the order of the returned entries.
    let conn1 = r.iter().find(|e| e.source.connection == "conn1");
2✔
138
    assert!(conn1.is_some());
1✔
139
    let conn2 = r.iter().find(|e| e.source.connection == "conn2");
1✔
140
    assert!(conn2.is_some());
1✔
141

142
    assert_eq!(
1✔
143
        conn1
1✔
144
            .unwrap()
1✔
145
            .mappings
1✔
146
            .get(&AppSourceId::new(
1✔
147
                "table1".to_string(),
1✔
148
                Some("conn1".to_string())
1✔
149
            ))
1✔
150
            .unwrap(),
1✔
151
        &1_u16
1✔
152
    );
1✔
153
    assert_eq!(
1✔
154
        conn2
1✔
155
            .unwrap()
1✔
156
            .mappings
1✔
157
            .get(&AppSourceId::new(
1✔
158
                "table1".to_string(),
1✔
159
                Some("conn2".to_string())
1✔
160
            ))
1✔
161
            .unwrap(),
1✔
162
        &2_u16
1✔
163
    );
1✔
164
}
1✔
165

166
/// Registers a single connection "conn1" that exposes two tables ("table1" ->
/// port 1, "table2" -> port 2) and checks that one `AppSourceManager::get` call
/// for both unqualified table ids yields a first entry for "conn1" whose
/// mappings contain both tables with their respective ports.
#[test]
1✔
167
fn test_apps_source_manager_lookup_multiple_ports() {
1✔
168
    let mut asm = AppSourceManager::new();
1✔
169
    let app_src = AppSource::new(
1✔
170
        "conn1".to_string(),
1✔
171
        Arc::new(NoneSourceFactory {}),
1✔
172
        vec![("table1".to_string(), 1_u16), ("table2".to_string(), 2_u16)]
1✔
173
            .into_iter()
1✔
174
            .collect(),
1✔
175
    );
1✔
176
    asm.add(app_src).unwrap();
1✔
177

1✔
178
    // NOTE(review): this result is discarded and the identical lookup is
    // repeated (and unwrapped) immediately below; this call looks redundant.
    let _r = asm.get(vec![
1✔
179
        AppSourceId::new("table1".to_string(), None),
1✔
180
        AppSourceId::new("table2".to_string(), None),
1✔
181
    ]);
1✔
182

1✔
183
    let r = asm
1✔
184
        .get(vec![
1✔
185
            AppSourceId::new("table1".to_string(), None),
1✔
186
            AppSourceId::new("table2".to_string(), None),
1✔
187
        ])
1✔
188
        .unwrap();
1✔
189

1✔
190
    // Both tables are expected on the same (first) returned entry.
    assert_eq!(r[0].source.connection, "conn1");
1✔
191
    assert_eq!(
1✔
192
        r[0].mappings
1✔
193
            .get(&AppSourceId::new("table1".to_string(), None))
1✔
194
            .unwrap(),
1✔
195
        &1_u16
1✔
196
    );
1✔
197
    assert_eq!(
1✔
198
        r[0].mappings
1✔
199
            .get(&AppSourceId::new("table2".to_string(), None))
1✔
200
            .unwrap(),
1✔
201
        &2_u16
1✔
202
    );
1✔
203
}
1✔
204

205
/// End-to-end test of `App`: registers two sources, builds two pipelines that
/// each consist of a "join" processor feeding a "sink", converts the app into a
/// DAG, verifies the exact set of edges, and finally runs the DAG with a
/// `DagExecutor` in a temporary directory.
///
/// Sources:
/// - "postgres": dual-port generator exposing "users" and "transactions".
/// - "snowflake": single-port generator exposing "users".
///
/// Pipelines:
/// - p1 joins "users" from "postgres" with "transactions" (unqualified).
/// - p2 joins "users" from "snowflake" with "transactions" (unqualified).
///
/// The edge assertions expect source nodes to carry no namespace
/// (`NodeHandle::new(None, ..)`) and pipeline nodes to be namespaced `Some(1)`
/// for p1 and `Some(2)` for p2, which is how both pipelines can reuse the node
/// names "join" and "sink".
// NOTE(review): the six edge assertions differ only in their endpoints; a small
// local helper would make the expected topology easier to read.
#[test]
1✔
206
fn test_app_dag() {
1✔
207
    // Flag shared by both source factories and both sink factories.
    // NOTE(review): its exact meaning is defined by those factories, which are
    // not visible here.
    let latch = Arc::new(AtomicBool::new(true));
1✔
208

1✔
209
    let mut asm = AppSourceManager::new();
1✔
210
    // NOTE(review): 10_000 is presumably the number of generated records per
    // source; confirm against the generator factories.
    asm.add(AppSource::new(
1✔
211
        "postgres".to_string(),
1✔
212
        Arc::new(DualPortGeneratorSourceFactory::new(
1✔
213
            10_000,
1✔
214
            latch.clone(),
1✔
215
            true,
1✔
216
        )),
1✔
217
        vec![
1✔
218
            (
1✔
219
                "users".to_string(),
1✔
220
                DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_1,
1✔
221
            ),
1✔
222
            (
1✔
223
                "transactions".to_string(),
1✔
224
                DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_2,
1✔
225
            ),
1✔
226
        ]
1✔
227
        .into_iter()
1✔
228
        .collect(),
1✔
229
    ))
1✔
230
    .unwrap();
1✔
231

1✔
232
    // Second connection also exposes a table named "users", so pipelines must
    // qualify "users" with a connection name.
    asm.add(AppSource::new(
1✔
233
        "snowflake".to_string(),
1✔
234
        Arc::new(GeneratorSourceFactory::new(10_000, latch.clone(), true)),
1✔
235
        vec![("users".to_string(), GENERATOR_SOURCE_OUTPUT_PORT)]
1✔
236
            .into_iter()
1✔
237
            .collect(),
1✔
238
    ))
1✔
239
    .unwrap();
1✔
240

1✔
241
    let mut app = App::new(asm);
1✔
242

1✔
243
    // Pipeline 1: users@postgres (left) joined with transactions (right).
    // "transactions" is left unqualified; only "postgres" exposes it.
    let mut p1 = AppPipeline::new();
1✔
244
    p1.add_processor(
1✔
245
        Arc::new(NoopJoinProcessorFactory {}),
1✔
246
        "join",
1✔
247
        vec![
1✔
248
            PipelineEntryPoint::new(
1✔
249
                AppSourceId::new("users".to_string(), Some("postgres".to_string())),
1✔
250
                NOOP_JOIN_LEFT_INPUT_PORT,
1✔
251
            ),
1✔
252
            PipelineEntryPoint::new(
1✔
253
                AppSourceId::new("transactions".to_string(), None),
1✔
254
                NOOP_JOIN_RIGHT_INPUT_PORT,
1✔
255
            ),
1✔
256
        ],
1✔
257
    );
1✔
258
    // NOTE(review): 20_000 is presumably the record count the sink expects
    // (two inputs of 10_000 each); confirm against `CountingSinkFactory`.
    p1.add_sink(
1✔
259
        Arc::new(CountingSinkFactory::new(20_000, latch.clone())),
1✔
260
        "sink",
1✔
261
    );
1✔
262
    // The join's output port is passed as `None`; the edge assertions below
    // expect it to show up as `DEFAULT_PORT_HANDLE`.
    p1.connect_nodes("join", None, "sink", Some(COUNTING_SINK_INPUT_PORT), true)
1✔
263
        .unwrap();
1✔
264

1✔
265
    app.add_pipeline(p1);
1✔
266

1✔
267
    // Pipeline 2: same shape as pipeline 1, but "users" comes from "snowflake".
    let mut p2 = AppPipeline::new();
1✔
268
    p2.add_processor(
1✔
269
        Arc::new(NoopJoinProcessorFactory {}),
1✔
270
        "join",
1✔
271
        vec![
1✔
272
            PipelineEntryPoint::new(
1✔
273
                AppSourceId::new("users".to_string(), Some("snowflake".to_string())),
1✔
274
                NOOP_JOIN_LEFT_INPUT_PORT,
1✔
275
            ),
1✔
276
            PipelineEntryPoint::new(
1✔
277
                AppSourceId::new("transactions".to_string(), None),
1✔
278
                NOOP_JOIN_RIGHT_INPUT_PORT,
1✔
279
            ),
1✔
280
        ],
1✔
281
    );
1✔
282
    // Last use of `latch`, so it is moved instead of cloned.
    p2.add_sink(Arc::new(CountingSinkFactory::new(20_000, latch)), "sink");
1✔
283
    p2.connect_nodes("join", None, "sink", Some(COUNTING_SINK_INPUT_PORT), true)
1✔
284
        .unwrap();
1✔
285

1✔
286
    app.add_pipeline(p2);
1✔
287

1✔
288
    let dag = app.get_dag().unwrap();
1✔
289

1✔
290
    // Edge 1: postgres/users -> pipeline 1 join (left input).
    assert!(dag.edge_handles().any(|e| *e
1✔
291
        == Edge::new(
1✔
292
            Endpoint::new(
1✔
293
                NodeHandle::new(None, "postgres".to_string()),
1✔
294
                DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_1
1✔
295
            ),
1✔
296
            Endpoint::new(
1✔
297
                NodeHandle::new(Some(1), "join".to_string()),
1✔
298
                NOOP_JOIN_LEFT_INPUT_PORT
1✔
299
            )
1✔
300
        )));
1✔
301

302
    // Edge 2: postgres/transactions -> pipeline 1 join (right input).
    assert!(dag.edge_handles().any(|e| *e
5✔
303
        == Edge::new(
5✔
304
            Endpoint::new(
5✔
305
                NodeHandle::new(None, "postgres".to_string()),
5✔
306
                DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_2
5✔
307
            ),
5✔
308
            Endpoint::new(
5✔
309
                NodeHandle::new(Some(1), "join".to_string()),
5✔
310
                NOOP_JOIN_RIGHT_INPUT_PORT
5✔
311
            )
5✔
312
        )));
5✔
313

314
    // Edge 3: snowflake/users -> pipeline 2 join (left input).
    assert!(dag.edge_handles().any(|e| *e
2✔
315
        == Edge::new(
2✔
316
            Endpoint::new(
2✔
317
                NodeHandle::new(None, "snowflake".to_string()),
2✔
318
                GENERATOR_SOURCE_OUTPUT_PORT
2✔
319
            ),
2✔
320
            Endpoint::new(
2✔
321
                NodeHandle::new(Some(2), "join".to_string()),
2✔
322
                NOOP_JOIN_LEFT_INPUT_PORT
2✔
323
            )
2✔
324
        )));
2✔
325

326
    // Edge 4: postgres/transactions -> pipeline 2 join (right input). The same
    // source port feeds both pipelines.
    assert!(dag.edge_handles().any(|e| *e
6✔
327
        == Edge::new(
6✔
328
            Endpoint::new(
6✔
329
                NodeHandle::new(None, "postgres".to_string()),
6✔
330
                DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_2
6✔
331
            ),
6✔
332
            Endpoint::new(
6✔
333
                NodeHandle::new(Some(2), "join".to_string()),
6✔
334
                NOOP_JOIN_RIGHT_INPUT_PORT
6✔
335
            )
6✔
336
        )));
6✔
337

338
    // Edge 5: pipeline 1 join -> pipeline 1 sink.
    assert!(dag.edge_handles().any(|e| *e
3✔
339
        == Edge::new(
3✔
340
            Endpoint::new(
3✔
341
                NodeHandle::new(Some(1), "join".to_string()),
3✔
342
                DEFAULT_PORT_HANDLE
3✔
343
            ),
3✔
344
            Endpoint::new(
3✔
345
                NodeHandle::new(Some(1), "sink".to_string()),
3✔
346
                COUNTING_SINK_INPUT_PORT
3✔
347
            )
3✔
348
        )));
3✔
349

350
    // Edge 6: pipeline 2 join -> pipeline 2 sink.
    assert!(dag.edge_handles().any(|e| *e
4✔
351
        == Edge::new(
4✔
352
            Endpoint::new(
4✔
353
                NodeHandle::new(Some(2), "join".to_string()),
4✔
354
                DEFAULT_PORT_HANDLE
4✔
355
            ),
4✔
356
            Endpoint::new(
4✔
357
                NodeHandle::new(Some(2), "sink".to_string()),
4✔
358
                COUNTING_SINK_INPUT_PORT
4✔
359
            )
4✔
360
        )));
4✔
361

362
    // Exactly the six edges asserted above, and no others.
    assert_eq!(dag.edge_handles().count(), 6);
1✔
363

364
    // Run the DAG. The executor receives its own fresh `AtomicBool`, separate
    // from `latch`.
    // NOTE(review): the role of that flag is defined by `DagExecutor::new`,
    // which is not visible here.
    let tmp_dir = chk!(TempDir::new("test"));
1✔
365
    let mut executor = chk!(DagExecutor::new(
1✔
366
        dag,
×
367
        tmp_dir.path(),
×
368
        ExecutorOptions::default(),
×
369
        Arc::new(AtomicBool::new(true))
×
370
    ));
×
371

372
    // NOTE(review): leftover commented-out code; neither `thread` nor
    // `Duration` appears in the imports shown above. Consider deleting it.
    //  thread::sleep(Duration::from_millis(3000));
373

374
    chk!(executor.start());
×
375
    assert!(executor.join().is_ok());
1✔
376
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc