• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4360628700

pending completion
4360628700

push

github

GitHub
refactor: pipeline memory storage (#1135)

1152 of 1152 new or added lines in 40 files covered. (100.0%)

27472 of 39301 relevant lines covered (69.9%)

28369.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.43
/dozer-core/src/tests/dag_base_errors.rs
1
use crate::channels::{ProcessorChannelForwarder, SourceChannelForwarder};
2
use crate::chk;
3
use crate::errors::ExecutionError;
4
use crate::executor::{DagExecutor, ExecutorOptions};
5
use crate::node::{
6
    OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Sink, SinkFactory,
7
    Source, SourceFactory,
8
};
9
use crate::tests::dag_base_run::NoopProcessorFactory;
10
use crate::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
11
use crate::tests::sources::{GeneratorSourceFactory, GENERATOR_SOURCE_OUTPUT_PORT};
12
use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
13
use dozer_storage::lmdb_storage::{LmdbExclusiveTransaction, SharedTransaction};
14
use dozer_types::ingestion_types::IngestionMessage;
15
use dozer_types::node::{NodeHandle, SourceStates};
16
use dozer_types::types::{
17
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
18
};
19

20
use std::collections::HashMap;
21
use std::panic;
22

23
use std::sync::atomic::AtomicBool;
24
use std::sync::Arc;
25

26
use crate::epoch::Epoch;
27

28
use crate::tests::app::NoneContext;
29
use tempdir::TempDir;
30

31
// Tests for the case where a processor fails, either by returning an error or by panicking
32

33
/// Test-only processor factory whose processors fail on a chosen operation.
///
/// Both settings are copied verbatim into every processor the factory builds.
#[derive(Debug)]
struct ErrorProcessorFactory {
    /// 1-based index of the operation on which the built processor fails.
    err_on: u64,
    /// `true` makes the built processor panic; `false` makes it return an error.
    panic: bool,
}
38

39
impl ProcessorFactory<NoneContext> for ErrorProcessorFactory {
40
    fn get_output_schema(
3✔
41
        &self,
3✔
42
        _output_port: &PortHandle,
3✔
43
        input_schemas: &HashMap<PortHandle, (Schema, NoneContext)>,
3✔
44
    ) -> Result<(Schema, NoneContext), ExecutionError> {
3✔
45
        Ok(input_schemas.get(&DEFAULT_PORT_HANDLE).unwrap().clone())
3✔
46
    }
3✔
47

×
48
    fn get_input_ports(&self) -> Vec<PortHandle> {
12✔
49
        vec![DEFAULT_PORT_HANDLE]
12✔
50
    }
12✔
51

×
52
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
9✔
53
        vec![OutputPortDef::new(
9✔
54
            DEFAULT_PORT_HANDLE,
9✔
55
            OutputPortType::Stateless,
9✔
56
        )]
9✔
57
    }
9✔
58

×
59
    fn build(
3✔
60
        &self,
3✔
61
        _input_schemas: HashMap<PortHandle, Schema>,
3✔
62
        _output_schemas: HashMap<PortHandle, Schema>,
3✔
63
        _txn: &mut LmdbExclusiveTransaction,
3✔
64
    ) -> Result<Box<dyn Processor>, ExecutionError> {
3✔
65
        Ok(Box::new(ErrorProcessor {
3✔
66
            err_on: self.err_on,
3✔
67
            count: 0,
3✔
68
            panic: self.panic,
3✔
69
        }))
3✔
70
    }
3✔
71
}
×
72

73
/// Test-only processor that fails once it has seen a configured number of operations.
#[derive(Debug)]
struct ErrorProcessor {
    /// 1-based index of the operation that triggers the failure.
    err_on: u64,
    /// Number of operations received so far.
    count: u64,
    /// Failure mode: `true` panics, `false` returns an error.
    panic: bool,
}
79

80
impl Processor for ErrorProcessor {
81
    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
351✔
82
        Ok(())
351✔
83
    }
351✔
84

×
85
    fn process(
2,372,614✔
86
        &mut self,
2,372,614✔
87
        _from_port: PortHandle,
2,372,614✔
88
        op: Operation,
2,372,614✔
89
        fw: &mut dyn ProcessorChannelForwarder,
2,372,614✔
90
        _tx: &SharedTransaction,
2,372,614✔
91
    ) -> Result<(), ExecutionError> {
2,372,614✔
92
        self.count += 1;
2,372,614✔
93
        if self.count == self.err_on {
2,372,614✔
94
            if self.panic {
3✔
95
                panic!("Generated error");
1✔
96
            } else {
×
97
                return Err(ExecutionError::InvalidOperation("Uknown".to_string()));
2✔
98
            }
99
        }
2,372,611✔
100

2,372,611✔
101
        fw.send(op, DEFAULT_PORT_HANDLE)
2,372,611✔
102
    }
2,372,613✔
103
}
×
104

×
105
/// Pipeline: generator source -> `ErrorProcessor` -> counting sink.
///
/// The processor is configured to panic on its 800,000th operation, and the test
/// expects running the executor to completion to panic.
#[test]
#[should_panic]
fn test_run_dag_proc_err_panic() {
    let total_ops: u64 = 1_000_000;
    let latch = Arc::new(AtomicBool::new(true));

    let src = NodeHandle::new(None, 1.to_string());
    let failing_proc = NodeHandle::new(Some(1), 1.to_string());
    let sink = NodeHandle::new(Some(1), 2.to_string());

    let mut graph = Dag::new();
    graph.add_source(
        src.clone(),
        Arc::new(GeneratorSourceFactory::new(total_ops, latch.clone(), false)),
    );
    graph.add_processor(
        failing_proc.clone(),
        Arc::new(ErrorProcessorFactory {
            err_on: 800_000,
            panic: true,
        }),
    );
    graph.add_sink(
        sink.clone(),
        Arc::new(CountingSinkFactory::new(total_ops, latch)),
    );

    // source -> failing processor
    chk!(graph.connect(
        Endpoint::new(src, GENERATOR_SOURCE_OUTPUT_PORT),
        Endpoint::new(failing_proc.clone(), DEFAULT_PORT_HANDLE),
    ));
    // failing processor -> sink
    chk!(graph.connect(
        Endpoint::new(failing_proc, DEFAULT_PORT_HANDLE),
        Endpoint::new(sink, COUNTING_SINK_INPUT_PORT),
    ));

    // `scratch` must outlive the run: it owns the directory handed to the executor.
    let scratch = chk!(TempDir::new("test"));
    let work_dir = scratch.path().to_path_buf();
    let running = Arc::new(AtomicBool::new(true));
    DagExecutor::new(graph, work_dir, ExecutorOptions::default())
        .unwrap()
        .start(running)
        .unwrap()
        .join()
        .unwrap();
}
1✔
155

×
156
/// Pipeline: generator source -> no-op processor -> `ErrorProcessor` -> counting sink.
///
/// The error processor returns an error (no panic) on its 800,000th operation, and
/// the test expects running the executor to completion to panic.
#[test]
#[should_panic]
fn test_run_dag_proc_err_2() {
    let total_ops: u64 = 1_000_000;
    let latch = Arc::new(AtomicBool::new(true));

    let src = NodeHandle::new(None, 1.to_string());
    let noop_proc = NodeHandle::new(Some(1), 1.to_string());
    let failing_proc = NodeHandle::new(Some(1), 2.to_string());
    let sink = NodeHandle::new(Some(1), 3.to_string());

    let mut graph = Dag::new();
    graph.add_source(
        src.clone(),
        Arc::new(GeneratorSourceFactory::new(total_ops, latch.clone(), false)),
    );
    graph.add_processor(noop_proc.clone(), Arc::new(NoopProcessorFactory {}));
    graph.add_processor(
        failing_proc.clone(),
        Arc::new(ErrorProcessorFactory {
            err_on: 800_000,
            panic: false,
        }),
    );
    graph.add_sink(
        sink.clone(),
        Arc::new(CountingSinkFactory::new(total_ops, latch)),
    );

    // source -> no-op processor
    chk!(graph.connect(
        Endpoint::new(src, GENERATOR_SOURCE_OUTPUT_PORT),
        Endpoint::new(noop_proc.clone(), DEFAULT_PORT_HANDLE),
    ));
    // no-op processor -> failing processor
    chk!(graph.connect(
        Endpoint::new(noop_proc, DEFAULT_PORT_HANDLE),
        Endpoint::new(failing_proc.clone(), DEFAULT_PORT_HANDLE),
    ));
    // failing processor -> sink
    chk!(graph.connect(
        Endpoint::new(failing_proc, DEFAULT_PORT_HANDLE),
        Endpoint::new(sink, COUNTING_SINK_INPUT_PORT),
    ));

    // `scratch` must outlive the run: it owns the directory handed to the executor.
    let scratch = chk!(TempDir::new("test"));
    let work_dir = scratch.path().to_path_buf();
    let running = Arc::new(AtomicBool::new(true));
    DagExecutor::new(graph, work_dir, ExecutorOptions::default())
        .unwrap()
        .start(running)
        .unwrap()
        .join()
        .unwrap();
}
1✔
215

×
216
/// Pipeline: generator source -> `ErrorProcessor` -> no-op processor -> counting sink.
///
/// Same as `test_run_dag_proc_err_2` but with the erroring processor placed before
/// the no-op one. The error processor returns an error (no panic) on its 800,000th
/// operation, and the test expects running the executor to completion to panic.
#[test]
#[should_panic]
fn test_run_dag_proc_err_3() {
    let total_ops: u64 = 1_000_000;
    let latch = Arc::new(AtomicBool::new(true));

    let src = NodeHandle::new(None, 1.to_string());
    let noop_proc = NodeHandle::new(Some(1), 1.to_string());
    let failing_proc = NodeHandle::new(Some(1), 2.to_string());
    let sink = NodeHandle::new(Some(1), 3.to_string());

    let mut graph = Dag::new();
    graph.add_source(
        src.clone(),
        Arc::new(GeneratorSourceFactory::new(total_ops, latch.clone(), false)),
    );
    graph.add_processor(
        failing_proc.clone(),
        Arc::new(ErrorProcessorFactory {
            err_on: 800_000,
            panic: false,
        }),
    );
    graph.add_processor(noop_proc.clone(), Arc::new(NoopProcessorFactory {}));
    graph.add_sink(
        sink.clone(),
        Arc::new(CountingSinkFactory::new(total_ops, latch)),
    );

    // source -> failing processor
    chk!(graph.connect(
        Endpoint::new(src, GENERATOR_SOURCE_OUTPUT_PORT),
        Endpoint::new(failing_proc.clone(), DEFAULT_PORT_HANDLE),
    ));
    // failing processor -> no-op processor
    chk!(graph.connect(
        Endpoint::new(failing_proc, DEFAULT_PORT_HANDLE),
        Endpoint::new(noop_proc.clone(), DEFAULT_PORT_HANDLE),
    ));
    // no-op processor -> sink
    chk!(graph.connect(
        Endpoint::new(noop_proc, DEFAULT_PORT_HANDLE),
        Endpoint::new(sink, COUNTING_SINK_INPUT_PORT),
    ));

    // `scratch` must outlive the run: it owns the directory handed to the executor.
    let scratch = chk!(TempDir::new("test"));
    let work_dir = scratch.path().to_path_buf();
    let running = Arc::new(AtomicBool::new(true));
    DagExecutor::new(graph, work_dir, ExecutorOptions::default())
        .unwrap()
        .start(running)
        .unwrap()
        .join()
        .unwrap();
}
1✔
276

×
277
// Tests for the case where a source returns an error
×
278

279
/// Test-only source factory whose sources return an error part-way through generation.
#[derive(Debug)]
pub(crate) struct ErrGeneratorSourceFactory {
    /// Upper bound (inclusive) of the sequence numbers the built source iterates over.
    count: u64,
    /// Sequence number at which the built source returns an error instead of emitting.
    err_at: u64,
}

impl ErrGeneratorSourceFactory {
    /// Creates a factory that hands `count` and `err_at` to every source it builds.
    pub fn new(count: u64, err_at: u64) -> Self {
        ErrGeneratorSourceFactory { count, err_at }
    }
}
×
290

×
291
impl SourceFactory<NoneContext> for ErrGeneratorSourceFactory {
292
    fn get_output_schema(
1✔
293
        &self,
1✔
294
        _port: &PortHandle,
1✔
295
    ) -> Result<(Schema, NoneContext), ExecutionError> {
1✔
296
        Ok((
1✔
297
            Schema::empty()
1✔
298
                .field(
1✔
299
                    FieldDefinition::new(
1✔
300
                        "id".to_string(),
1✔
301
                        FieldType::String,
1✔
302
                        false,
1✔
303
                        SourceDefinition::Dynamic,
1✔
304
                    ),
1✔
305
                    true,
1✔
306
                )
1✔
307
                .field(
1✔
308
                    FieldDefinition::new(
1✔
309
                        "value".to_string(),
1✔
310
                        FieldType::String,
1✔
311
                        false,
1✔
312
                        SourceDefinition::Dynamic,
1✔
313
                    ),
1✔
314
                    false,
1✔
315
                )
1✔
316
                .clone(),
1✔
317
            NoneContext {},
1✔
318
        ))
1✔
319
    }
1✔
320

×
321
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
3✔
322
        vec![OutputPortDef::new(
3✔
323
            GENERATOR_SOURCE_OUTPUT_PORT,
3✔
324
            OutputPortType::Stateless,
3✔
325
        )]
3✔
326
    }
3✔
327

×
328
    fn build(
1✔
329
        &self,
1✔
330
        _output_schemas: HashMap<PortHandle, Schema>,
1✔
331
    ) -> Result<Box<dyn Source>, ExecutionError> {
1✔
332
        Ok(Box::new(ErrGeneratorSource {
1✔
333
            count: self.count,
1✔
334
            err_at: self.err_at,
1✔
335
        }))
1✔
336
    }
1✔
337
}
×
338

×
339
/// Test-only source that emits insert operations until it reaches a failing sequence number.
#[derive(Debug)]
pub(crate) struct ErrGeneratorSource {
    /// Upper bound (inclusive) of the sequence numbers to iterate over.
    count: u64,
    /// Sequence number at which `start` returns an error instead of emitting.
    err_at: u64,
}
344

345
impl Source for ErrGeneratorSource {
346
    fn can_start_from(&self, _last_checkpoint: (u64, u64)) -> Result<bool, ExecutionError> {
×
347
        Ok(false)
×
348
    }
×
349

×
350
    fn start(
1✔
351
        &self,
1✔
352
        fw: &mut dyn SourceChannelForwarder,
1✔
353
        _checkpoint: Option<(u64, u64)>,
1✔
354
    ) -> Result<(), ExecutionError> {
1✔
355
        for n in 1..(self.count + 1) {
200,000✔
356
            if n == self.err_at {
200,000✔
357
                return Err(ExecutionError::InvalidOperation(
1✔
358
                    "Generated Error".to_string(),
1✔
359
                ));
1✔
360
            }
199,999✔
361

199,999✔
362
            fw.send(
199,999✔
363
                IngestionMessage::new_op(
199,999✔
364
                    n,
199,999✔
365
                    0,
199,999✔
366
                    Operation::Insert {
199,999✔
367
                        new: Record::new(
199,999✔
368
                            None,
199,999✔
369
                            vec![
199,999✔
370
                                Field::String(format!("key_{n}")),
199,999✔
371
                                Field::String(format!("value_{n}")),
199,999✔
372
                            ],
199,999✔
373
                            None,
199,999✔
374
                        ),
199,999✔
375
                    },
199,999✔
376
                ),
199,999✔
377
                GENERATOR_SOURCE_OUTPUT_PORT,
199,999✔
378
            )?;
199,999✔
379
        }
×
380
        Ok(())
×
381
    }
1✔
382
}
×
383

×
384
/// Pipeline: `ErrGeneratorSource` -> no-op processor -> counting sink.
///
/// The source is configured to return an error at sequence number 200,000. The test
/// only verifies that the executor can be built and started without error.
#[test]
fn test_run_dag_src_err() {
    let total_ops: u64 = 1_000_000;
    let latch = Arc::new(AtomicBool::new(true));

    let src = NodeHandle::new(None, 1.to_string());
    let noop_proc = NodeHandle::new(Some(1), 1.to_string());
    let sink = NodeHandle::new(Some(1), 3.to_string());

    let mut graph = Dag::new();
    graph.add_source(
        src.clone(),
        Arc::new(ErrGeneratorSourceFactory::new(total_ops, 200_000)),
    );
    graph.add_processor(noop_proc.clone(), Arc::new(NoopProcessorFactory {}));
    graph.add_sink(
        sink.clone(),
        Arc::new(CountingSinkFactory::new(total_ops, latch)),
    );

    // source -> no-op processor
    chk!(graph.connect(
        Endpoint::new(src, GENERATOR_SOURCE_OUTPUT_PORT),
        Endpoint::new(noop_proc.clone(), DEFAULT_PORT_HANDLE),
    ));
    // no-op processor -> sink
    chk!(graph.connect(
        Endpoint::new(noop_proc, DEFAULT_PORT_HANDLE),
        Endpoint::new(sink, COUNTING_SINK_INPUT_PORT),
    ));

    let scratch = chk!(TempDir::new("test"));
    let work_dir = scratch.path().to_path_buf();
    let running = Arc::new(AtomicBool::new(true));
    // NOTE(review): the handle is kept alive until the end of the test but never
    // joined, so the outcome of the run is not asserted. A `join().unwrap()` call was
    // previously disabled here; confirm whether joining (and checking the error)
    // should be restored.
    let _join_handle = DagExecutor::new(graph, work_dir, ExecutorOptions::default())
        .unwrap()
        .start(running)
        .unwrap();
}
1✔
426

×
427
/// Test-only sink factory whose sinks fail on a chosen operation.
#[derive(Debug)]
pub(crate) struct ErrSinkFactory {
    /// 1-based index of the operation on which the built sink fails.
    err_at: u64,
    /// `true` makes the built sink panic; `false` makes it return an error.
    panic: bool,
}

impl ErrSinkFactory {
    /// Creates a factory that hands `err_at` and `panic` to every sink it builds.
    pub fn new(err_at: u64, panic: bool) -> Self {
        ErrSinkFactory { err_at, panic }
    }
}
×
438

×
439
impl SinkFactory<NoneContext> for ErrSinkFactory {
440
    fn get_input_ports(&self) -> Vec<PortHandle> {
8✔
441
        vec![COUNTING_SINK_INPUT_PORT]
8✔
442
    }
8✔
443

×
444
    fn prepare(
2✔
445
        &self,
2✔
446
        _input_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
2✔
447
    ) -> Result<(), ExecutionError> {
2✔
448
        Ok(())
2✔
449
    }
2✔
450

×
451
    fn build(
2✔
452
        &self,
2✔
453
        _input_schemas: HashMap<PortHandle, Schema>,
2✔
454
        _source_states: &SourceStates,
2✔
455
    ) -> Result<Box<dyn Sink>, ExecutionError> {
2✔
456
        Ok(Box::new(ErrSink {
2✔
457
            err_at: self.err_at,
2✔
458
            current: 0,
2✔
459
            panic: self.panic,
2✔
460
        }))
2✔
461
    }
2✔
462
}
×
463

×
464
/// Test-only sink that fails once it has received a configured number of operations.
#[derive(Debug)]
pub(crate) struct ErrSink {
    /// 1-based index of the operation that triggers the failure.
    err_at: u64,
    /// Number of operations received so far.
    current: u64,
    /// Failure mode: `true` panics, `false` returns an error.
    panic: bool,
}
470
impl Sink for ErrSink {
471
    fn commit(&mut self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
65✔
472
        Ok(())
65✔
473
    }
65✔
474

×
475
    fn process(
399,269✔
476
        &mut self,
399,269✔
477
        _from_port: PortHandle,
399,269✔
478
        _op: Operation,
399,269✔
479
        _state: &SharedTransaction,
399,269✔
480
    ) -> Result<(), ExecutionError> {
399,269✔
481
        self.current += 1;
399,269✔
482
        if self.current == self.err_at {
399,269✔
483
            if self.panic {
2✔
484
                panic!("Generated error");
1✔
485
            } else {
×
486
                return Err(ExecutionError::InvalidOperation(
1✔
487
                    "Generated error".to_string(),
1✔
488
                ));
1✔
489
            }
×
490
        }
399,267✔
491
        Ok(())
399,267✔
492
    }
399,268✔
493

×
494
    fn on_source_snapshotting_done(&mut self) -> Result<(), ExecutionError> {
×
495
        Ok(())
×
496
    }
×
497
}
×
498

×
499
/// Pipeline: generator source -> no-op processor -> `ErrSink`.
///
/// The sink returns an error (no panic) on its 200,000th operation, and the test
/// expects running the executor to completion to panic.
#[test]
#[should_panic]
fn test_run_dag_sink_err() {
    let total_ops: u64 = 1_000_000;
    let latch = Arc::new(AtomicBool::new(true));

    let src = NodeHandle::new(None, 1.to_string());
    let noop_proc = NodeHandle::new(Some(1), 1.to_string());
    let failing_sink = NodeHandle::new(Some(1), 3.to_string());

    let mut graph = Dag::new();
    graph.add_source(
        src.clone(),
        Arc::new(GeneratorSourceFactory::new(total_ops, latch, false)),
    );
    graph.add_processor(noop_proc.clone(), Arc::new(NoopProcessorFactory {}));
    graph.add_sink(
        failing_sink.clone(),
        Arc::new(ErrSinkFactory::new(200_000, false)),
    );

    // source -> no-op processor
    chk!(graph.connect(
        Endpoint::new(src, GENERATOR_SOURCE_OUTPUT_PORT),
        Endpoint::new(noop_proc.clone(), DEFAULT_PORT_HANDLE),
    ));
    // no-op processor -> failing sink
    chk!(graph.connect(
        Endpoint::new(noop_proc, DEFAULT_PORT_HANDLE),
        Endpoint::new(failing_sink, COUNTING_SINK_INPUT_PORT),
    ));

    // `scratch` must outlive the run: it owns the directory handed to the executor.
    let scratch = chk!(TempDir::new("test"));
    let work_dir = scratch.path().to_path_buf();
    let running = Arc::new(AtomicBool::new(true));
    DagExecutor::new(graph, work_dir, ExecutorOptions::default())
        .unwrap()
        .start(running)
        .unwrap()
        .join()
        .unwrap();
}
1✔
543

×
544
/// Pipeline: generator source -> no-op processor -> `ErrSink`.
///
/// The sink panics on its 200,000th operation, and the test expects running the
/// executor to completion to panic.
#[test]
#[should_panic]
fn test_run_dag_sink_err_panic() {
    let total_ops: u64 = 1_000_000;
    let latch = Arc::new(AtomicBool::new(true));

    let src = NodeHandle::new(None, 1.to_string());
    let noop_proc = NodeHandle::new(Some(1), 1.to_string());
    let failing_sink = NodeHandle::new(Some(1), 3.to_string());

    let mut graph = Dag::new();
    graph.add_source(
        src.clone(),
        Arc::new(GeneratorSourceFactory::new(total_ops, latch, false)),
    );
    graph.add_processor(noop_proc.clone(), Arc::new(NoopProcessorFactory {}));
    graph.add_sink(
        failing_sink.clone(),
        Arc::new(ErrSinkFactory::new(200_000, true)),
    );

    // source -> no-op processor
    chk!(graph.connect(
        Endpoint::new(src, GENERATOR_SOURCE_OUTPUT_PORT),
        Endpoint::new(noop_proc.clone(), DEFAULT_PORT_HANDLE),
    ));
    // no-op processor -> failing sink
    chk!(graph.connect(
        Endpoint::new(noop_proc, DEFAULT_PORT_HANDLE),
        Endpoint::new(failing_sink, COUNTING_SINK_INPUT_PORT),
    ));

    // `scratch` must outlive the run: it owns the directory handed to the executor.
    let scratch = chk!(TempDir::new("test"));
    let work_dir = scratch.path().to_path_buf();
    let running = Arc::new(AtomicBool::new(true));
    DagExecutor::new(graph, work_dir, ExecutorOptions::default())
        .unwrap()
        .start(running)
        .unwrap()
        .join()
        .unwrap();
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc