• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4112409903

pending completion
4112409903

Pull #818

github

GitHub
Merge 0e6d61bff into c160ec41f
Pull Request #818: chore: fix dag issues

212 of 212 new or added lines in 23 files covered. (100.0%)

23352 of 37718 relevant lines covered (61.91%)

31647.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.96
/dozer-core/src/dag/tests/dag_base_errors.rs
1
use crate::chk;
2
use crate::dag::channels::{ProcessorChannelForwarder, SourceChannelForwarder};
3
use crate::dag::errors::ExecutionError;
4
use crate::dag::executor::{DagExecutor, ExecutorOptions};
5
use crate::dag::node::{
6
    NodeHandle, OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Sink,
7
    SinkFactory, Source, SourceFactory,
8
};
9
use crate::dag::record_store::RecordReader;
10
use crate::dag::tests::dag_base_run::NoopProcessorFactory;
11
use crate::dag::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
12
use crate::dag::tests::sources::{GeneratorSourceFactory, GENERATOR_SOURCE_OUTPUT_PORT};
13
use crate::dag::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
14
use crate::storage::lmdb_storage::{LmdbEnvironmentManager, SharedTransaction};
15
use dozer_types::types::{
16
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
17
};
18

19
use std::collections::HashMap;
20
use std::panic;
21

22
use std::sync::atomic::AtomicBool;
23
use std::sync::Arc;
24

25
use crate::dag::epoch::Epoch;
26

27
use crate::dag::tests::app::NoneContext;
28
use tempdir::TempDir;
29

30
// Test when error is generated by a processor
31

32
#[derive(Debug)]
×
33
struct ErrorProcessorFactory {
34
    err_on: u64,
35
    panic: bool,
36
}
37

38
impl ProcessorFactory<NoneContext> for ErrorProcessorFactory {
39
    fn get_output_schema(
3✔
40
        &self,
3✔
41
        _output_port: &PortHandle,
3✔
42
        input_schemas: &HashMap<PortHandle, (Schema, NoneContext)>,
3✔
43
    ) -> Result<(Schema, NoneContext), ExecutionError> {
3✔
44
        Ok(input_schemas.get(&DEFAULT_PORT_HANDLE).unwrap().clone())
3✔
45
    }
3✔
46

47
    fn get_input_ports(&self) -> Vec<PortHandle> {
6✔
48
        vec![DEFAULT_PORT_HANDLE]
6✔
49
    }
6✔
50

51
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
9✔
52
        vec![OutputPortDef::new(
9✔
53
            DEFAULT_PORT_HANDLE,
9✔
54
            OutputPortType::Stateless,
9✔
55
        )]
9✔
56
    }
9✔
57

58
    fn prepare(
×
59
        &self,
×
60
        _input_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
61
        _output_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
62
    ) -> Result<(), ExecutionError> {
×
63
        Ok(())
×
64
    }
×
65

66
    fn build(
3✔
67
        &self,
3✔
68
        _input_schemas: HashMap<PortHandle, Schema>,
3✔
69
        _output_schemas: HashMap<PortHandle, Schema>,
3✔
70
    ) -> Result<Box<dyn Processor>, ExecutionError> {
3✔
71
        Ok(Box::new(ErrorProcessor {
3✔
72
            err_on: self.err_on,
3✔
73
            count: 0,
3✔
74
            panic: self.panic,
3✔
75
        }))
3✔
76
    }
3✔
77
}
78

79
#[derive(Debug)]
×
80
struct ErrorProcessor {
81
    err_on: u64,
82
    count: u64,
83
    panic: bool,
84
}
85

86
impl Processor for ErrorProcessor {
87
    fn init(&mut self, _state: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
3✔
88
        Ok(())
3✔
89
    }
3✔
90

91
    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
609✔
92
        Ok(())
609✔
93
    }
609✔
94

95
    fn process(
2,399,857✔
96
        &mut self,
2,399,857✔
97
        _from_port: PortHandle,
2,399,857✔
98
        op: Operation,
2,399,857✔
99
        fw: &mut dyn ProcessorChannelForwarder,
2,399,857✔
100
        _tx: &SharedTransaction,
2,399,857✔
101
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
2,399,857✔
102
    ) -> Result<(), ExecutionError> {
2,399,857✔
103
        self.count += 1;
2,399,857✔
104
        if self.count == self.err_on {
2,399,857✔
105
            if self.panic {
3✔
106
                panic!("Generated error");
1✔
107
            } else {
108
                return Err(ExecutionError::InvalidOperation("Uknown".to_string()));
2✔
109
            }
110
        }
2,399,854✔
111

2,399,854✔
112
        fw.send(op, DEFAULT_PORT_HANDLE)
2,399,854✔
113
    }
2,399,856✔
114
}
115

116
#[test]
1✔
117
#[should_panic]
118
fn test_run_dag_proc_err_panic() {
1✔
119
    let count: u64 = 1_000_000;
1✔
120

1✔
121
    let mut dag = Dag::new();
1✔
122
    let latch = Arc::new(AtomicBool::new(true));
1✔
123

1✔
124
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
125
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
126
    let sink_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
127

1✔
128
    dag.add_source(
1✔
129
        source_handle.clone(),
1✔
130
        Arc::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
131
    );
1✔
132
    dag.add_processor(
1✔
133
        proc_handle.clone(),
1✔
134
        Arc::new(ErrorProcessorFactory {
1✔
135
            err_on: 800_000,
1✔
136
            panic: true,
1✔
137
        }),
1✔
138
    );
1✔
139
    dag.add_sink(
1✔
140
        sink_handle.clone(),
1✔
141
        Arc::new(CountingSinkFactory::new(count, latch)),
1✔
142
    );
1✔
143

×
144
    chk!(dag.connect(
×
145
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
×
146
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
×
147
    ));
×
148

×
149
    chk!(dag.connect(
×
150
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
×
151
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
×
152
    ));
×
153

×
154
    let tmp_dir = chk!(TempDir::new("test"));
1✔
155
    let mut executor = chk!(DagExecutor::new(
1✔
156
        dag,
×
157
        tmp_dir.path(),
×
158
        ExecutorOptions::default(),
×
159
        Arc::new(AtomicBool::new(true))
×
160
    ));
×
161

×
162
    chk!(executor.start());
×
163
    assert!(executor.join().is_err());
1✔
164
}
1✔
165

166
#[test]
1✔
167
#[should_panic]
×
168
fn test_run_dag_proc_err_2() {
1✔
169
    let count: u64 = 1_000_000;
1✔
170

1✔
171
    let mut dag = Dag::new();
1✔
172
    let latch = Arc::new(AtomicBool::new(true));
1✔
173

1✔
174
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
175
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
176
    let proc_err_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
177
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
178

1✔
179
    dag.add_source(
1✔
180
        source_handle.clone(),
1✔
181
        Arc::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
182
    );
1✔
183
    dag.add_processor(proc_handle.clone(), Arc::new(NoopProcessorFactory {}));
1✔
184

1✔
185
    dag.add_processor(
1✔
186
        proc_err_handle.clone(),
1✔
187
        Arc::new(ErrorProcessorFactory {
1✔
188
            err_on: 800_000,
1✔
189
            panic: false,
1✔
190
        }),
1✔
191
    );
1✔
192

1✔
193
    dag.add_sink(
1✔
194
        sink_handle.clone(),
1✔
195
        Arc::new(CountingSinkFactory::new(count, latch)),
1✔
196
    );
1✔
197

×
198
    chk!(dag.connect(
×
199
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
×
200
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
×
201
    ));
×
202

×
203
    chk!(dag.connect(
×
204
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
×
205
        Endpoint::new(proc_err_handle.clone(), DEFAULT_PORT_HANDLE),
×
206
    ));
×
207

×
208
    chk!(dag.connect(
×
209
        Endpoint::new(proc_err_handle, DEFAULT_PORT_HANDLE),
×
210
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
×
211
    ));
×
212

×
213
    let tmp_dir = chk!(TempDir::new("test"));
1✔
214
    let mut executor = chk!(DagExecutor::new(
1✔
215
        dag,
×
216
        tmp_dir.path(),
×
217
        ExecutorOptions::default(),
×
218
        Arc::new(AtomicBool::new(true))
×
219
    ));
×
220

×
221
    chk!(executor.start());
×
222
    assert!(executor.join().is_err());
1✔
223
}
1✔
224

×
225
#[test]
1✔
226
#[should_panic]
×
227
fn test_run_dag_proc_err_3() {
1✔
228
    let count: u64 = 1_000_000;
1✔
229

1✔
230
    let mut dag = Dag::new();
1✔
231
    let latch = Arc::new(AtomicBool::new(true));
1✔
232

1✔
233
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
234
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
235
    let proc_err_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
236
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
237

1✔
238
    dag.add_source(
1✔
239
        source_handle.clone(),
1✔
240
        Arc::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
241
    );
1✔
242

1✔
243
    dag.add_processor(
1✔
244
        proc_err_handle.clone(),
1✔
245
        Arc::new(ErrorProcessorFactory {
1✔
246
            err_on: 800_000,
1✔
247
            panic: false,
1✔
248
        }),
1✔
249
    );
1✔
250

1✔
251
    dag.add_processor(proc_handle.clone(), Arc::new(NoopProcessorFactory {}));
1✔
252

1✔
253
    dag.add_sink(
1✔
254
        sink_handle.clone(),
1✔
255
        Arc::new(CountingSinkFactory::new(count, latch)),
1✔
256
    );
1✔
257

×
258
    chk!(dag.connect(
×
259
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
×
260
        Endpoint::new(proc_err_handle.clone(), DEFAULT_PORT_HANDLE),
×
261
    ));
×
262

×
263
    chk!(dag.connect(
×
264
        Endpoint::new(proc_err_handle, DEFAULT_PORT_HANDLE),
×
265
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
×
266
    ));
×
267

×
268
    chk!(dag.connect(
×
269
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
×
270
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
×
271
    ));
×
272

×
273
    let tmp_dir = chk!(TempDir::new("test"));
1✔
274
    let mut executor = chk!(DagExecutor::new(
1✔
275
        dag,
×
276
        tmp_dir.path(),
×
277
        ExecutorOptions::default(),
×
278
        Arc::new(AtomicBool::new(true))
×
279
    ));
×
280

281
    chk!(executor.start());
×
282
    assert!(executor.join().is_err());
1✔
283
}
1✔
284

×
285
// Test when error is generated by a source
286

×
287
#[derive(Debug)]
×
288
pub(crate) struct ErrGeneratorSourceFactory {
×
289
    count: u64,
×
290
    err_at: u64,
291
}
×
292

×
293
impl ErrGeneratorSourceFactory {
×
294
    pub fn new(count: u64, err_at: u64) -> Self {
1✔
295
        Self { count, err_at }
1✔
296
    }
1✔
297
}
×
298

299
impl SourceFactory<NoneContext> for ErrGeneratorSourceFactory {
×
300
    fn get_output_schema(
1✔
301
        &self,
1✔
302
        _port: &PortHandle,
1✔
303
    ) -> Result<(Schema, NoneContext), ExecutionError> {
1✔
304
        Ok((
1✔
305
            Schema::empty()
1✔
306
                .field(
1✔
307
                    FieldDefinition::new(
1✔
308
                        "id".to_string(),
1✔
309
                        FieldType::String,
1✔
310
                        false,
1✔
311
                        SourceDefinition::Dynamic,
1✔
312
                    ),
1✔
313
                    true,
1✔
314
                )
1✔
315
                .field(
1✔
316
                    FieldDefinition::new(
1✔
317
                        "value".to_string(),
1✔
318
                        FieldType::String,
1✔
319
                        false,
1✔
320
                        SourceDefinition::Dynamic,
1✔
321
                    ),
1✔
322
                    false,
1✔
323
                )
1✔
324
                .clone(),
1✔
325
            NoneContext {},
1✔
326
        ))
1✔
327
    }
1✔
328

×
329
    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
3✔
330
        Ok(vec![OutputPortDef::new(
3✔
331
            GENERATOR_SOURCE_OUTPUT_PORT,
3✔
332
            OutputPortType::Stateless,
3✔
333
        )])
3✔
334
    }
3✔
335

×
336
    fn prepare(
×
337
        &self,
×
338
        _output_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
339
    ) -> Result<(), ExecutionError> {
×
340
        Ok(())
×
341
    }
×
342

×
343
    fn build(
1✔
344
        &self,
1✔
345
        _output_schemas: HashMap<PortHandle, Schema>,
1✔
346
    ) -> Result<Box<dyn Source>, ExecutionError> {
1✔
347
        Ok(Box::new(ErrGeneratorSource {
1✔
348
            count: self.count,
1✔
349
            err_at: self.err_at,
1✔
350
        }))
1✔
351
    }
1✔
352
}
×
353

354
#[derive(Debug)]
×
355
pub(crate) struct ErrGeneratorSource {
×
356
    count: u64,
×
357
    err_at: u64,
×
358
}
×
359

×
360
impl Source for ErrGeneratorSource {
361
    fn start(
1✔
362
        &self,
1✔
363
        fw: &mut dyn SourceChannelForwarder,
1✔
364
        _from_seq: Option<(u64, u64)>,
1✔
365
    ) -> Result<(), ExecutionError> {
1✔
366
        for n in 1..(self.count + 1) {
1✔
367
            if n == self.err_at {
1✔
368
                return Err(ExecutionError::InvalidOperation(
×
369
                    "Generated Error".to_string(),
×
370
                ));
×
371
            }
1✔
372

1✔
373
            fw.send(
1✔
374
                n,
1✔
375
                0,
1✔
376
                Operation::Insert {
1✔
377
                    new: Record::new(
1✔
378
                        None,
1✔
379
                        vec![
1✔
380
                            Field::String(format!("key_{n}")),
1✔
381
                            Field::String(format!("value_{n}")),
1✔
382
                        ],
1✔
383
                        None,
1✔
384
                    ),
1✔
385
                },
1✔
386
                GENERATOR_SOURCE_OUTPUT_PORT,
1✔
387
            )?;
1✔
388
        }
×
389
        Ok(())
×
390
    }
1✔
391
}
×
392

×
393
#[test]
1✔
394
fn test_run_dag_src_err() {
1✔
395
    let count: u64 = 1_000_000;
1✔
396

1✔
397
    let mut dag = Dag::new();
1✔
398
    let latch = Arc::new(AtomicBool::new(true));
1✔
399

1✔
400
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
401
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
402
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
403

1✔
404
    dag.add_source(
1✔
405
        source_handle.clone(),
1✔
406
        Arc::new(ErrGeneratorSourceFactory::new(count, 200_000)),
1✔
407
    );
1✔
408
    dag.add_processor(proc_handle.clone(), Arc::new(NoopProcessorFactory {}));
1✔
409
    dag.add_sink(
1✔
410
        sink_handle.clone(),
1✔
411
        Arc::new(CountingSinkFactory::new(count, latch)),
1✔
412
    );
1✔
413

1✔
414
    chk!(dag.connect(
1✔
415
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
×
416
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
×
417
    ));
1✔
418

1✔
419
    chk!(dag.connect(
1✔
420
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
×
421
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
×
422
    ));
×
423

1✔
424
    let tmp_dir = chk!(TempDir::new("test"));
1✔
425
    let mut executor = chk!(DagExecutor::new(
1✔
426
        dag,
×
427
        tmp_dir.path(),
×
428
        ExecutorOptions::default(),
×
429
        Arc::new(AtomicBool::new(true))
×
430
    ));
1✔
431

1✔
432
    executor.start().unwrap();
1✔
433
    //  assert!(executor.join().is_ok());
1✔
434
}
1✔
435

×
436
#[derive(Debug)]
×
437
pub(crate) struct ErrSinkFactory {
×
438
    err_at: u64,
×
439
    panic: bool,
×
440
}
×
441

×
442
impl ErrSinkFactory {
×
443
    pub fn new(err_at: u64, panic: bool) -> Self {
2✔
444
        Self { err_at, panic }
2✔
445
    }
2✔
446
}
×
447

×
448
impl SinkFactory<NoneContext> for ErrSinkFactory {
×
449
    fn get_input_ports(&self) -> Vec<PortHandle> {
4✔
450
        vec![COUNTING_SINK_INPUT_PORT]
4✔
451
    }
4✔
452

×
453
    fn prepare(
×
454
        &self,
×
455
        _input_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
456
    ) -> Result<(), ExecutionError> {
×
457
        Ok(())
×
458
    }
×
459

460
    fn build(
2✔
461
        &self,
2✔
462
        _input_schemas: HashMap<PortHandle, Schema>,
2✔
463
    ) -> Result<Box<dyn Sink>, ExecutionError> {
2✔
464
        Ok(Box::new(ErrSink {
2✔
465
            err_at: self.err_at,
2✔
466
            current: 0,
2✔
467
            panic: self.panic,
2✔
468
        }))
2✔
469
    }
2✔
470
}
×
471

×
472
#[derive(Debug)]
×
473
pub(crate) struct ErrSink {
×
474
    err_at: u64,
×
475
    current: u64,
×
476
    panic: bool,
×
477
}
×
478
impl Sink for ErrSink {
×
479
    fn init(&mut self, _state: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
2✔
480
        Ok(())
2✔
481
    }
2✔
482

×
483
    fn commit(&mut self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
89✔
484
        Ok(())
89✔
485
    }
89✔
486

×
487
    fn process(
400,000✔
488
        &mut self,
400,000✔
489
        _from_port: PortHandle,
400,000✔
490
        _op: Operation,
400,000✔
491
        _state: &SharedTransaction,
400,000✔
492
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
400,000✔
493
    ) -> Result<(), ExecutionError> {
400,000✔
494
        self.current += 1;
400,000✔
495
        if self.current == self.err_at {
400,000✔
496
            if self.panic {
2✔
497
                panic!("Generated error");
1✔
498
            } else {
499
                return Err(ExecutionError::InvalidOperation(
1✔
500
                    "Generated error".to_string(),
1✔
501
                ));
1✔
502
            }
×
503
        }
399,998✔
504
        Ok(())
399,998✔
505
    }
399,999✔
506
}
×
507

×
508
#[test]
1✔
509
#[should_panic]
×
510
fn test_run_dag_sink_err() {
1✔
511
    let count: u64 = 1_000_000;
1✔
512

1✔
513
    let mut dag = Dag::new();
1✔
514
    let latch = Arc::new(AtomicBool::new(true));
1✔
515

1✔
516
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
517
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
518
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
519

1✔
520
    dag.add_source(
1✔
521
        source_handle.clone(),
1✔
522
        Arc::new(GeneratorSourceFactory::new(count, latch, false)),
1✔
523
    );
1✔
524
    dag.add_processor(proc_handle.clone(), Arc::new(NoopProcessorFactory {}));
1✔
525
    dag.add_sink(
1✔
526
        sink_handle.clone(),
1✔
527
        Arc::new(ErrSinkFactory::new(200_000, false)),
1✔
528
    );
1✔
529

×
530
    chk!(dag.connect(
×
531
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
×
532
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
×
533
    ));
×
534

×
535
    chk!(dag.connect(
×
536
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
×
537
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
×
538
    ));
×
539

1✔
540
    let tmp_dir = chk!(TempDir::new("test"));
1✔
541
    let mut executor = chk!(DagExecutor::new(
1✔
542
        dag,
×
543
        tmp_dir.path(),
×
544
        ExecutorOptions::default(),
×
545
        Arc::new(AtomicBool::new(true))
×
546
    ));
×
547

×
548
    chk!(executor.start());
×
549
    assert!(executor.join().is_err());
1✔
550
}
1✔
551

×
552
#[test]
1✔
553
#[should_panic]
×
554
fn test_run_dag_sink_err_panic() {
1✔
555
    let count: u64 = 1_000_000;
1✔
556

1✔
557
    let mut dag = Dag::new();
1✔
558
    let latch = Arc::new(AtomicBool::new(true));
1✔
559

1✔
560
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
561
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
562
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
563

1✔
564
    dag.add_source(
1✔
565
        source_handle.clone(),
1✔
566
        Arc::new(GeneratorSourceFactory::new(count, latch, false)),
1✔
567
    );
1✔
568
    dag.add_processor(proc_handle.clone(), Arc::new(NoopProcessorFactory {}));
1✔
569
    dag.add_sink(
1✔
570
        sink_handle.clone(),
1✔
571
        Arc::new(ErrSinkFactory::new(200_000, true)),
1✔
572
    );
1✔
573

×
574
    chk!(dag.connect(
×
575
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
×
576
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
×
577
    ));
×
578

×
579
    chk!(dag.connect(
×
580
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
×
581
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
×
582
    ));
×
583

1✔
584
    let tmp_dir = chk!(TempDir::new("test"));
1✔
585
    let mut executor = chk!(DagExecutor::new(
1✔
586
        dag,
×
587
        tmp_dir.path(),
×
588
        ExecutorOptions::default(),
×
589
        Arc::new(AtomicBool::new(true))
×
590
    ));
×
591

×
592
    chk!(executor.start());
×
593
    assert!(executor.join().is_err());
1✔
594
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc