• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4283045331

pending completion
4283045331

push

github

GitHub
feat: Support timestamp diff (#1074)

58 of 58 new or added lines in 2 files covered. (100.0%)

27146 of 37535 relevant lines covered (72.32%)

33460.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.67
/dozer-core/src/tests/dag_base_errors.rs
1
use crate::channels::{ProcessorChannelForwarder, SourceChannelForwarder};
2
use crate::chk;
3
use crate::errors::ExecutionError;
4
use crate::executor::{DagExecutor, ExecutorOptions};
5
use crate::node::{
6
    OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Sink, SinkFactory,
7
    Source, SourceFactory,
8
};
9
use crate::record_store::RecordReader;
10
use crate::tests::dag_base_run::NoopProcessorFactory;
11
use crate::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
12
use crate::tests::sources::{GeneratorSourceFactory, GENERATOR_SOURCE_OUTPUT_PORT};
13
use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
14
use dozer_storage::lmdb_storage::{LmdbExclusiveTransaction, SharedTransaction};
15
use dozer_types::node::NodeHandle;
16
use dozer_types::types::{
17
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
18
};
19

20
use std::collections::HashMap;
21
use std::panic;
22

23
use std::sync::atomic::AtomicBool;
24
use std::sync::Arc;
25

26
use crate::epoch::Epoch;
27

28
use crate::tests::app::NoneContext;
29
use tempdir::TempDir;
30

31
// Test when error is generated by a processor
32

33
#[derive(Debug)]
×
34
struct ErrorProcessorFactory {
35
    err_on: u64,
36
    panic: bool,
37
}
38

39
impl ProcessorFactory<NoneContext> for ErrorProcessorFactory {
40
    fn get_output_schema(
3✔
41
        &self,
3✔
42
        _output_port: &PortHandle,
3✔
43
        input_schemas: &HashMap<PortHandle, (Schema, NoneContext)>,
3✔
44
    ) -> Result<(Schema, NoneContext), ExecutionError> {
3✔
45
        Ok(input_schemas.get(&DEFAULT_PORT_HANDLE).unwrap().clone())
3✔
46
    }
3✔
47

48
    fn get_input_ports(&self) -> Vec<PortHandle> {
12✔
49
        vec![DEFAULT_PORT_HANDLE]
12✔
50
    }
12✔
51

52
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
9✔
53
        vec![OutputPortDef::new(
9✔
54
            DEFAULT_PORT_HANDLE,
9✔
55
            OutputPortType::Stateless,
9✔
56
        )]
9✔
57
    }
9✔
58

59
    fn build(
3✔
60
        &self,
3✔
61
        _input_schemas: HashMap<PortHandle, Schema>,
3✔
62
        _output_schemas: HashMap<PortHandle, Schema>,
3✔
63
        _txn: &mut LmdbExclusiveTransaction,
3✔
64
    ) -> Result<Box<dyn Processor>, ExecutionError> {
3✔
65
        Ok(Box::new(ErrorProcessor {
3✔
66
            err_on: self.err_on,
3✔
67
            count: 0,
3✔
68
            panic: self.panic,
3✔
69
        }))
3✔
70
    }
3✔
71
}
72

73
#[derive(Debug)]
×
74
struct ErrorProcessor {
75
    err_on: u64,
76
    count: u64,
77
    panic: bool,
78
}
79

80
impl Processor for ErrorProcessor {
81
    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
501✔
82
        Ok(())
501✔
83
    }
501✔
84

85
    fn process(
2,398,077✔
86
        &mut self,
2,398,077✔
87
        _from_port: PortHandle,
2,398,077✔
88
        op: Operation,
2,398,077✔
89
        fw: &mut dyn ProcessorChannelForwarder,
2,398,077✔
90
        _tx: &SharedTransaction,
2,398,077✔
91
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
2,398,077✔
92
    ) -> Result<(), ExecutionError> {
2,398,077✔
93
        self.count += 1;
2,398,077✔
94
        if self.count == self.err_on {
2,398,077✔
95
            if self.panic {
3✔
96
                panic!("Generated error");
1✔
97
            } else {
98
                return Err(ExecutionError::InvalidOperation("Uknown".to_string()));
2✔
99
            }
100
        }
2,398,074✔
101

2,398,074✔
102
        fw.send(op, DEFAULT_PORT_HANDLE)
2,398,074✔
103
    }
2,398,076✔
104
}
105

106
#[test]
1✔
107
#[should_panic]
108
fn test_run_dag_proc_err_panic() {
1✔
109
    let count: u64 = 1_000_000;
1✔
110

1✔
111
    let mut dag = Dag::new();
1✔
112
    let latch = Arc::new(AtomicBool::new(true));
1✔
113

1✔
114
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
115
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
116
    let sink_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
117

1✔
118
    dag.add_source(
1✔
119
        source_handle.clone(),
1✔
120
        Arc::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
121
    );
1✔
122
    dag.add_processor(
1✔
123
        proc_handle.clone(),
1✔
124
        Arc::new(ErrorProcessorFactory {
1✔
125
            err_on: 800_000,
1✔
126
            panic: true,
1✔
127
        }),
1✔
128
    );
1✔
129
    dag.add_sink(
1✔
130
        sink_handle.clone(),
1✔
131
        Arc::new(CountingSinkFactory::new(count, latch)),
1✔
132
    );
1✔
133

1✔
134
    chk!(dag.connect(
1✔
135
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
×
136
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
×
137
    ));
1✔
138

1✔
139
    chk!(dag.connect(
1✔
140
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
×
141
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
×
142
    ));
1✔
143

1✔
144
    let tmp_dir = chk!(TempDir::new("test"));
1✔
145
    DagExecutor::new(
1✔
146
        &dag,
1✔
147
        tmp_dir.path().to_path_buf(),
1✔
148
        ExecutorOptions::default(),
1✔
149
    )
1✔
150
    .unwrap()
1✔
151
    .start(Arc::new(AtomicBool::new(true)))
1✔
152
    .unwrap()
1✔
153
    .join()
1✔
154
    .unwrap();
1✔
155
}
1✔
156

157
#[test]
1✔
158
#[should_panic]
159
fn test_run_dag_proc_err_2() {
1✔
160
    let count: u64 = 1_000_000;
1✔
161

1✔
162
    let mut dag = Dag::new();
1✔
163
    let latch = Arc::new(AtomicBool::new(true));
1✔
164

1✔
165
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
166
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
167
    let proc_err_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
168
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
169

1✔
170
    dag.add_source(
1✔
171
        source_handle.clone(),
1✔
172
        Arc::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
173
    );
1✔
174
    dag.add_processor(proc_handle.clone(), Arc::new(NoopProcessorFactory {}));
1✔
175

1✔
176
    dag.add_processor(
1✔
177
        proc_err_handle.clone(),
1✔
178
        Arc::new(ErrorProcessorFactory {
1✔
179
            err_on: 800_000,
1✔
180
            panic: false,
1✔
181
        }),
1✔
182
    );
1✔
183

1✔
184
    dag.add_sink(
1✔
185
        sink_handle.clone(),
1✔
186
        Arc::new(CountingSinkFactory::new(count, latch)),
1✔
187
    );
1✔
188

1✔
189
    chk!(dag.connect(
1✔
190
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
×
191
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
×
192
    ));
1✔
193

1✔
194
    chk!(dag.connect(
1✔
195
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
×
196
        Endpoint::new(proc_err_handle.clone(), DEFAULT_PORT_HANDLE),
×
197
    ));
1✔
198

1✔
199
    chk!(dag.connect(
1✔
200
        Endpoint::new(proc_err_handle, DEFAULT_PORT_HANDLE),
×
201
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
×
202
    ));
1✔
203

1✔
204
    let tmp_dir = chk!(TempDir::new("test"));
1✔
205
    DagExecutor::new(
1✔
206
        &dag,
1✔
207
        tmp_dir.path().to_path_buf(),
1✔
208
        ExecutorOptions::default(),
1✔
209
    )
1✔
210
    .unwrap()
1✔
211
    .start(Arc::new(AtomicBool::new(true)))
1✔
212
    .unwrap()
1✔
213
    .join()
1✔
214
    .unwrap();
1✔
215
}
1✔
216

217
#[test]
1✔
218
#[should_panic]
219
fn test_run_dag_proc_err_3() {
1✔
220
    let count: u64 = 1_000_000;
1✔
221

1✔
222
    let mut dag = Dag::new();
1✔
223
    let latch = Arc::new(AtomicBool::new(true));
1✔
224

1✔
225
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
226
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
227
    let proc_err_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
228
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
229

1✔
230
    dag.add_source(
1✔
231
        source_handle.clone(),
1✔
232
        Arc::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
233
    );
1✔
234

1✔
235
    dag.add_processor(
1✔
236
        proc_err_handle.clone(),
1✔
237
        Arc::new(ErrorProcessorFactory {
1✔
238
            err_on: 800_000,
1✔
239
            panic: false,
1✔
240
        }),
1✔
241
    );
1✔
242

1✔
243
    dag.add_processor(proc_handle.clone(), Arc::new(NoopProcessorFactory {}));
1✔
244

1✔
245
    dag.add_sink(
1✔
246
        sink_handle.clone(),
1✔
247
        Arc::new(CountingSinkFactory::new(count, latch)),
1✔
248
    );
1✔
249

1✔
250
    chk!(dag.connect(
1✔
251
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
×
252
        Endpoint::new(proc_err_handle.clone(), DEFAULT_PORT_HANDLE),
×
253
    ));
1✔
254

1✔
255
    chk!(dag.connect(
1✔
256
        Endpoint::new(proc_err_handle, DEFAULT_PORT_HANDLE),
×
257
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
×
258
    ));
1✔
259

1✔
260
    chk!(dag.connect(
1✔
261
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
×
262
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
×
263
    ));
1✔
264

1✔
265
    let tmp_dir = chk!(TempDir::new("test"));
1✔
266
    DagExecutor::new(
1✔
267
        &dag,
1✔
268
        tmp_dir.path().to_path_buf(),
1✔
269
        ExecutorOptions::default(),
1✔
270
    )
1✔
271
    .unwrap()
1✔
272
    .start(Arc::new(AtomicBool::new(true)))
1✔
273
    .unwrap()
1✔
274
    .join()
1✔
275
    .unwrap();
1✔
276
}
1✔
277

278
// Test when error is generated by a source
279

280
#[derive(Debug)]
×
281
pub(crate) struct ErrGeneratorSourceFactory {
282
    count: u64,
283
    err_at: u64,
284
}
285

286
impl ErrGeneratorSourceFactory {
287
    pub fn new(count: u64, err_at: u64) -> Self {
1✔
288
        Self { count, err_at }
1✔
289
    }
1✔
290
}
291

292
impl SourceFactory<NoneContext> for ErrGeneratorSourceFactory {
293
    fn get_output_schema(
1✔
294
        &self,
1✔
295
        _port: &PortHandle,
1✔
296
    ) -> Result<(Schema, NoneContext), ExecutionError> {
1✔
297
        Ok((
1✔
298
            Schema::empty()
1✔
299
                .field(
1✔
300
                    FieldDefinition::new(
1✔
301
                        "id".to_string(),
1✔
302
                        FieldType::String,
1✔
303
                        false,
1✔
304
                        SourceDefinition::Dynamic,
1✔
305
                    ),
1✔
306
                    true,
1✔
307
                )
1✔
308
                .field(
1✔
309
                    FieldDefinition::new(
1✔
310
                        "value".to_string(),
1✔
311
                        FieldType::String,
1✔
312
                        false,
1✔
313
                        SourceDefinition::Dynamic,
1✔
314
                    ),
1✔
315
                    false,
1✔
316
                )
1✔
317
                .clone(),
1✔
318
            NoneContext {},
1✔
319
        ))
1✔
320
    }
1✔
321

322
    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
3✔
323
        Ok(vec![OutputPortDef::new(
3✔
324
            GENERATOR_SOURCE_OUTPUT_PORT,
3✔
325
            OutputPortType::Stateless,
3✔
326
        )])
3✔
327
    }
3✔
328

329
    fn build(
1✔
330
        &self,
1✔
331
        _output_schemas: HashMap<PortHandle, Schema>,
1✔
332
    ) -> Result<Box<dyn Source>, ExecutionError> {
1✔
333
        Ok(Box::new(ErrGeneratorSource {
1✔
334
            count: self.count,
1✔
335
            err_at: self.err_at,
1✔
336
        }))
1✔
337
    }
1✔
338
}
339

340
#[derive(Debug)]
×
341
pub(crate) struct ErrGeneratorSource {
342
    count: u64,
343
    err_at: u64,
344
}
345

346
impl Source for ErrGeneratorSource {
347
    fn can_start_from(&self, _last_checkpoint: (u64, u64)) -> Result<bool, ExecutionError> {
×
348
        Ok(false)
×
349
    }
×
350

351
    fn start(
1✔
352
        &self,
1✔
353
        fw: &mut dyn SourceChannelForwarder,
1✔
354
        _checkpoint: Option<(u64, u64)>,
1✔
355
    ) -> Result<(), ExecutionError> {
1✔
356
        for n in 1..(self.count + 1) {
200,000✔
357
            if n == self.err_at {
200,000✔
358
                return Err(ExecutionError::InvalidOperation(
1✔
359
                    "Generated Error".to_string(),
1✔
360
                ));
1✔
361
            }
199,999✔
362

199,999✔
363
            fw.send(
199,999✔
364
                n,
199,999✔
365
                0,
199,999✔
366
                Operation::Insert {
199,999✔
367
                    new: Record::new(
199,999✔
368
                        None,
199,999✔
369
                        vec![
199,999✔
370
                            Field::String(format!("key_{n}")),
199,999✔
371
                            Field::String(format!("value_{n}")),
199,999✔
372
                        ],
199,999✔
373
                        None,
199,999✔
374
                    ),
199,999✔
375
                },
199,999✔
376
                GENERATOR_SOURCE_OUTPUT_PORT,
199,999✔
377
            )?;
199,999✔
378
        }
379
        Ok(())
×
380
    }
1✔
381
}
382

383
#[test]
1✔
384
fn test_run_dag_src_err() {
1✔
385
    let count: u64 = 1_000_000;
1✔
386

1✔
387
    let mut dag = Dag::new();
1✔
388
    let latch = Arc::new(AtomicBool::new(true));
1✔
389

1✔
390
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
391
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
392
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
393

1✔
394
    dag.add_source(
1✔
395
        source_handle.clone(),
1✔
396
        Arc::new(ErrGeneratorSourceFactory::new(count, 200_000)),
1✔
397
    );
1✔
398
    dag.add_processor(proc_handle.clone(), Arc::new(NoopProcessorFactory {}));
1✔
399
    dag.add_sink(
1✔
400
        sink_handle.clone(),
1✔
401
        Arc::new(CountingSinkFactory::new(count, latch)),
1✔
402
    );
1✔
403

1✔
404
    chk!(dag.connect(
1✔
405
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
×
406
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
×
407
    ));
1✔
408

1✔
409
    chk!(dag.connect(
1✔
410
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
×
411
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
×
412
    ));
×
413

1✔
414
    let tmp_dir = chk!(TempDir::new("test"));
1✔
415
    let _join_handle = DagExecutor::new(
1✔
416
        &dag,
1✔
417
        tmp_dir.path().to_path_buf(),
1✔
418
        ExecutorOptions::default(),
1✔
419
    )
1✔
420
    .unwrap()
1✔
421
    .start(Arc::new(AtomicBool::new(true)))
1✔
422
    .unwrap();
1✔
423
    // join_handle.join().unwrap();
1✔
424
}
1✔
425

426
#[derive(Debug)]
×
427
pub(crate) struct ErrSinkFactory {
428
    err_at: u64,
429
    panic: bool,
430
}
431

432
impl ErrSinkFactory {
433
    pub fn new(err_at: u64, panic: bool) -> Self {
2✔
434
        Self { err_at, panic }
2✔
435
    }
2✔
436
}
437

438
impl SinkFactory<NoneContext> for ErrSinkFactory {
439
    fn get_input_ports(&self) -> Vec<PortHandle> {
8✔
440
        vec![COUNTING_SINK_INPUT_PORT]
8✔
441
    }
8✔
442

443
    fn prepare(
2✔
444
        &self,
2✔
445
        _input_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
2✔
446
    ) -> Result<(), ExecutionError> {
2✔
447
        Ok(())
2✔
448
    }
2✔
449

450
    fn build(
2✔
451
        &self,
2✔
452
        _input_schemas: HashMap<PortHandle, Schema>,
2✔
453
    ) -> Result<Box<dyn Sink>, ExecutionError> {
2✔
454
        Ok(Box::new(ErrSink {
2✔
455
            err_at: self.err_at,
2✔
456
            current: 0,
2✔
457
            panic: self.panic,
2✔
458
        }))
2✔
459
    }
2✔
460
}
461

462
#[derive(Debug)]
×
463
pub(crate) struct ErrSink {
464
    err_at: u64,
465
    current: u64,
466
    panic: bool,
467
}
468
impl Sink for ErrSink {
469
    fn commit(&mut self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
69✔
470
        Ok(())
69✔
471
    }
69✔
472

473
    fn process(
400,000✔
474
        &mut self,
400,000✔
475
        _from_port: PortHandle,
400,000✔
476
        _op: Operation,
400,000✔
477
        _state: &SharedTransaction,
400,000✔
478
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
400,000✔
479
    ) -> Result<(), ExecutionError> {
400,000✔
480
        self.current += 1;
400,000✔
481
        if self.current == self.err_at {
400,000✔
482
            if self.panic {
2✔
483
                panic!("Generated error");
1✔
484
            } else {
485
                return Err(ExecutionError::InvalidOperation(
1✔
486
                    "Generated error".to_string(),
1✔
487
                ));
1✔
488
            }
489
        }
399,998✔
490
        Ok(())
399,998✔
491
    }
399,999✔
492
}
493

494
#[test]
1✔
495
#[should_panic]
496
fn test_run_dag_sink_err() {
1✔
497
    let count: u64 = 1_000_000;
1✔
498

1✔
499
    let mut dag = Dag::new();
1✔
500
    let latch = Arc::new(AtomicBool::new(true));
1✔
501

1✔
502
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
503
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
504
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
505

1✔
506
    dag.add_source(
1✔
507
        source_handle.clone(),
1✔
508
        Arc::new(GeneratorSourceFactory::new(count, latch, false)),
1✔
509
    );
1✔
510
    dag.add_processor(proc_handle.clone(), Arc::new(NoopProcessorFactory {}));
1✔
511
    dag.add_sink(
1✔
512
        sink_handle.clone(),
1✔
513
        Arc::new(ErrSinkFactory::new(200_000, false)),
1✔
514
    );
1✔
515

1✔
516
    chk!(dag.connect(
1✔
517
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
×
518
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
×
519
    ));
1✔
520

1✔
521
    chk!(dag.connect(
1✔
522
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
×
523
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
×
524
    ));
1✔
525

1✔
526
    let tmp_dir = chk!(TempDir::new("test"));
1✔
527
    DagExecutor::new(
1✔
528
        &dag,
1✔
529
        tmp_dir.path().to_path_buf(),
1✔
530
        ExecutorOptions::default(),
1✔
531
    )
1✔
532
    .unwrap()
1✔
533
    .start(Arc::new(AtomicBool::new(true)))
1✔
534
    .unwrap()
1✔
535
    .join()
1✔
536
    .unwrap();
1✔
537
}
1✔
538

539
#[test]
1✔
540
#[should_panic]
541
fn test_run_dag_sink_err_panic() {
1✔
542
    let count: u64 = 1_000_000;
1✔
543

1✔
544
    let mut dag = Dag::new();
1✔
545
    let latch = Arc::new(AtomicBool::new(true));
1✔
546

1✔
547
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
548
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
549
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
550

1✔
551
    dag.add_source(
1✔
552
        source_handle.clone(),
1✔
553
        Arc::new(GeneratorSourceFactory::new(count, latch, false)),
1✔
554
    );
1✔
555
    dag.add_processor(proc_handle.clone(), Arc::new(NoopProcessorFactory {}));
1✔
556
    dag.add_sink(
1✔
557
        sink_handle.clone(),
1✔
558
        Arc::new(ErrSinkFactory::new(200_000, true)),
1✔
559
    );
1✔
560

1✔
561
    chk!(dag.connect(
1✔
562
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
×
563
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
×
564
    ));
1✔
565

1✔
566
    chk!(dag.connect(
1✔
567
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
×
568
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
×
569
    ));
1✔
570

1✔
571
    let tmp_dir = chk!(TempDir::new("test"));
1✔
572
    DagExecutor::new(
1✔
573
        &dag,
1✔
574
        tmp_dir.path().to_path_buf(),
1✔
575
        ExecutorOptions::default(),
1✔
576
    )
1✔
577
    .unwrap()
1✔
578
    .start(Arc::new(AtomicBool::new(true)))
1✔
579
    .unwrap()
1✔
580
    .join()
1✔
581
    .unwrap();
1✔
582
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc