• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6299724219

25 Sep 2023 12:58PM UTC coverage: 77.81% (+0.5%) from 77.275%
6299724219

push

github

chubei
fix: Add `BINDGEN_EXTRA_CLANG_ARGS` to cross compile rocksdb

50223 of 64546 relevant lines covered (77.81%)

148909.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.95
/dozer-core/src/tests/dag_base_errors.rs
1
use crate::channels::{ProcessorChannelForwarder, SourceChannelForwarder};
2
use crate::checkpoint::create_checkpoint_for_test;
3
use crate::epoch::Epoch;
4
use crate::executor::DagExecutor;
5
use crate::executor_operation::ProcessorOperation;
6
use crate::node::{
7
    OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Sink, SinkFactory,
8
    Source, SourceFactory, SourceState,
9
};
10
use crate::tests::dag_base_run::NoopProcessorFactory;
11
use crate::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
12
use crate::tests::sources::{GeneratorSourceFactory, GENERATOR_SOURCE_OUTPUT_PORT};
13
use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
14
use dozer_log::storage::{Object, Queue};
15
use dozer_log::tokio;
16
use dozer_recordstore::{ProcessorRecordStore, ProcessorRecordStoreDeserializer};
17
use dozer_types::errors::internal::BoxedError;
18
use dozer_types::ingestion_types::IngestionMessage;
19
use dozer_types::node::{NodeHandle, OpIdentifier};
20
use dozer_types::types::{
21
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
22
};
23

24
use std::collections::HashMap;
25
use std::panic;
26

27
use std::sync::atomic::AtomicBool;
28
use std::sync::Arc;
29

30
// Test when error is generated by a processor
31

32
#[derive(Debug)]
×
33
struct ErrorProcessorFactory {
34
    err_on: u64,
35
    panic: bool,
36
}
37

38
impl ProcessorFactory for ErrorProcessorFactory {
39
    fn type_name(&self) -> String {
×
40
        "Error".to_owned()
×
41
    }
×
42

43
    fn get_output_schema(
3✔
44
        &self,
3✔
45
        _output_port: &PortHandle,
3✔
46
        input_schemas: &HashMap<PortHandle, Schema>,
3✔
47
    ) -> Result<Schema, BoxedError> {
3✔
48
        Ok(input_schemas.get(&DEFAULT_PORT_HANDLE).unwrap().clone())
3✔
49
    }
3✔
50

51
    fn get_input_ports(&self) -> Vec<PortHandle> {
12✔
52
        vec![DEFAULT_PORT_HANDLE]
12✔
53
    }
12✔
54

55
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
9✔
56
        vec![OutputPortDef::new(
9✔
57
            DEFAULT_PORT_HANDLE,
9✔
58
            OutputPortType::Stateless,
9✔
59
        )]
9✔
60
    }
9✔
61

62
    fn build(
3✔
63
        &self,
3✔
64
        _input_schemas: HashMap<PortHandle, Schema>,
3✔
65
        _output_schemas: HashMap<PortHandle, Schema>,
3✔
66
        _record_store: &ProcessorRecordStoreDeserializer,
3✔
67
        _checkpoint_data: Option<Vec<u8>>,
3✔
68
    ) -> Result<Box<dyn Processor>, BoxedError> {
3✔
69
        Ok(Box::new(ErrorProcessor {
3✔
70
            err_on: self.err_on,
3✔
71
            count: 0,
3✔
72
            panic: self.panic,
3✔
73
        }))
3✔
74
    }
3✔
75

76
    fn id(&self) -> String {
×
77
        "Error".to_owned()
×
78
    }
×
79
}
80

81
#[derive(Debug)]
×
82
struct ErrorProcessor {
83
    err_on: u64,
84
    count: u64,
85
    panic: bool,
86
}
87

88
impl Processor for ErrorProcessor {
89
    fn commit(&self, _epoch: &Epoch) -> Result<(), BoxedError> {
450✔
90
        Ok(())
450✔
91
    }
450✔
92

93
    fn process(
2,400,000✔
94
        &mut self,
2,400,000✔
95
        _from_port: PortHandle,
2,400,000✔
96
        _record_store: &ProcessorRecordStore,
2,400,000✔
97
        op: ProcessorOperation,
2,400,000✔
98
        fw: &mut dyn ProcessorChannelForwarder,
2,400,000✔
99
    ) -> Result<(), BoxedError> {
2,400,000✔
100
        self.count += 1;
2,400,000✔
101
        if self.count == self.err_on {
2,400,000✔
102
            if self.panic {
3✔
103
                panic!("Generated error");
1✔
104
            } else {
105
                return Err("Uknown".to_string().into());
2✔
106
            }
107
        }
2,399,997✔
108

2,399,997✔
109
        fw.send(op, DEFAULT_PORT_HANDLE);
2,399,997✔
110
        Ok(())
2,399,997✔
111
    }
2,399,999✔
112

113
    fn serialize(
21✔
114
        &mut self,
21✔
115
        _record_store: &ProcessorRecordStore,
21✔
116
        _object: Object,
21✔
117
    ) -> Result<(), BoxedError> {
21✔
118
        Ok(())
21✔
119
    }
21✔
120
}
121

122
#[tokio::test]
1✔
123
#[should_panic]
124
async fn test_run_dag_proc_err_panic() {
1✔
125
    let count: u64 = 1_000_000;
1✔
126

1✔
127
    let mut dag = Dag::new();
1✔
128
    let latch = Arc::new(AtomicBool::new(true));
1✔
129

1✔
130
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
131
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
132
    let sink_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
133

1✔
134
    dag.add_source(
1✔
135
        source_handle.clone(),
1✔
136
        Box::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
137
    );
1✔
138
    dag.add_processor(
1✔
139
        proc_handle.clone(),
1✔
140
        Box::new(ErrorProcessorFactory {
1✔
141
            err_on: 800_000,
1✔
142
            panic: true,
1✔
143
        }),
1✔
144
    );
1✔
145
    dag.add_sink(
1✔
146
        sink_handle.clone(),
1✔
147
        Box::new(CountingSinkFactory::new(count, latch)),
1✔
148
    );
1✔
149

1✔
150
    dag.connect(
1✔
151
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
1✔
152
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
153
    )
1✔
154
    .unwrap();
1✔
155

1✔
156
    dag.connect(
1✔
157
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
158
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
159
    )
1✔
160
    .unwrap();
1✔
161

162
    let (_temp_dir, checkpoint) = create_checkpoint_for_test().await;
1✔
163
    DagExecutor::new(dag, checkpoint, Default::default())
1✔
164
        .await
×
165
        .unwrap()
1✔
166
        .start(Arc::new(AtomicBool::new(true)), Default::default())
1✔
167
        .unwrap()
1✔
168
        .join()
1✔
169
        .unwrap();
1✔
170
}
171

172
#[tokio::test]
1✔
173
#[should_panic]
174
async fn test_run_dag_proc_err_2() {
1✔
175
    let count: u64 = 1_000_000;
1✔
176

1✔
177
    let mut dag = Dag::new();
1✔
178
    let latch = Arc::new(AtomicBool::new(true));
1✔
179

1✔
180
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
181
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
182
    let proc_err_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
183
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
184

1✔
185
    dag.add_source(
1✔
186
        source_handle.clone(),
1✔
187
        Box::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
188
    );
1✔
189
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
1✔
190

1✔
191
    dag.add_processor(
1✔
192
        proc_err_handle.clone(),
1✔
193
        Box::new(ErrorProcessorFactory {
1✔
194
            err_on: 800_000,
1✔
195
            panic: false,
1✔
196
        }),
1✔
197
    );
1✔
198

1✔
199
    dag.add_sink(
1✔
200
        sink_handle.clone(),
1✔
201
        Box::new(CountingSinkFactory::new(count, latch)),
1✔
202
    );
1✔
203

1✔
204
    dag.connect(
1✔
205
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
1✔
206
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
207
    )
1✔
208
    .unwrap();
1✔
209

1✔
210
    dag.connect(
1✔
211
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
212
        Endpoint::new(proc_err_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
213
    )
1✔
214
    .unwrap();
1✔
215

1✔
216
    dag.connect(
1✔
217
        Endpoint::new(proc_err_handle, DEFAULT_PORT_HANDLE),
1✔
218
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
219
    )
1✔
220
    .unwrap();
1✔
221

222
    let (_temp_dir, checkpoint) = create_checkpoint_for_test().await;
1✔
223
    DagExecutor::new(dag, checkpoint, Default::default())
1✔
224
        .await
×
225
        .unwrap()
1✔
226
        .start(Arc::new(AtomicBool::new(true)), Default::default())
1✔
227
        .unwrap()
1✔
228
        .join()
1✔
229
        .unwrap();
1✔
230
}
231

232
#[tokio::test]
1✔
233
#[should_panic]
234
async fn test_run_dag_proc_err_3() {
1✔
235
    let count: u64 = 1_000_000;
1✔
236

1✔
237
    let mut dag = Dag::new();
1✔
238
    let latch = Arc::new(AtomicBool::new(true));
1✔
239

1✔
240
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
241
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
242
    let proc_err_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
243
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
244

1✔
245
    dag.add_source(
1✔
246
        source_handle.clone(),
1✔
247
        Box::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
248
    );
1✔
249

1✔
250
    dag.add_processor(
1✔
251
        proc_err_handle.clone(),
1✔
252
        Box::new(ErrorProcessorFactory {
1✔
253
            err_on: 800_000,
1✔
254
            panic: false,
1✔
255
        }),
1✔
256
    );
1✔
257

1✔
258
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
1✔
259

1✔
260
    dag.add_sink(
1✔
261
        sink_handle.clone(),
1✔
262
        Box::new(CountingSinkFactory::new(count, latch)),
1✔
263
    );
1✔
264

1✔
265
    dag.connect(
1✔
266
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
1✔
267
        Endpoint::new(proc_err_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
268
    )
1✔
269
    .unwrap();
1✔
270

1✔
271
    dag.connect(
1✔
272
        Endpoint::new(proc_err_handle, DEFAULT_PORT_HANDLE),
1✔
273
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
274
    )
1✔
275
    .unwrap();
1✔
276

1✔
277
    dag.connect(
1✔
278
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
279
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
280
    )
1✔
281
    .unwrap();
1✔
282

283
    let (_temp_dir, checkpoint) = create_checkpoint_for_test().await;
1✔
284
    DagExecutor::new(dag, checkpoint, Default::default())
1✔
285
        .await
×
286
        .unwrap()
1✔
287
        .start(Arc::new(AtomicBool::new(true)), Default::default())
1✔
288
        .unwrap()
1✔
289
        .join()
1✔
290
        .unwrap();
1✔
291
}
292

293
// Test when error is generated by a source
294

295
#[derive(Debug)]
×
296
pub(crate) struct ErrGeneratorSourceFactory {
297
    count: u64,
298
    err_at: u64,
299
}
300

301
impl ErrGeneratorSourceFactory {
302
    pub fn new(count: u64, err_at: u64) -> Self {
1✔
303
        Self { count, err_at }
1✔
304
    }
1✔
305
}
306

307
impl SourceFactory for ErrGeneratorSourceFactory {
308
    fn get_output_schema(&self, _port: &PortHandle) -> Result<Schema, BoxedError> {
1✔
309
        Ok(Schema::default()
1✔
310
            .field(
1✔
311
                FieldDefinition::new(
1✔
312
                    "id".to_string(),
1✔
313
                    FieldType::String,
1✔
314
                    false,
1✔
315
                    SourceDefinition::Dynamic,
1✔
316
                ),
1✔
317
                true,
1✔
318
            )
1✔
319
            .field(
1✔
320
                FieldDefinition::new(
1✔
321
                    "value".to_string(),
1✔
322
                    FieldType::String,
1✔
323
                    false,
1✔
324
                    SourceDefinition::Dynamic,
1✔
325
                ),
1✔
326
                false,
1✔
327
            )
1✔
328
            .clone())
1✔
329
    }
1✔
330

331
    fn get_output_port_name(&self, _port: &PortHandle) -> String {
1✔
332
        "error".to_string()
1✔
333
    }
1✔
334

335
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
4✔
336
        vec![OutputPortDef::new(
4✔
337
            GENERATOR_SOURCE_OUTPUT_PORT,
4✔
338
            OutputPortType::Stateless,
4✔
339
        )]
4✔
340
    }
4✔
341

342
    fn build(
1✔
343
        &self,
1✔
344
        _output_schemas: HashMap<PortHandle, Schema>,
1✔
345
    ) -> Result<Box<dyn Source>, BoxedError> {
1✔
346
        Ok(Box::new(ErrGeneratorSource {
1✔
347
            count: self.count,
1✔
348
            err_at: self.err_at,
1✔
349
        }))
1✔
350
    }
1✔
351
}
352

353
#[derive(Debug)]
×
354
pub(crate) struct ErrGeneratorSource {
355
    count: u64,
356
    err_at: u64,
357
}
358

359
impl Source for ErrGeneratorSource {
360
    fn start(
1✔
361
        &self,
1✔
362
        fw: &mut dyn SourceChannelForwarder,
1✔
363
        _checkpoint: SourceState,
1✔
364
    ) -> Result<(), BoxedError> {
1✔
365
        for n in 1..(self.count + 1) {
124,293✔
366
            if n == self.err_at {
124,293✔
367
                return Err("Generated Error".to_string().into());
×
368
            }
124,293✔
369

124,293✔
370
            fw.send(
124,293✔
371
                IngestionMessage::OperationEvent {
124,293✔
372
                    table_index: 0,
124,293✔
373
                    op: Operation::Insert {
124,293✔
374
                        new: Record::new(vec![
124,293✔
375
                            Field::String(format!("key_{n}")),
124,293✔
376
                            Field::String(format!("value_{n}")),
124,293✔
377
                        ]),
124,293✔
378
                    },
124,293✔
379
                    id: Some(OpIdentifier::new(n, 0)),
124,293✔
380
                },
124,293✔
381
                GENERATOR_SOURCE_OUTPUT_PORT,
124,293✔
382
            )?;
124,293✔
383
        }
384
        Ok(())
×
385
    }
1✔
386
}
387

388
#[tokio::test]
1✔
389
async fn test_run_dag_src_err() {
1✔
390
    let count: u64 = 1_000_000;
1✔
391

1✔
392
    let mut dag = Dag::new();
1✔
393
    let latch = Arc::new(AtomicBool::new(true));
1✔
394

1✔
395
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
396
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
397
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
398

1✔
399
    dag.add_source(
1✔
400
        source_handle.clone(),
1✔
401
        Box::new(ErrGeneratorSourceFactory::new(count, 200_000)),
1✔
402
    );
1✔
403
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
1✔
404
    dag.add_sink(
1✔
405
        sink_handle.clone(),
1✔
406
        Box::new(CountingSinkFactory::new(count, latch)),
1✔
407
    );
1✔
408

1✔
409
    dag.connect(
1✔
410
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
1✔
411
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
412
    )
1✔
413
    .unwrap();
1✔
414

1✔
415
    dag.connect(
1✔
416
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
417
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
418
    )
1✔
419
    .unwrap();
1✔
420

421
    let (_temp_dir, checkpoint) = create_checkpoint_for_test().await;
1✔
422
    DagExecutor::new(dag, checkpoint, Default::default())
1✔
423
        .await
×
424
        .unwrap()
1✔
425
        .start(Arc::new(AtomicBool::new(true)), Default::default())
1✔
426
        .unwrap();
1✔
427
    // join_handle.join().unwrap();
428
}
429

430
#[derive(Debug)]
×
431
pub(crate) struct ErrSinkFactory {
432
    err_at: u64,
433
    panic: bool,
434
}
435

436
impl ErrSinkFactory {
437
    pub fn new(err_at: u64, panic: bool) -> Self {
2✔
438
        Self { err_at, panic }
2✔
439
    }
2✔
440
}
441

442
impl SinkFactory for ErrSinkFactory {
443
    fn get_input_ports(&self) -> Vec<PortHandle> {
8✔
444
        vec![COUNTING_SINK_INPUT_PORT]
8✔
445
    }
8✔
446

447
    fn prepare(&self, _input_schemas: HashMap<PortHandle, Schema>) -> Result<(), BoxedError> {
2✔
448
        Ok(())
2✔
449
    }
2✔
450

451
    fn build(
2✔
452
        &self,
2✔
453
        _input_schemas: HashMap<PortHandle, Schema>,
2✔
454
    ) -> Result<Box<dyn Sink>, BoxedError> {
2✔
455
        Ok(Box::new(ErrSink {
2✔
456
            err_at: self.err_at,
2✔
457
            current: 0,
2✔
458
            panic: self.panic,
2✔
459
        }))
2✔
460
    }
2✔
461
}
462

463
#[derive(Debug)]
×
464
pub(crate) struct ErrSink {
465
    err_at: u64,
466
    current: u64,
467
    panic: bool,
468
}
469
impl Sink for ErrSink {
470
    fn commit(&mut self, _epoch_details: &Epoch) -> Result<(), BoxedError> {
124✔
471
        Ok(())
124✔
472
    }
124✔
473

474
    fn process(
400,000✔
475
        &mut self,
400,000✔
476
        _from_port: PortHandle,
400,000✔
477
        _record_store: &ProcessorRecordStore,
400,000✔
478
        _op: ProcessorOperation,
400,000✔
479
    ) -> Result<(), BoxedError> {
400,000✔
480
        self.current += 1;
400,000✔
481
        if self.current == self.err_at {
400,000✔
482
            if self.panic {
2✔
483
                panic!("Generated error");
1✔
484
            } else {
485
                return Err("Generated error".to_string().into());
1✔
486
            }
487
        }
399,998✔
488
        Ok(())
399,998✔
489
    }
399,999✔
490

491
    fn persist(&mut self, _queue: &Queue) -> Result<(), BoxedError> {
2✔
492
        Ok(())
2✔
493
    }
2✔
494

495
    fn on_source_snapshotting_done(&mut self, _connection_name: String) -> Result<(), BoxedError> {
×
496
        Ok(())
×
497
    }
×
498
}
499

500
#[tokio::test]
1✔
501
#[should_panic]
502
async fn test_run_dag_sink_err() {
1✔
503
    let count: u64 = 1_000_000;
1✔
504

1✔
505
    let mut dag = Dag::new();
1✔
506
    let latch = Arc::new(AtomicBool::new(true));
1✔
507

1✔
508
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
509
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
510
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
511

1✔
512
    dag.add_source(
1✔
513
        source_handle.clone(),
1✔
514
        Box::new(GeneratorSourceFactory::new(count, latch, false)),
1✔
515
    );
1✔
516
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
1✔
517
    dag.add_sink(
1✔
518
        sink_handle.clone(),
1✔
519
        Box::new(ErrSinkFactory::new(200_000, false)),
1✔
520
    );
1✔
521

1✔
522
    dag.connect(
1✔
523
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
1✔
524
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
525
    )
1✔
526
    .unwrap();
1✔
527

1✔
528
    dag.connect(
1✔
529
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
530
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
531
    )
1✔
532
    .unwrap();
1✔
533

534
    let (_temp_dir, checkpoint) = create_checkpoint_for_test().await;
1✔
535
    DagExecutor::new(dag, checkpoint, Default::default())
1✔
536
        .await
×
537
        .unwrap()
1✔
538
        .start(Arc::new(AtomicBool::new(true)), Default::default())
1✔
539
        .unwrap()
1✔
540
        .join()
1✔
541
        .unwrap();
1✔
542
}
543

544
#[tokio::test]
1✔
545
#[should_panic]
546
async fn test_run_dag_sink_err_panic() {
1✔
547
    let count: u64 = 1_000_000;
1✔
548

1✔
549
    let mut dag = Dag::new();
1✔
550
    let latch = Arc::new(AtomicBool::new(true));
1✔
551

1✔
552
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
553
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
554
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
555

1✔
556
    dag.add_source(
1✔
557
        source_handle.clone(),
1✔
558
        Box::new(GeneratorSourceFactory::new(count, latch, false)),
1✔
559
    );
1✔
560
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
1✔
561
    dag.add_sink(
1✔
562
        sink_handle.clone(),
1✔
563
        Box::new(ErrSinkFactory::new(200_000, true)),
1✔
564
    );
1✔
565

1✔
566
    dag.connect(
1✔
567
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
1✔
568
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
569
    )
1✔
570
    .unwrap();
1✔
571

1✔
572
    dag.connect(
1✔
573
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
574
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
575
    )
1✔
576
    .unwrap();
1✔
577

578
    let (_temp_dir, checkpoint) = create_checkpoint_for_test().await;
1✔
579
    DagExecutor::new(dag, checkpoint, Default::default())
1✔
580
        .await
×
581
        .unwrap()
1✔
582
        .start(Arc::new(AtomicBool::new(true)), Default::default())
1✔
583
        .unwrap()
1✔
584
        .join()
1✔
585
        .unwrap();
1✔
586
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc