• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4315007357

pending completion
4315007357

push

github

GitHub
fix: Sink should only be built after all source checkpoints are checked (#1112)

280 of 280 new or added lines in 24 files covered. (100.0%)

28292 of 38914 relevant lines covered (72.7%)

64132.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.66
/dozer-core/src/tests/dag_base_errors.rs
1
use crate::channels::{ProcessorChannelForwarder, SourceChannelForwarder};
2
use crate::chk;
3
use crate::errors::ExecutionError;
4
use crate::executor::{DagExecutor, ExecutorOptions};
5
use crate::node::{
6
    OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Sink, SinkFactory,
7
    Source, SourceFactory,
8
};
9
use crate::record_store::RecordReader;
10
use crate::tests::dag_base_run::NoopProcessorFactory;
11
use crate::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
12
use crate::tests::sources::{GeneratorSourceFactory, GENERATOR_SOURCE_OUTPUT_PORT};
13
use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
14
use dozer_storage::lmdb_storage::{LmdbExclusiveTransaction, SharedTransaction};
15
use dozer_types::ingestion_types::IngestionMessage;
16
use dozer_types::node::{NodeHandle, SourceStates};
17
use dozer_types::types::{
18
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
19
};
20

21
use std::collections::HashMap;
22
use std::panic;
23

24
use std::sync::atomic::AtomicBool;
25
use std::sync::Arc;
26

27
use crate::epoch::Epoch;
28

29
use crate::tests::app::NoneContext;
30
use tempdir::TempDir;
31

32
// Test when error is generated by a processor
33

×
34
#[derive(Debug)]
×
35
struct ErrorProcessorFactory {
36
    err_on: u64,
37
    panic: bool,
38
}
39

40
impl ProcessorFactory<NoneContext> for ErrorProcessorFactory {
×
41
    fn get_output_schema(
3✔
42
        &self,
3✔
43
        _output_port: &PortHandle,
3✔
44
        input_schemas: &HashMap<PortHandle, (Schema, NoneContext)>,
3✔
45
    ) -> Result<(Schema, NoneContext), ExecutionError> {
3✔
46
        Ok(input_schemas.get(&DEFAULT_PORT_HANDLE).unwrap().clone())
3✔
47
    }
3✔
48

×
49
    fn get_input_ports(&self) -> Vec<PortHandle> {
12✔
50
        vec![DEFAULT_PORT_HANDLE]
12✔
51
    }
12✔
52

×
53
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
9✔
54
        vec![OutputPortDef::new(
9✔
55
            DEFAULT_PORT_HANDLE,
9✔
56
            OutputPortType::Stateless,
9✔
57
        )]
9✔
58
    }
9✔
59

×
60
    fn build(
3✔
61
        &self,
3✔
62
        _input_schemas: HashMap<PortHandle, Schema>,
3✔
63
        _output_schemas: HashMap<PortHandle, Schema>,
3✔
64
        _txn: &mut LmdbExclusiveTransaction,
3✔
65
    ) -> Result<Box<dyn Processor>, ExecutionError> {
3✔
66
        Ok(Box::new(ErrorProcessor {
3✔
67
            err_on: self.err_on,
3✔
68
            count: 0,
3✔
69
            panic: self.panic,
3✔
70
        }))
3✔
71
    }
3✔
72
}
73

×
74
#[derive(Debug)]
×
75
struct ErrorProcessor {
76
    err_on: u64,
77
    count: u64,
78
    panic: bool,
79
}
80

81
impl Processor for ErrorProcessor {
×
82
    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
412✔
83
        Ok(())
412✔
84
    }
412✔
85

×
86
    fn process(
2,311,489✔
87
        &mut self,
2,311,489✔
88
        _from_port: PortHandle,
2,311,489✔
89
        op: Operation,
2,311,489✔
90
        fw: &mut dyn ProcessorChannelForwarder,
2,311,489✔
91
        _tx: &SharedTransaction,
2,311,489✔
92
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
2,311,489✔
93
    ) -> Result<(), ExecutionError> {
2,311,489✔
94
        self.count += 1;
2,311,489✔
95
        if self.count == self.err_on {
2,311,489✔
96
            if self.panic {
3✔
97
                panic!("Generated error");
1✔
98
            } else {
×
99
                return Err(ExecutionError::InvalidOperation("Uknown".to_string()));
2✔
100
            }
×
101
        }
2,311,486✔
102

2,311,486✔
103
        fw.send(op, DEFAULT_PORT_HANDLE)
2,311,486✔
104
    }
2,311,488✔
105
}
106

×
107
#[test]
1✔
108
#[should_panic]
×
109
fn test_run_dag_proc_err_panic() {
1✔
110
    let count: u64 = 1_000_000;
1✔
111

1✔
112
    let mut dag = Dag::new();
1✔
113
    let latch = Arc::new(AtomicBool::new(true));
1✔
114

1✔
115
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
116
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
117
    let sink_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
118

1✔
119
    dag.add_source(
1✔
120
        source_handle.clone(),
1✔
121
        Arc::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
122
    );
1✔
123
    dag.add_processor(
1✔
124
        proc_handle.clone(),
1✔
125
        Arc::new(ErrorProcessorFactory {
1✔
126
            err_on: 800_000,
1✔
127
            panic: true,
1✔
128
        }),
1✔
129
    );
1✔
130
    dag.add_sink(
1✔
131
        sink_handle.clone(),
1✔
132
        Arc::new(CountingSinkFactory::new(count, latch)),
1✔
133
    );
1✔
134

1✔
135
    chk!(dag.connect(
1✔
136
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
×
137
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
×
138
    ));
1✔
139

1✔
140
    chk!(dag.connect(
1✔
141
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
×
142
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
×
143
    ));
×
144

1✔
145
    let tmp_dir = chk!(TempDir::new("test"));
1✔
146
    DagExecutor::new(
1✔
147
        dag,
1✔
148
        tmp_dir.path().to_path_buf(),
1✔
149
        ExecutorOptions::default(),
1✔
150
    )
1✔
151
    .unwrap()
1✔
152
    .start(Arc::new(AtomicBool::new(true)))
1✔
153
    .unwrap()
1✔
154
    .join()
1✔
155
    .unwrap();
1✔
156
}
1✔
157

×
158
#[test]
1✔
159
#[should_panic]
×
160
fn test_run_dag_proc_err_2() {
1✔
161
    let count: u64 = 1_000_000;
1✔
162

1✔
163
    let mut dag = Dag::new();
1✔
164
    let latch = Arc::new(AtomicBool::new(true));
1✔
165

1✔
166
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
167
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
168
    let proc_err_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
169
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
170

1✔
171
    dag.add_source(
1✔
172
        source_handle.clone(),
1✔
173
        Arc::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
174
    );
1✔
175
    dag.add_processor(proc_handle.clone(), Arc::new(NoopProcessorFactory {}));
1✔
176

1✔
177
    dag.add_processor(
1✔
178
        proc_err_handle.clone(),
1✔
179
        Arc::new(ErrorProcessorFactory {
1✔
180
            err_on: 800_000,
1✔
181
            panic: false,
1✔
182
        }),
1✔
183
    );
1✔
184

1✔
185
    dag.add_sink(
1✔
186
        sink_handle.clone(),
1✔
187
        Arc::new(CountingSinkFactory::new(count, latch)),
1✔
188
    );
1✔
189

1✔
190
    chk!(dag.connect(
1✔
191
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
×
192
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
×
193
    ));
1✔
194

1✔
195
    chk!(dag.connect(
1✔
196
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
×
197
        Endpoint::new(proc_err_handle.clone(), DEFAULT_PORT_HANDLE),
×
198
    ));
1✔
199

1✔
200
    chk!(dag.connect(
1✔
201
        Endpoint::new(proc_err_handle, DEFAULT_PORT_HANDLE),
×
202
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
×
203
    ));
×
204

1✔
205
    let tmp_dir = chk!(TempDir::new("test"));
1✔
206
    DagExecutor::new(
1✔
207
        dag,
1✔
208
        tmp_dir.path().to_path_buf(),
1✔
209
        ExecutorOptions::default(),
1✔
210
    )
1✔
211
    .unwrap()
1✔
212
    .start(Arc::new(AtomicBool::new(true)))
1✔
213
    .unwrap()
1✔
214
    .join()
1✔
215
    .unwrap();
1✔
216
}
1✔
217

×
218
#[test]
1✔
219
#[should_panic]
×
220
fn test_run_dag_proc_err_3() {
1✔
221
    let count: u64 = 1_000_000;
1✔
222

1✔
223
    let mut dag = Dag::new();
1✔
224
    let latch = Arc::new(AtomicBool::new(true));
1✔
225

1✔
226
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
227
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
228
    let proc_err_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
229
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
230

1✔
231
    dag.add_source(
1✔
232
        source_handle.clone(),
1✔
233
        Arc::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
234
    );
1✔
235

1✔
236
    dag.add_processor(
1✔
237
        proc_err_handle.clone(),
1✔
238
        Arc::new(ErrorProcessorFactory {
1✔
239
            err_on: 800_000,
1✔
240
            panic: false,
1✔
241
        }),
1✔
242
    );
1✔
243

1✔
244
    dag.add_processor(proc_handle.clone(), Arc::new(NoopProcessorFactory {}));
1✔
245

1✔
246
    dag.add_sink(
1✔
247
        sink_handle.clone(),
1✔
248
        Arc::new(CountingSinkFactory::new(count, latch)),
1✔
249
    );
1✔
250

1✔
251
    chk!(dag.connect(
1✔
252
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
×
253
        Endpoint::new(proc_err_handle.clone(), DEFAULT_PORT_HANDLE),
×
254
    ));
1✔
255

1✔
256
    chk!(dag.connect(
1✔
257
        Endpoint::new(proc_err_handle, DEFAULT_PORT_HANDLE),
×
258
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
×
259
    ));
1✔
260

1✔
261
    chk!(dag.connect(
1✔
262
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
×
263
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
×
264
    ));
×
265

1✔
266
    let tmp_dir = chk!(TempDir::new("test"));
1✔
267
    DagExecutor::new(
1✔
268
        dag,
1✔
269
        tmp_dir.path().to_path_buf(),
1✔
270
        ExecutorOptions::default(),
1✔
271
    )
1✔
272
    .unwrap()
1✔
273
    .start(Arc::new(AtomicBool::new(true)))
1✔
274
    .unwrap()
1✔
275
    .join()
1✔
276
    .unwrap();
1✔
277
}
1✔
278

279
// Test when error is generated by a source
280

×
281
#[derive(Debug)]
×
282
pub(crate) struct ErrGeneratorSourceFactory {
283
    count: u64,
284
    err_at: u64,
285
}
286

287
impl ErrGeneratorSourceFactory {
×
288
    pub fn new(count: u64, err_at: u64) -> Self {
1✔
289
        Self { count, err_at }
1✔
290
    }
1✔
291
}
292

293
impl SourceFactory<NoneContext> for ErrGeneratorSourceFactory {
×
294
    fn get_output_schema(
1✔
295
        &self,
1✔
296
        _port: &PortHandle,
1✔
297
    ) -> Result<(Schema, NoneContext), ExecutionError> {
1✔
298
        Ok((
1✔
299
            Schema::empty()
1✔
300
                .field(
1✔
301
                    FieldDefinition::new(
1✔
302
                        "id".to_string(),
1✔
303
                        FieldType::String,
1✔
304
                        false,
1✔
305
                        SourceDefinition::Dynamic,
1✔
306
                    ),
1✔
307
                    true,
1✔
308
                )
1✔
309
                .field(
1✔
310
                    FieldDefinition::new(
1✔
311
                        "value".to_string(),
1✔
312
                        FieldType::String,
1✔
313
                        false,
1✔
314
                        SourceDefinition::Dynamic,
1✔
315
                    ),
1✔
316
                    false,
1✔
317
                )
1✔
318
                .clone(),
1✔
319
            NoneContext {},
1✔
320
        ))
1✔
321
    }
1✔
322

×
323
    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
3✔
324
        Ok(vec![OutputPortDef::new(
3✔
325
            GENERATOR_SOURCE_OUTPUT_PORT,
3✔
326
            OutputPortType::Stateless,
3✔
327
        )])
3✔
328
    }
3✔
329

×
330
    fn build(
1✔
331
        &self,
1✔
332
        _output_schemas: HashMap<PortHandle, Schema>,
1✔
333
    ) -> Result<Box<dyn Source>, ExecutionError> {
1✔
334
        Ok(Box::new(ErrGeneratorSource {
1✔
335
            count: self.count,
1✔
336
            err_at: self.err_at,
1✔
337
        }))
1✔
338
    }
1✔
339
}
340

×
341
#[derive(Debug)]
×
342
pub(crate) struct ErrGeneratorSource {
343
    count: u64,
344
    err_at: u64,
345
}
346

347
impl Source for ErrGeneratorSource {
×
348
    fn can_start_from(&self, _last_checkpoint: (u64, u64)) -> Result<bool, ExecutionError> {
×
349
        Ok(false)
×
350
    }
×
351

×
352
    fn start(
1✔
353
        &self,
1✔
354
        fw: &mut dyn SourceChannelForwarder,
1✔
355
        _checkpoint: Option<(u64, u64)>,
1✔
356
    ) -> Result<(), ExecutionError> {
1✔
357
        for n in 1..(self.count + 1) {
200,000✔
358
            if n == self.err_at {
200,000✔
359
                return Err(ExecutionError::InvalidOperation(
1✔
360
                    "Generated Error".to_string(),
1✔
361
                ));
1✔
362
            }
199,999✔
363

199,999✔
364
            fw.send(
199,999✔
365
                IngestionMessage::new_op(
199,999✔
366
                    n,
199,999✔
367
                    0,
199,999✔
368
                    Operation::Insert {
199,999✔
369
                        new: Record::new(
199,999✔
370
                            None,
199,999✔
371
                            vec![
199,999✔
372
                                Field::String(format!("key_{n}")),
199,999✔
373
                                Field::String(format!("value_{n}")),
199,999✔
374
                            ],
199,999✔
375
                            None,
199,999✔
376
                        ),
199,999✔
377
                    },
199,999✔
378
                ),
199,999✔
379
                GENERATOR_SOURCE_OUTPUT_PORT,
199,999✔
380
            )?;
199,999✔
381
        }
382
        Ok(())
×
383
    }
1✔
384
}
×
385

×
386
#[test]
1✔
387
fn test_run_dag_src_err() {
1✔
388
    let count: u64 = 1_000_000;
1✔
389

1✔
390
    let mut dag = Dag::new();
1✔
391
    let latch = Arc::new(AtomicBool::new(true));
1✔
392

1✔
393
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
394
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
395
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
396

1✔
397
    dag.add_source(
1✔
398
        source_handle.clone(),
1✔
399
        Arc::new(ErrGeneratorSourceFactory::new(count, 200_000)),
1✔
400
    );
1✔
401
    dag.add_processor(proc_handle.clone(), Arc::new(NoopProcessorFactory {}));
1✔
402
    dag.add_sink(
1✔
403
        sink_handle.clone(),
1✔
404
        Arc::new(CountingSinkFactory::new(count, latch)),
1✔
405
    );
1✔
406

1✔
407
    chk!(dag.connect(
1✔
408
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
×
409
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
×
410
    ));
1✔
411

1✔
412
    chk!(dag.connect(
1✔
413
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
×
414
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
×
415
    ));
×
416

1✔
417
    let tmp_dir = chk!(TempDir::new("test"));
1✔
418
    let _join_handle = DagExecutor::new(
1✔
419
        dag,
1✔
420
        tmp_dir.path().to_path_buf(),
1✔
421
        ExecutorOptions::default(),
1✔
422
    )
1✔
423
    .unwrap()
1✔
424
    .start(Arc::new(AtomicBool::new(true)))
1✔
425
    .unwrap();
1✔
426
    // join_handle.join().unwrap();
1✔
427
}
1✔
428

429
#[derive(Debug)]
×
430
pub(crate) struct ErrSinkFactory {
431
    err_at: u64,
432
    panic: bool,
433
}
×
434

×
435
impl ErrSinkFactory {
×
436
    pub fn new(err_at: u64, panic: bool) -> Self {
2✔
437
        Self { err_at, panic }
2✔
438
    }
2✔
439
}
×
440

×
441
impl SinkFactory<NoneContext> for ErrSinkFactory {
×
442
    fn get_input_ports(&self) -> Vec<PortHandle> {
8✔
443
        vec![COUNTING_SINK_INPUT_PORT]
8✔
444
    }
8✔
445

×
446
    fn prepare(
2✔
447
        &self,
2✔
448
        _input_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
2✔
449
    ) -> Result<(), ExecutionError> {
2✔
450
        Ok(())
2✔
451
    }
2✔
452

×
453
    fn build(
2✔
454
        &self,
2✔
455
        _input_schemas: HashMap<PortHandle, Schema>,
2✔
456
        _source_states: &SourceStates,
2✔
457
    ) -> Result<Box<dyn Sink>, ExecutionError> {
2✔
458
        Ok(Box::new(ErrSink {
2✔
459
            err_at: self.err_at,
2✔
460
            current: 0,
2✔
461
            panic: self.panic,
2✔
462
        }))
2✔
463
    }
2✔
464
}
465

466
#[derive(Debug)]
×
467
pub(crate) struct ErrSink {
468
    err_at: u64,
469
    current: u64,
470
    panic: bool,
×
471
}
×
472
impl Sink for ErrSink {
×
473
    fn commit(&mut self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
77✔
474
        Ok(())
77✔
475
    }
77✔
476

×
477
    fn process(
399,531✔
478
        &mut self,
399,531✔
479
        _from_port: PortHandle,
399,531✔
480
        _op: Operation,
399,531✔
481
        _state: &SharedTransaction,
399,531✔
482
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
399,531✔
483
    ) -> Result<(), ExecutionError> {
399,531✔
484
        self.current += 1;
399,531✔
485
        if self.current == self.err_at {
399,531✔
486
            if self.panic {
2✔
487
                panic!("Generated error");
1✔
488
            } else {
×
489
                return Err(ExecutionError::InvalidOperation(
1✔
490
                    "Generated error".to_string(),
1✔
491
                ));
1✔
492
            }
×
493
        }
399,529✔
494
        Ok(())
399,529✔
495
    }
399,530✔
496

497
    fn on_source_snapshotting_done(&mut self) -> Result<(), ExecutionError> {
×
498
        Ok(())
×
499
    }
×
500
}
×
501

×
502
#[test]
1✔
503
#[should_panic]
×
504
fn test_run_dag_sink_err() {
1✔
505
    let count: u64 = 1_000_000;
1✔
506

1✔
507
    let mut dag = Dag::new();
1✔
508
    let latch = Arc::new(AtomicBool::new(true));
1✔
509

1✔
510
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
511
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
512
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
513

1✔
514
    dag.add_source(
1✔
515
        source_handle.clone(),
1✔
516
        Arc::new(GeneratorSourceFactory::new(count, latch, false)),
1✔
517
    );
1✔
518
    dag.add_processor(proc_handle.clone(), Arc::new(NoopProcessorFactory {}));
1✔
519
    dag.add_sink(
1✔
520
        sink_handle.clone(),
1✔
521
        Arc::new(ErrSinkFactory::new(200_000, false)),
1✔
522
    );
1✔
523

1✔
524
    chk!(dag.connect(
1✔
525
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
×
526
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
×
527
    ));
1✔
528

1✔
529
    chk!(dag.connect(
1✔
530
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
×
531
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
×
532
    ));
1✔
533

1✔
534
    let tmp_dir = chk!(TempDir::new("test"));
1✔
535
    DagExecutor::new(
1✔
536
        dag,
1✔
537
        tmp_dir.path().to_path_buf(),
1✔
538
        ExecutorOptions::default(),
1✔
539
    )
1✔
540
    .unwrap()
1✔
541
    .start(Arc::new(AtomicBool::new(true)))
1✔
542
    .unwrap()
1✔
543
    .join()
1✔
544
    .unwrap();
1✔
545
}
1✔
546

×
547
#[test]
1✔
548
#[should_panic]
×
549
fn test_run_dag_sink_err_panic() {
1✔
550
    let count: u64 = 1_000_000;
1✔
551

1✔
552
    let mut dag = Dag::new();
1✔
553
    let latch = Arc::new(AtomicBool::new(true));
1✔
554

1✔
555
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
556
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
557
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
558

1✔
559
    dag.add_source(
1✔
560
        source_handle.clone(),
1✔
561
        Arc::new(GeneratorSourceFactory::new(count, latch, false)),
1✔
562
    );
1✔
563
    dag.add_processor(proc_handle.clone(), Arc::new(NoopProcessorFactory {}));
1✔
564
    dag.add_sink(
1✔
565
        sink_handle.clone(),
1✔
566
        Arc::new(ErrSinkFactory::new(200_000, true)),
1✔
567
    );
1✔
568

1✔
569
    chk!(dag.connect(
1✔
570
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
×
571
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
×
572
    ));
1✔
573

1✔
574
    chk!(dag.connect(
1✔
575
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
×
576
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
×
577
    ));
1✔
578

1✔
579
    let tmp_dir = chk!(TempDir::new("test"));
1✔
580
    DagExecutor::new(
1✔
581
        dag,
1✔
582
        tmp_dir.path().to_path_buf(),
1✔
583
        ExecutorOptions::default(),
1✔
584
    )
1✔
585
    .unwrap()
1✔
586
    .start(Arc::new(AtomicBool::new(true)))
1✔
587
    .unwrap()
1✔
588
    .join()
1✔
589
    .unwrap();
1✔
590
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc