• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6105410942

07 Sep 2023 04:28AM UTC coverage: 77.562% (-0.1%) from 77.686%
6105410942

push

github

chloeminkyung
feat: onnx image

1141 of 1141 new or added lines in 66 files covered. (100.0%)

49957 of 64409 relevant lines covered (77.56%)

50900.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.07
/dozer-core/src/tests/dag_base_errors.rs
1
use crate::channels::{ProcessorChannelForwarder, SourceChannelForwarder};
2
use crate::checkpoint::create_checkpoint_factory_for_test;
3
use crate::epoch::Epoch;
4
use crate::executor::{DagExecutor, ExecutorOptions};
5
use crate::executor_operation::ProcessorOperation;
6
use crate::node::{
7
    OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Sink, SinkFactory,
8
    Source, SourceFactory,
9
};
10
use crate::processor_record::ProcessorRecordStore;
11
use crate::tests::dag_base_run::NoopProcessorFactory;
12
use crate::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
13
use crate::tests::sources::{GeneratorSourceFactory, GENERATOR_SOURCE_OUTPUT_PORT};
14
use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
15
use dozer_log::storage::{Object, Queue};
16
use dozer_log::tokio;
17
use dozer_types::errors::internal::BoxedError;
18
use dozer_types::ingestion_types::IngestionMessage;
19
use dozer_types::node::{NodeHandle, OpIdentifier};
20
use dozer_types::types::{
21
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
22
};
23

24
use std::collections::HashMap;
25
use std::panic;
26

27
use std::sync::atomic::AtomicBool;
28
use std::sync::Arc;
29

30
// Test when error is generated by a processor
31

32
#[derive(Debug)]
×
33
struct ErrorProcessorFactory {
34
    err_on: u64,
35
    panic: bool,
36
}
37

38
impl ProcessorFactory for ErrorProcessorFactory {
39
    fn type_name(&self) -> String {
×
40
        "Error".to_owned()
×
41
    }
×
42

43
    fn get_output_schema(
3✔
44
        &self,
3✔
45
        _output_port: &PortHandle,
3✔
46
        input_schemas: &HashMap<PortHandle, Schema>,
3✔
47
    ) -> Result<Schema, BoxedError> {
3✔
48
        Ok(input_schemas.get(&DEFAULT_PORT_HANDLE).unwrap().clone())
3✔
49
    }
3✔
50

51
    fn get_input_ports(&self) -> Vec<PortHandle> {
12✔
52
        vec![DEFAULT_PORT_HANDLE]
12✔
53
    }
12✔
54

55
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
9✔
56
        vec![OutputPortDef::new(
9✔
57
            DEFAULT_PORT_HANDLE,
9✔
58
            OutputPortType::Stateless,
9✔
59
        )]
9✔
60
    }
9✔
61

62
    fn build(
3✔
63
        &self,
3✔
64
        _input_schemas: HashMap<PortHandle, Schema>,
3✔
65
        _output_schemas: HashMap<PortHandle, Schema>,
3✔
66
        _record_store: &ProcessorRecordStore,
3✔
67
        _checkpoint_data: Option<Vec<u8>>,
3✔
68
    ) -> Result<Box<dyn Processor>, BoxedError> {
3✔
69
        Ok(Box::new(ErrorProcessor {
3✔
70
            err_on: self.err_on,
3✔
71
            count: 0,
3✔
72
            panic: self.panic,
3✔
73
        }))
3✔
74
    }
3✔
75

76
    fn id(&self) -> String {
×
77
        "Error".to_owned()
×
78
    }
×
79
}
80

81
#[derive(Debug)]
×
82
struct ErrorProcessor {
83
    err_on: u64,
84
    count: u64,
85
    panic: bool,
86
}
87

88
impl Processor for ErrorProcessor {
89
    fn commit(&self, _epoch: &Epoch) -> Result<(), BoxedError> {
314✔
90
        Ok(())
314✔
91
    }
314✔
92

93
    fn process(
2,400,000✔
94
        &mut self,
2,400,000✔
95
        _from_port: PortHandle,
2,400,000✔
96
        _record_store: &ProcessorRecordStore,
2,400,000✔
97
        op: ProcessorOperation,
2,400,000✔
98
        fw: &mut dyn ProcessorChannelForwarder,
2,400,000✔
99
    ) -> Result<(), BoxedError> {
2,400,000✔
100
        self.count += 1;
2,400,000✔
101
        if self.count == self.err_on {
2,400,000✔
102
            if self.panic {
3✔
103
                panic!("Generated error");
1✔
104
            } else {
105
                return Err("Uknown".to_string().into());
2✔
106
            }
107
        }
2,399,997✔
108

2,399,997✔
109
        fw.send(op, DEFAULT_PORT_HANDLE);
2,399,997✔
110
        Ok(())
2,399,997✔
111
    }
2,399,999✔
112

113
    fn serialize(
21✔
114
        &mut self,
21✔
115
        _record_store: &ProcessorRecordStore,
21✔
116
        _object: Object,
21✔
117
    ) -> Result<(), BoxedError> {
21✔
118
        Ok(())
21✔
119
    }
21✔
120
}
121

122
#[tokio::test]
1✔
123
#[should_panic]
124
async fn test_run_dag_proc_err_panic() {
1✔
125
    let count: u64 = 1_000_000;
1✔
126

1✔
127
    let mut dag = Dag::new();
1✔
128
    let latch = Arc::new(AtomicBool::new(true));
1✔
129

1✔
130
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
131
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
132
    let sink_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
133

1✔
134
    dag.add_source(
1✔
135
        source_handle.clone(),
1✔
136
        Box::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
137
    );
1✔
138
    dag.add_processor(
1✔
139
        proc_handle.clone(),
1✔
140
        Box::new(ErrorProcessorFactory {
1✔
141
            err_on: 800_000,
1✔
142
            panic: true,
1✔
143
        }),
1✔
144
    );
1✔
145
    dag.add_sink(
1✔
146
        sink_handle.clone(),
1✔
147
        Box::new(CountingSinkFactory::new(count, latch)),
1✔
148
    );
1✔
149

1✔
150
    dag.connect(
1✔
151
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
1✔
152
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
153
    )
1✔
154
    .unwrap();
1✔
155

1✔
156
    dag.connect(
1✔
157
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
158
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
159
    )
1✔
160
    .unwrap();
1✔
161

162
    let (_temp_dir, checkpoint_factory, _) = create_checkpoint_factory_for_test(&[]).await;
11✔
163
    DagExecutor::new(
1✔
164
        dag,
1✔
165
        checkpoint_factory,
1✔
166
        Default::default(),
1✔
167
        ExecutorOptions::default(),
1✔
168
    )
1✔
169
    .await
×
170
    .unwrap()
1✔
171
    .start(Arc::new(AtomicBool::new(true)), Default::default())
1✔
172
    .unwrap()
1✔
173
    .join()
1✔
174
    .unwrap();
1✔
175
}
176

177
#[tokio::test]
1✔
178
#[should_panic]
179
async fn test_run_dag_proc_err_2() {
1✔
180
    let count: u64 = 1_000_000;
1✔
181

1✔
182
    let mut dag = Dag::new();
1✔
183
    let latch = Arc::new(AtomicBool::new(true));
1✔
184

1✔
185
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
186
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
187
    let proc_err_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
188
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
189

1✔
190
    dag.add_source(
1✔
191
        source_handle.clone(),
1✔
192
        Box::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
193
    );
1✔
194
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
1✔
195

1✔
196
    dag.add_processor(
1✔
197
        proc_err_handle.clone(),
1✔
198
        Box::new(ErrorProcessorFactory {
1✔
199
            err_on: 800_000,
1✔
200
            panic: false,
1✔
201
        }),
1✔
202
    );
1✔
203

1✔
204
    dag.add_sink(
1✔
205
        sink_handle.clone(),
1✔
206
        Box::new(CountingSinkFactory::new(count, latch)),
1✔
207
    );
1✔
208

1✔
209
    dag.connect(
1✔
210
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
1✔
211
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
212
    )
1✔
213
    .unwrap();
1✔
214

1✔
215
    dag.connect(
1✔
216
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
217
        Endpoint::new(proc_err_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
218
    )
1✔
219
    .unwrap();
1✔
220

1✔
221
    dag.connect(
1✔
222
        Endpoint::new(proc_err_handle, DEFAULT_PORT_HANDLE),
1✔
223
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
224
    )
1✔
225
    .unwrap();
1✔
226

227
    let (_temp_dir, checkpoint_factory, _) = create_checkpoint_factory_for_test(&[]).await;
11✔
228
    DagExecutor::new(
1✔
229
        dag,
1✔
230
        checkpoint_factory,
1✔
231
        Default::default(),
1✔
232
        ExecutorOptions::default(),
1✔
233
    )
1✔
234
    .await
×
235
    .unwrap()
1✔
236
    .start(Arc::new(AtomicBool::new(true)), Default::default())
1✔
237
    .unwrap()
1✔
238
    .join()
1✔
239
    .unwrap();
1✔
240
}
241

242
#[tokio::test]
1✔
243
#[should_panic]
244
async fn test_run_dag_proc_err_3() {
1✔
245
    let count: u64 = 1_000_000;
1✔
246

1✔
247
    let mut dag = Dag::new();
1✔
248
    let latch = Arc::new(AtomicBool::new(true));
1✔
249

1✔
250
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
251
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
252
    let proc_err_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
253
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
254

1✔
255
    dag.add_source(
1✔
256
        source_handle.clone(),
1✔
257
        Box::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
258
    );
1✔
259

1✔
260
    dag.add_processor(
1✔
261
        proc_err_handle.clone(),
1✔
262
        Box::new(ErrorProcessorFactory {
1✔
263
            err_on: 800_000,
1✔
264
            panic: false,
1✔
265
        }),
1✔
266
    );
1✔
267

1✔
268
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
1✔
269

1✔
270
    dag.add_sink(
1✔
271
        sink_handle.clone(),
1✔
272
        Box::new(CountingSinkFactory::new(count, latch)),
1✔
273
    );
1✔
274

1✔
275
    dag.connect(
1✔
276
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
1✔
277
        Endpoint::new(proc_err_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
278
    )
1✔
279
    .unwrap();
1✔
280

1✔
281
    dag.connect(
1✔
282
        Endpoint::new(proc_err_handle, DEFAULT_PORT_HANDLE),
1✔
283
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
284
    )
1✔
285
    .unwrap();
1✔
286

1✔
287
    dag.connect(
1✔
288
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
289
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
290
    )
1✔
291
    .unwrap();
1✔
292

293
    let (_temp_dir, checkpoint_factory, _) = create_checkpoint_factory_for_test(&[]).await;
11✔
294
    DagExecutor::new(
1✔
295
        dag,
1✔
296
        checkpoint_factory,
1✔
297
        Default::default(),
1✔
298
        ExecutorOptions::default(),
1✔
299
    )
1✔
300
    .await
×
301
    .unwrap()
1✔
302
    .start(Arc::new(AtomicBool::new(true)), Default::default())
1✔
303
    .unwrap()
1✔
304
    .join()
1✔
305
    .unwrap();
1✔
306
}
307

308
// Test when error is generated by a source
309

310
#[derive(Debug)]
×
311
pub(crate) struct ErrGeneratorSourceFactory {
312
    count: u64,
313
    err_at: u64,
314
}
315

316
impl ErrGeneratorSourceFactory {
317
    pub fn new(count: u64, err_at: u64) -> Self {
1✔
318
        Self { count, err_at }
1✔
319
    }
1✔
320
}
321

322
impl SourceFactory for ErrGeneratorSourceFactory {
323
    fn get_output_schema(&self, _port: &PortHandle) -> Result<Schema, BoxedError> {
1✔
324
        Ok(Schema::default()
1✔
325
            .field(
1✔
326
                FieldDefinition::new(
1✔
327
                    "id".to_string(),
1✔
328
                    FieldType::String,
1✔
329
                    false,
1✔
330
                    SourceDefinition::Dynamic,
1✔
331
                ),
1✔
332
                true,
1✔
333
            )
1✔
334
            .field(
1✔
335
                FieldDefinition::new(
1✔
336
                    "value".to_string(),
1✔
337
                    FieldType::String,
1✔
338
                    false,
1✔
339
                    SourceDefinition::Dynamic,
1✔
340
                ),
1✔
341
                false,
1✔
342
            )
1✔
343
            .clone())
1✔
344
    }
1✔
345

346
    fn get_output_port_name(&self, _port: &PortHandle) -> String {
×
347
        "error".to_string()
×
348
    }
×
349

350
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
3✔
351
        vec![OutputPortDef::new(
3✔
352
            GENERATOR_SOURCE_OUTPUT_PORT,
3✔
353
            OutputPortType::Stateless,
3✔
354
        )]
3✔
355
    }
3✔
356

357
    fn build(
1✔
358
        &self,
1✔
359
        _output_schemas: HashMap<PortHandle, Schema>,
1✔
360
    ) -> Result<Box<dyn Source>, BoxedError> {
1✔
361
        Ok(Box::new(ErrGeneratorSource {
1✔
362
            count: self.count,
1✔
363
            err_at: self.err_at,
1✔
364
        }))
1✔
365
    }
1✔
366
}
367

368
#[derive(Debug)]
×
369
pub(crate) struct ErrGeneratorSource {
370
    count: u64,
371
    err_at: u64,
372
}
373

374
impl Source for ErrGeneratorSource {
375
    fn can_start_from(&self, _last_checkpoint: OpIdentifier) -> Result<bool, BoxedError> {
×
376
        Ok(false)
×
377
    }
×
378

379
    fn start(
1✔
380
        &self,
1✔
381
        fw: &mut dyn SourceChannelForwarder,
1✔
382
        _checkpoint: Option<OpIdentifier>,
1✔
383
    ) -> Result<(), BoxedError> {
1✔
384
        for n in 1..(self.count + 1) {
123,699✔
385
            if n == self.err_at {
123,699✔
386
                return Err("Generated Error".to_string().into());
×
387
            }
123,699✔
388

123,699✔
389
            fw.send(
123,699✔
390
                IngestionMessage::new_op(
123,699✔
391
                    n,
123,699✔
392
                    0,
123,699✔
393
                    0,
123,699✔
394
                    Operation::Insert {
123,699✔
395
                        new: Record::new(vec![
123,699✔
396
                            Field::String(format!("key_{n}")),
123,699✔
397
                            Field::String(format!("value_{n}")),
123,699✔
398
                        ]),
123,699✔
399
                    },
123,699✔
400
                ),
123,699✔
401
                GENERATOR_SOURCE_OUTPUT_PORT,
123,699✔
402
            )?;
123,699✔
403
        }
404
        Ok(())
×
405
    }
1✔
406
}
407

408
#[tokio::test]
1✔
409
async fn test_run_dag_src_err() {
1✔
410
    let count: u64 = 1_000_000;
1✔
411

1✔
412
    let mut dag = Dag::new();
1✔
413
    let latch = Arc::new(AtomicBool::new(true));
1✔
414

1✔
415
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
416
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
417
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
418

1✔
419
    dag.add_source(
1✔
420
        source_handle.clone(),
1✔
421
        Box::new(ErrGeneratorSourceFactory::new(count, 200_000)),
1✔
422
    );
1✔
423
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
1✔
424
    dag.add_sink(
1✔
425
        sink_handle.clone(),
1✔
426
        Box::new(CountingSinkFactory::new(count, latch)),
1✔
427
    );
1✔
428

1✔
429
    dag.connect(
1✔
430
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
1✔
431
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
432
    )
1✔
433
    .unwrap();
1✔
434

1✔
435
    dag.connect(
1✔
436
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
437
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
438
    )
1✔
439
    .unwrap();
1✔
440

441
    let (_temp_dir, checkpoint_factory, _) = create_checkpoint_factory_for_test(&[]).await;
11✔
442
    DagExecutor::new(
1✔
443
        dag,
1✔
444
        checkpoint_factory,
1✔
445
        Default::default(),
1✔
446
        ExecutorOptions::default(),
1✔
447
    )
1✔
448
    .await
×
449
    .unwrap()
1✔
450
    .start(Arc::new(AtomicBool::new(true)), Default::default())
1✔
451
    .unwrap();
1✔
452
    // join_handle.join().unwrap();
453
}
454

455
#[derive(Debug)]
×
456
pub(crate) struct ErrSinkFactory {
457
    err_at: u64,
458
    panic: bool,
459
}
460

461
impl ErrSinkFactory {
462
    pub fn new(err_at: u64, panic: bool) -> Self {
2✔
463
        Self { err_at, panic }
2✔
464
    }
2✔
465
}
466

467
impl SinkFactory for ErrSinkFactory {
468
    fn get_input_ports(&self) -> Vec<PortHandle> {
8✔
469
        vec![COUNTING_SINK_INPUT_PORT]
8✔
470
    }
8✔
471

472
    fn prepare(&self, _input_schemas: HashMap<PortHandle, Schema>) -> Result<(), BoxedError> {
2✔
473
        Ok(())
2✔
474
    }
2✔
475

476
    fn build(
2✔
477
        &self,
2✔
478
        _input_schemas: HashMap<PortHandle, Schema>,
2✔
479
    ) -> Result<Box<dyn Sink>, BoxedError> {
2✔
480
        Ok(Box::new(ErrSink {
2✔
481
            err_at: self.err_at,
2✔
482
            current: 0,
2✔
483
            panic: self.panic,
2✔
484
        }))
2✔
485
    }
2✔
486
}
487

488
#[derive(Debug)]
×
489
pub(crate) struct ErrSink {
490
    err_at: u64,
491
    current: u64,
492
    panic: bool,
493
}
494
impl Sink for ErrSink {
495
    fn commit(&mut self, _epoch_details: &Epoch) -> Result<(), BoxedError> {
74✔
496
        Ok(())
74✔
497
    }
74✔
498

499
    fn process(
400,000✔
500
        &mut self,
400,000✔
501
        _from_port: PortHandle,
400,000✔
502
        _record_store: &ProcessorRecordStore,
400,000✔
503
        _op: ProcessorOperation,
400,000✔
504
    ) -> Result<(), BoxedError> {
400,000✔
505
        self.current += 1;
400,000✔
506
        if self.current == self.err_at {
400,000✔
507
            if self.panic {
2✔
508
                panic!("Generated error");
1✔
509
            } else {
510
                return Err("Generated error".to_string().into());
1✔
511
            }
512
        }
399,998✔
513
        Ok(())
399,998✔
514
    }
399,999✔
515

516
    fn persist(&mut self, _queue: &Queue) -> Result<(), BoxedError> {
2✔
517
        Ok(())
2✔
518
    }
2✔
519

520
    fn on_source_snapshotting_done(&mut self, _connection_name: String) -> Result<(), BoxedError> {
×
521
        Ok(())
×
522
    }
×
523
}
524

525
#[tokio::test]
1✔
526
#[should_panic]
527
async fn test_run_dag_sink_err() {
1✔
528
    let count: u64 = 1_000_000;
1✔
529

1✔
530
    let mut dag = Dag::new();
1✔
531
    let latch = Arc::new(AtomicBool::new(true));
1✔
532

1✔
533
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
534
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
535
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
536

1✔
537
    dag.add_source(
1✔
538
        source_handle.clone(),
1✔
539
        Box::new(GeneratorSourceFactory::new(count, latch, false)),
1✔
540
    );
1✔
541
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
1✔
542
    dag.add_sink(
1✔
543
        sink_handle.clone(),
1✔
544
        Box::new(ErrSinkFactory::new(200_000, false)),
1✔
545
    );
1✔
546

1✔
547
    dag.connect(
1✔
548
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
1✔
549
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
550
    )
1✔
551
    .unwrap();
1✔
552

1✔
553
    dag.connect(
1✔
554
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
555
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
556
    )
1✔
557
    .unwrap();
1✔
558

559
    let (_temp_dir, checkpoint_factory, _) = create_checkpoint_factory_for_test(&[]).await;
11✔
560
    DagExecutor::new(
1✔
561
        dag,
1✔
562
        checkpoint_factory,
1✔
563
        Default::default(),
1✔
564
        ExecutorOptions::default(),
1✔
565
    )
1✔
566
    .await
×
567
    .unwrap()
1✔
568
    .start(Arc::new(AtomicBool::new(true)), Default::default())
1✔
569
    .unwrap()
1✔
570
    .join()
1✔
571
    .unwrap();
1✔
572
}
573

574
#[tokio::test]
1✔
575
#[should_panic]
576
async fn test_run_dag_sink_err_panic() {
1✔
577
    let count: u64 = 1_000_000;
1✔
578

1✔
579
    let mut dag = Dag::new();
1✔
580
    let latch = Arc::new(AtomicBool::new(true));
1✔
581

1✔
582
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
583
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
584
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
585

1✔
586
    dag.add_source(
1✔
587
        source_handle.clone(),
1✔
588
        Box::new(GeneratorSourceFactory::new(count, latch, false)),
1✔
589
    );
1✔
590
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
1✔
591
    dag.add_sink(
1✔
592
        sink_handle.clone(),
1✔
593
        Box::new(ErrSinkFactory::new(200_000, true)),
1✔
594
    );
1✔
595

1✔
596
    dag.connect(
1✔
597
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
1✔
598
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
599
    )
1✔
600
    .unwrap();
1✔
601

1✔
602
    dag.connect(
1✔
603
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
604
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
605
    )
1✔
606
    .unwrap();
1✔
607

608
    let (_temp_dir, checkpoint_factory, _) = create_checkpoint_factory_for_test(&[]).await;
11✔
609
    DagExecutor::new(
1✔
610
        dag,
1✔
611
        checkpoint_factory,
1✔
612
        Default::default(),
1✔
613
        ExecutorOptions::default(),
1✔
614
    )
1✔
615
    .await
×
616
    .unwrap()
1✔
617
    .start(Arc::new(AtomicBool::new(true)), Default::default())
1✔
618
    .unwrap()
1✔
619
    .join()
1✔
620
    .unwrap();
1✔
621
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc