• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6010722164

29 Aug 2023 10:20AM UTC coverage: 76.689% (-1.4%) from 78.07%
6010722164

push

github

web-flow
chore: Create unit tests workflow (#1910)

* chore: Update for Rust 1.72.0

Rust 1.72.0 has introduced a bunch of new lints. Here we fix them all.

`let ... else` finally gets formatted.

* chore: Create unit tests workflow

* Rename and remove useless steps

* remove env vars

* Add concurrency group

* Test unit workflow on 4 cores

* Add mysql service to unit tests

---------

Co-authored-by: chubei <914745487@qq.com>

49006 of 63902 relevant lines covered (76.69%)

48444.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.02
/dozer-core/src/tests/app.rs
1
use crate::app::{App, AppPipeline, PipelineEntryPoint};
2
use crate::appsource::{AppSourceManager, AppSourceMappings};
3
use crate::checkpoint::create_checkpoint_factory_for_test;
4
use crate::executor::{DagExecutor, ExecutorOptions};
5
use crate::node::{OutputPortDef, PortHandle, Source, SourceFactory};
6
use crate::tests::dag_base_run::{
7
    NoopJoinProcessorFactory, NOOP_JOIN_LEFT_INPUT_PORT, NOOP_JOIN_RIGHT_INPUT_PORT,
8
};
9
use crate::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
10
use crate::tests::sources::{
11
    DualPortGeneratorSourceFactory, GeneratorSourceFactory,
12
    DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_1, DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_2,
13
    GENERATOR_SOURCE_OUTPUT_PORT,
14
};
15
use crate::{Edge, Endpoint, DEFAULT_PORT_HANDLE};
16
use dozer_log::tokio;
17
use dozer_types::errors::internal::BoxedError;
18
use dozer_types::node::NodeHandle;
19
use dozer_types::types::Schema;
20

21
use std::collections::HashMap;
22
use std::sync::atomic::AtomicBool;
23
use std::sync::Arc;
24

25
#[derive(Debug)]
×
26
struct NoneSourceFactory {}
27
impl SourceFactory for NoneSourceFactory {
28
    fn get_output_schema(&self, _port: &PortHandle) -> Result<Schema, BoxedError> {
×
29
        todo!()
×
30
    }
×
31

×
32
    fn get_output_port_name(&self, _port: &PortHandle) -> String {
×
33
        todo!()
×
34
    }
×
35

×
36
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
×
37
        todo!()
×
38
    }
×
39

×
40
    fn build(
×
41
        &self,
×
42
        _output_schemas: HashMap<PortHandle, Schema>,
×
43
    ) -> Result<Box<dyn Source>, BoxedError> {
×
44
        todo!()
×
45
    }
×
46
}
×
47

×
48
/// Registers two sources under the same connection name ("conn1") and checks
/// that the second registration is rejected.
#[test]
fn test_apps_source_manager_connection_exists() {
    let mut asm = AppSourceManager::new();

    // The first registration has to succeed. Without this check, the error
    // asserted below could come from an unrelated failure of `add` and the
    // test would still pass.
    asm.add(
        Box::new(NoneSourceFactory {}),
        AppSourceMappings::new(
            "conn1".to_string(),
            vec![("table1".to_string(), 1_u16)].into_iter().collect(),
        ),
    )
    .unwrap();

    // Second registration reuses the connection name "conn1" (and port 1)
    // with a different table name.
    // NOTE(review): per the test name, the rejection is presumably due to the
    // duplicate connection name - confirm against `AppSourceManager::add`.
    let r = asm.add(
        Box::new(NoneSourceFactory {}),
        AppSourceMappings::new(
            "conn1".to_string(),
            vec![("table2".to_string(), 1_u16)].into_iter().collect(),
        ),
    );
    assert!(r.is_err());
}
1✔
67

×
68
/// Exercises `AppSourceManager::get_endpoint` with one registered connection,
/// then again after a second connection has been added.
#[test]
fn test_apps_source_manager_lookup() {
    let mut manager = AppSourceManager::new();

    // Register "conn1", exposing "table1" on port 1.
    let conn1_mappings = AppSourceMappings::new(
        "conn1".to_string(),
        vec![("table1".to_string(), 1_u16)].into_iter().collect(),
    );
    manager
        .add(Box::new(NoneSourceFactory {}), conn1_mappings)
        .unwrap();

    let table1 = manager.get_endpoint("table1").unwrap();
    assert_eq!(table1.node.id, "conn1");
    assert_eq!(table1.port, 1_u16);

    // A name that was never registered yields an error.
    assert!(manager.get_endpoint("Non-existent source").is_err());

    // Insert another source: "conn2", exposing "table2" on port 2.
    let conn2_mappings = AppSourceMappings::new(
        "conn2".to_string(),
        vec![("table2".to_string(), 2_u16)].into_iter().collect(),
    );
    manager
        .add(Box::new(NoneSourceFactory {}), conn2_mappings)
        .unwrap();

    // Still-unknown names keep failing after the second registration.
    assert!(manager.get_endpoint("table3").is_err());

    // The first source resolves exactly as before.
    let table1 = manager.get_endpoint("table1").unwrap();
    assert_eq!(table1.node.id, "conn1");
    assert_eq!(table1.port, 1_u16);

    // The new source resolves to its own connection and port.
    let table2 = manager.get_endpoint("table2").unwrap();
    assert_eq!(table2.node.id, "conn2");
    assert_eq!(table2.port, 2_u16);
}
1✔
108

×
109
/// Registers a single connection that exposes two tables on different ports
/// and checks that each table name resolves to that connection and its port.
#[test]
fn test_apps_source_manager_lookup_multiple_ports() {
    let mut manager = AppSourceManager::new();

    let conn1_mappings = AppSourceMappings::new(
        "conn1".to_string(),
        vec![("table1".to_string(), 1_u16), ("table2".to_string(), 2_u16)]
            .into_iter()
            .collect(),
    );
    manager
        .add(Box::new(NoneSourceFactory {}), conn1_mappings)
        .unwrap();

    let table1 = manager.get_endpoint("table1").unwrap();
    assert_eq!(table1.node.id, "conn1");
    assert_eq!(table1.port, 1_u16);

    let table2 = manager.get_endpoint("table2").unwrap();
    assert_eq!(table2.node.id, "conn1");
    assert_eq!(table2.port, 2_u16);
}
1✔
131

×
132
#[tokio::test]
1✔
133
async fn test_app_dag() {
1✔
134
    let latch = Arc::new(AtomicBool::new(true));
1✔
135

1✔
136
    let mut asm = AppSourceManager::new();
1✔
137
    asm.add(
1✔
138
        Box::new(DualPortGeneratorSourceFactory::new(
1✔
139
            10_000,
1✔
140
            latch.clone(),
1✔
141
            true,
1✔
142
        )),
1✔
143
        AppSourceMappings::new(
1✔
144
            "postgres".to_string(),
1✔
145
            vec![
1✔
146
                (
1✔
147
                    "users_postgres".to_string(),
1✔
148
                    DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_1,
1✔
149
                ),
1✔
150
                (
1✔
151
                    "transactions".to_string(),
1✔
152
                    DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_2,
1✔
153
                ),
1✔
154
            ]
1✔
155
            .into_iter()
1✔
156
            .collect(),
1✔
157
        ),
1✔
158
    )
1✔
159
    .unwrap();
1✔
160

1✔
161
    asm.add(
1✔
162
        Box::new(GeneratorSourceFactory::new(10_000, latch.clone(), true)),
1✔
163
        AppSourceMappings::new(
1✔
164
            "snowflake".to_string(),
1✔
165
            vec![("users_snowflake".to_string(), GENERATOR_SOURCE_OUTPUT_PORT)]
1✔
166
                .into_iter()
1✔
167
                .collect(),
1✔
168
        ),
1✔
169
    )
1✔
170
    .unwrap();
1✔
171

1✔
172
    let mut app = App::new(asm);
1✔
173

1✔
174
    let mut p1 = AppPipeline::new_with_default_flags();
1✔
175
    p1.add_processor(
1✔
176
        Box::new(NoopJoinProcessorFactory {}),
1✔
177
        "join",
1✔
178
        vec![
1✔
179
            PipelineEntryPoint::new("users_postgres".to_string(), NOOP_JOIN_LEFT_INPUT_PORT),
1✔
180
            PipelineEntryPoint::new("transactions".to_string(), NOOP_JOIN_RIGHT_INPUT_PORT),
1✔
181
        ],
1✔
182
    );
1✔
183
    p1.add_sink(
1✔
184
        Box::new(CountingSinkFactory::new(20_000, latch.clone())),
1✔
185
        "sink",
1✔
186
        None,
1✔
187
    );
1✔
188
    p1.connect_nodes(
1✔
189
        "join",
1✔
190
        DEFAULT_PORT_HANDLE,
1✔
191
        "sink",
1✔
192
        COUNTING_SINK_INPUT_PORT,
1✔
193
    );
1✔
194

1✔
195
    app.add_pipeline(p1);
1✔
196

1✔
197
    let mut p2 = AppPipeline::new_with_default_flags();
1✔
198
    p2.add_processor(
1✔
199
        Box::new(NoopJoinProcessorFactory {}),
1✔
200
        "join",
1✔
201
        vec![
1✔
202
            PipelineEntryPoint::new("users_snowflake".to_string(), NOOP_JOIN_LEFT_INPUT_PORT),
1✔
203
            PipelineEntryPoint::new("transactions".to_string(), NOOP_JOIN_RIGHT_INPUT_PORT),
1✔
204
        ],
1✔
205
    );
1✔
206
    p2.add_sink(
1✔
207
        Box::new(CountingSinkFactory::new(20_000, latch)),
1✔
208
        "sink",
1✔
209
        None,
1✔
210
    );
1✔
211
    p2.connect_nodes(
1✔
212
        "join",
1✔
213
        DEFAULT_PORT_HANDLE,
1✔
214
        "sink",
1✔
215
        COUNTING_SINK_INPUT_PORT,
1✔
216
    );
1✔
217

1✔
218
    app.add_pipeline(p2);
1✔
219

1✔
220
    let dag = app.into_dag().unwrap();
1✔
221
    let edges = dag.edge_handles();
1✔
222

1✔
223
    assert!(edges.iter().any(|e| *e
4✔
224
        == Edge::new(
4✔
225
            Endpoint::new(
4✔
226
                NodeHandle::new(None, "postgres".to_string()),
4✔
227
                DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_1
4✔
228
            ),
4✔
229
            Endpoint::new(
4✔
230
                NodeHandle::new(Some(1), "join".to_string()),
4✔
231
                NOOP_JOIN_LEFT_INPUT_PORT
4✔
232
            )
4✔
233
        )));
4✔
234

×
235
    assert!(edges.iter().any(|e| *e
1✔
236
        == Edge::new(
1✔
237
            Endpoint::new(
1✔
238
                NodeHandle::new(None, "postgres".to_string()),
1✔
239
                DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_2
1✔
240
            ),
1✔
241
            Endpoint::new(
1✔
242
                NodeHandle::new(Some(1), "join".to_string()),
1✔
243
                NOOP_JOIN_RIGHT_INPUT_PORT
1✔
244
            )
1✔
245
        )));
1✔
246

×
247
    assert!(edges.iter().any(|e| *e
3✔
248
        == Edge::new(
3✔
249
            Endpoint::new(
3✔
250
                NodeHandle::new(None, "snowflake".to_string()),
3✔
251
                GENERATOR_SOURCE_OUTPUT_PORT
3✔
252
            ),
3✔
253
            Endpoint::new(
3✔
254
                NodeHandle::new(Some(2), "join".to_string()),
3✔
255
                NOOP_JOIN_LEFT_INPUT_PORT
3✔
256
            )
3✔
257
        )));
3✔
258

×
259
    assert!(edges.iter().any(|e| *e
5✔
260
        == Edge::new(
5✔
261
            Endpoint::new(
5✔
262
                NodeHandle::new(None, "postgres".to_string()),
5✔
263
                DUAL_PORT_GENERATOR_SOURCE_OUTPUT_PORT_2
5✔
264
            ),
5✔
265
            Endpoint::new(
5✔
266
                NodeHandle::new(Some(2), "join".to_string()),
5✔
267
                NOOP_JOIN_RIGHT_INPUT_PORT
5✔
268
            )
5✔
269
        )));
5✔
270

×
271
    assert!(edges.iter().any(|e| *e
6✔
272
        == Edge::new(
6✔
273
            Endpoint::new(
6✔
274
                NodeHandle::new(Some(1), "join".to_string()),
6✔
275
                DEFAULT_PORT_HANDLE
6✔
276
            ),
6✔
277
            Endpoint::new(
6✔
278
                NodeHandle::new(Some(1), "sink".to_string()),
6✔
279
                COUNTING_SINK_INPUT_PORT
6✔
280
            )
6✔
281
        )));
6✔
282

×
283
    assert!(edges.iter().any(|e| *e
2✔
284
        == Edge::new(
2✔
285
            Endpoint::new(
2✔
286
                NodeHandle::new(Some(2), "join".to_string()),
2✔
287
                DEFAULT_PORT_HANDLE
2✔
288
            ),
2✔
289
            Endpoint::new(
2✔
290
                NodeHandle::new(Some(2), "sink".to_string()),
2✔
291
                COUNTING_SINK_INPUT_PORT
2✔
292
            )
2✔
293
        )));
2✔
294

×
295
    assert_eq!(edges.len(), 6);
1✔
296

×
297
    let (_temp_dir, checkpoint_factory, _) = create_checkpoint_factory_for_test(&[]).await;
1✔
298
    DagExecutor::new(dag, checkpoint_factory, ExecutorOptions::default())
1✔
299
        .unwrap()
1✔
300
        .start(Arc::new(AtomicBool::new(true)))
1✔
301
        .unwrap()
1✔
302
        .join()
1✔
303
        .unwrap();
1✔
304
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc