• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6299724219

25 Sep 2023 12:58PM UTC coverage: 77.81% (+0.5%) from 77.275%
6299724219

push

github

chubei
fix: Add `BINDGEN_EXTRA_CLANG_ARGS` to cross compile rocksdb

50223 of 64546 relevant lines covered (77.81%)

148909.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.16
/dozer-core/src/tests/dag_base_create_errors.rs
1
use crate::checkpoint::create_checkpoint_for_test;
2
use crate::executor::DagExecutor;
3
use crate::node::{
4
    OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Source, SourceFactory,
5
};
6
use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
7

8
use crate::tests::dag_base_run::NoopProcessorFactory;
9
use crate::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
10
use crate::tests::sources::{GeneratorSourceFactory, GENERATOR_SOURCE_OUTPUT_PORT};
11

12
use dozer_log::tokio;
13
use dozer_recordstore::ProcessorRecordStoreDeserializer;
14
use dozer_types::errors::internal::BoxedError;
15
use dozer_types::node::NodeHandle;
16
use dozer_types::types::{FieldDefinition, FieldType, Schema, SourceDefinition};
17

18
use std::collections::HashMap;
19
use std::sync::atomic::AtomicBool;
20
use std::sync::Arc;
21

22
#[derive(Debug)]
×
23
struct CreateErrSourceFactory {
24
    panic: bool,
25
}
26

27
impl CreateErrSourceFactory {
28
    pub fn new(panic: bool) -> Self {
2✔
29
        Self { panic }
2✔
30
    }
2✔
31
}
32

33
impl SourceFactory for CreateErrSourceFactory {
34
    fn get_output_schema(&self, _port: &PortHandle) -> Result<Schema, BoxedError> {
2✔
35
        Ok(Schema::default()
2✔
36
            .field(
2✔
37
                FieldDefinition::new(
2✔
38
                    "id".to_string(),
2✔
39
                    FieldType::Int,
2✔
40
                    false,
2✔
41
                    SourceDefinition::Dynamic,
2✔
42
                ),
2✔
43
                true,
2✔
44
            )
2✔
45
            .clone())
2✔
46
    }
2✔
47

48
    fn get_output_port_name(&self, _port: &PortHandle) -> String {
2✔
49
        "error".to_string()
2✔
50
    }
2✔
51

52
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
8✔
53
        vec![OutputPortDef::new(
8✔
54
            DEFAULT_PORT_HANDLE,
8✔
55
            OutputPortType::Stateless,
8✔
56
        )]
8✔
57
    }
8✔
58

59
    fn build(
2✔
60
        &self,
2✔
61
        _output_schemas: HashMap<PortHandle, Schema>,
2✔
62
    ) -> Result<Box<dyn Source>, BoxedError> {
2✔
63
        if self.panic {
2✔
64
            panic!("Generated error");
1✔
65
        } else {
66
            Err("Generated Error".to_string().into())
1✔
67
        }
1✔
68
    }
1✔
69
}
70

71
#[tokio::test]
1✔
72
#[should_panic]
73
async fn test_create_src_err() {
1✔
74
    let count: u64 = 1_000_000;
1✔
75

1✔
76
    let mut dag = Dag::new();
1✔
77
    let latch = Arc::new(AtomicBool::new(true));
1✔
78

1✔
79
    let source_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
80
    let proc_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
81
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
82

1✔
83
    dag.add_source(
1✔
84
        source_handle.clone(),
1✔
85
        Box::new(CreateErrSourceFactory::new(false)),
1✔
86
    );
1✔
87
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
1✔
88
    dag.add_sink(
1✔
89
        sink_handle.clone(),
1✔
90
        Box::new(CountingSinkFactory::new(count, latch)),
1✔
91
    );
1✔
92

1✔
93
    dag.connect(
1✔
94
        Endpoint::new(source_handle, DEFAULT_PORT_HANDLE),
1✔
95
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
96
    )
1✔
97
    .unwrap();
1✔
98

1✔
99
    dag.connect(
1✔
100
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
101
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
102
    )
1✔
103
    .unwrap();
1✔
104

105
    let (_temp_dir, checkpoint) = create_checkpoint_for_test().await;
1✔
106
    DagExecutor::new(dag, checkpoint, Default::default())
1✔
107
        .await
×
108
        .unwrap()
1✔
109
        .start(Arc::new(AtomicBool::new(true)), Default::default())
1✔
110
        .unwrap()
1✔
111
        .join()
1✔
112
        .unwrap();
1✔
113
}
114

115
#[tokio::test]
1✔
116
#[should_panic]
117
async fn test_create_src_panic() {
1✔
118
    let count: u64 = 1_000_000;
1✔
119

1✔
120
    let mut dag = Dag::new();
1✔
121
    let latch = Arc::new(AtomicBool::new(true));
1✔
122

1✔
123
    let source_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
124
    let proc_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
125
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
126

1✔
127
    dag.add_source(
1✔
128
        source_handle.clone(),
1✔
129
        Box::new(CreateErrSourceFactory::new(true)),
1✔
130
    );
1✔
131
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
1✔
132
    dag.add_sink(
1✔
133
        sink_handle.clone(),
1✔
134
        Box::new(CountingSinkFactory::new(count, latch)),
1✔
135
    );
1✔
136

1✔
137
    dag.connect(
1✔
138
        Endpoint::new(source_handle, DEFAULT_PORT_HANDLE),
1✔
139
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
140
    )
1✔
141
    .unwrap();
1✔
142

1✔
143
    dag.connect(
1✔
144
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
145
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
146
    )
1✔
147
    .unwrap();
1✔
148

149
    let (_temp_dir, checkpoint) = create_checkpoint_for_test().await;
1✔
150
    DagExecutor::new(dag, checkpoint, Default::default())
1✔
151
        .await
×
152
        .unwrap()
×
153
        .start(Arc::new(AtomicBool::new(true)), Default::default())
×
154
        .unwrap()
×
155
        .join()
×
156
        .unwrap();
×
157
}
158

159
#[derive(Debug)]
×
160
struct CreateErrProcessorFactory {
161
    panic: bool,
162
}
163

164
impl CreateErrProcessorFactory {
165
    pub fn new(panic: bool) -> Self {
2✔
166
        Self { panic }
2✔
167
    }
2✔
168
}
169

170
impl ProcessorFactory for CreateErrProcessorFactory {
171
    fn type_name(&self) -> String {
×
172
        "CreateErr".to_owned()
×
173
    }
×
174

175
    fn get_output_schema(
2✔
176
        &self,
2✔
177
        _port: &PortHandle,
2✔
178
        _input_schemas: &HashMap<PortHandle, Schema>,
2✔
179
    ) -> Result<Schema, BoxedError> {
2✔
180
        Ok(Schema::default()
2✔
181
            .field(
2✔
182
                FieldDefinition::new(
2✔
183
                    "id".to_string(),
2✔
184
                    FieldType::Int,
2✔
185
                    false,
2✔
186
                    SourceDefinition::Dynamic,
2✔
187
                ),
2✔
188
                true,
2✔
189
            )
2✔
190
            .clone())
2✔
191
    }
2✔
192

193
    fn get_input_ports(&self) -> Vec<PortHandle> {
8✔
194
        vec![DEFAULT_PORT_HANDLE]
8✔
195
    }
8✔
196

197
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
6✔
198
        vec![OutputPortDef::new(
6✔
199
            DEFAULT_PORT_HANDLE,
6✔
200
            OutputPortType::Stateless,
6✔
201
        )]
6✔
202
    }
6✔
203

204
    fn build(
2✔
205
        &self,
2✔
206
        _input_schemas: HashMap<PortHandle, Schema>,
2✔
207
        _output_schemas: HashMap<PortHandle, Schema>,
2✔
208
        _record_store: &ProcessorRecordStoreDeserializer,
2✔
209
        _checkpoint_data: Option<Vec<u8>>,
2✔
210
    ) -> Result<Box<dyn Processor>, BoxedError> {
2✔
211
        if self.panic {
2✔
212
            panic!("Generated error");
1✔
213
        } else {
214
            Err("Generated Error".to_string().into())
1✔
215
        }
1✔
216
    }
1✔
217

218
    fn id(&self) -> String {
×
219
        "CreateErr".to_owned()
×
220
    }
×
221
}
222

223
#[tokio::test]
1✔
224
#[should_panic]
225
async fn test_create_proc_err() {
1✔
226
    let count: u64 = 1_000_000;
1✔
227

1✔
228
    let mut dag = Dag::new();
1✔
229
    let latch = Arc::new(AtomicBool::new(true));
1✔
230

1✔
231
    let source_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
232
    let proc_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
233
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
234

1✔
235
    dag.add_source(
1✔
236
        source_handle.clone(),
1✔
237
        Box::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
238
    );
1✔
239
    dag.add_processor(
1✔
240
        proc_handle.clone(),
1✔
241
        Box::new(CreateErrProcessorFactory::new(false)),
1✔
242
    );
1✔
243
    dag.add_sink(
1✔
244
        sink_handle.clone(),
1✔
245
        Box::new(CountingSinkFactory::new(count, latch)),
1✔
246
    );
1✔
247

1✔
248
    dag.connect(
1✔
249
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
1✔
250
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
251
    )
1✔
252
    .unwrap();
1✔
253

1✔
254
    dag.connect(
1✔
255
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
256
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
257
    )
1✔
258
    .unwrap();
1✔
259

260
    let (_temp_dir, checkpoint) = create_checkpoint_for_test().await;
1✔
261
    DagExecutor::new(dag, checkpoint, Default::default())
1✔
262
        .await
×
263
        .unwrap()
1✔
264
        .start(Arc::new(AtomicBool::new(true)), Default::default())
1✔
265
        .unwrap()
1✔
266
        .join()
1✔
267
        .unwrap();
1✔
268
}
269

270
#[tokio::test]
1✔
271
#[should_panic]
272
async fn test_create_proc_panic() {
1✔
273
    let count: u64 = 1_000_000;
1✔
274

1✔
275
    let mut dag = Dag::new();
1✔
276
    let latch = Arc::new(AtomicBool::new(true));
1✔
277

1✔
278
    let source_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
279
    let proc_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
280
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
281

1✔
282
    dag.add_source(
1✔
283
        source_handle.clone(),
1✔
284
        Box::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
285
    );
1✔
286
    dag.add_processor(
1✔
287
        proc_handle.clone(),
1✔
288
        Box::new(CreateErrProcessorFactory::new(true)),
1✔
289
    );
1✔
290
    dag.add_sink(
1✔
291
        sink_handle.clone(),
1✔
292
        Box::new(CountingSinkFactory::new(count, latch)),
1✔
293
    );
1✔
294

1✔
295
    dag.connect(
1✔
296
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
1✔
297
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
298
    )
1✔
299
    .unwrap();
1✔
300

1✔
301
    dag.connect(
1✔
302
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
303
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
304
    )
1✔
305
    .unwrap();
1✔
306

307
    let (_temp_dir, checkpoint) = create_checkpoint_for_test().await;
1✔
308
    DagExecutor::new(dag, checkpoint, Default::default())
1✔
309
        .await
×
310
        .unwrap()
×
311
        .start(Arc::new(AtomicBool::new(true)), Default::default())
×
312
        .unwrap()
×
313
        .join()
×
314
        .unwrap();
×
315
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc