• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6008856021

29 Aug 2023 06:44AM UTC coverage: 76.756% (-1.0%) from 77.736%
6008856021

push

github

web-flow
chore: Remove unused generic type parameter in `dozer-core` (#1929)

330 of 330 new or added lines in 38 files covered. (100.0%)

48977 of 63809 relevant lines covered (76.76%)

48470.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.62
/dozer-core/src/tests/dag_base_errors.rs
1
use crate::channels::{ProcessorChannelForwarder, SourceChannelForwarder};
2
use crate::checkpoint::create_checkpoint_factory_for_test;
3
use crate::epoch::Epoch;
4
use crate::executor::{DagExecutor, ExecutorOptions};
5
use crate::executor_operation::ProcessorOperation;
6
use crate::node::{
7
    OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Sink, SinkFactory,
8
    Source, SourceFactory,
9
};
10
use crate::processor_record::ProcessorRecordStore;
11
use crate::tests::dag_base_run::NoopProcessorFactory;
12
use crate::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
13
use crate::tests::sources::{GeneratorSourceFactory, GENERATOR_SOURCE_OUTPUT_PORT};
14
use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
15
use dozer_log::storage::Queue;
16
use dozer_log::tokio;
17
use dozer_types::errors::internal::BoxedError;
18
use dozer_types::ingestion_types::IngestionMessage;
19
use dozer_types::node::NodeHandle;
20
use dozer_types::types::{
21
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
22
};
23

24
use std::collections::HashMap;
25
use std::panic;
26

27
use std::sync::atomic::AtomicBool;
28
use std::sync::Arc;
29

30
// Tests for the case where an error is generated by a processor
31

32
#[derive(Debug)]
×
33
struct ErrorProcessorFactory {
34
    err_on: u64,
×
35
    panic: bool,
36
}
37

38
impl ProcessorFactory for ErrorProcessorFactory {
39
    fn type_name(&self) -> String {
×
40
        "Error".to_owned()
×
41
    }
×
42

×
43
    fn get_output_schema(
3✔
44
        &self,
3✔
45
        _output_port: &PortHandle,
3✔
46
        input_schemas: &HashMap<PortHandle, Schema>,
3✔
47
    ) -> Result<Schema, BoxedError> {
3✔
48
        Ok(input_schemas.get(&DEFAULT_PORT_HANDLE).unwrap().clone())
3✔
49
    }
3✔
50

×
51
    fn get_input_ports(&self) -> Vec<PortHandle> {
12✔
52
        vec![DEFAULT_PORT_HANDLE]
12✔
53
    }
12✔
54

×
55
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
9✔
56
        vec![OutputPortDef::new(
9✔
57
            DEFAULT_PORT_HANDLE,
9✔
58
            OutputPortType::Stateless,
9✔
59
        )]
9✔
60
    }
9✔
61

×
62
    fn build(
3✔
63
        &self,
3✔
64
        _input_schemas: HashMap<PortHandle, Schema>,
3✔
65
        _output_schemas: HashMap<PortHandle, Schema>,
3✔
66
        _record_store: &ProcessorRecordStore,
3✔
67
    ) -> Result<Box<dyn Processor>, BoxedError> {
3✔
68
        Ok(Box::new(ErrorProcessor {
3✔
69
            err_on: self.err_on,
3✔
70
            count: 0,
3✔
71
            panic: self.panic,
3✔
72
        }))
3✔
73
    }
3✔
74

×
75
    fn id(&self) -> String {
×
76
        "Error".to_owned()
×
77
    }
×
78
}
×
79

×
80
#[derive(Debug)]
×
81
struct ErrorProcessor {
82
    err_on: u64,
×
83
    count: u64,
84
    panic: bool,
85
}
86

87
impl Processor for ErrorProcessor {
88
    fn commit(&self, _epoch: &Epoch) -> Result<(), BoxedError> {
308✔
89
        Ok(())
308✔
90
    }
308✔
91

×
92
    fn process(
2,400,000✔
93
        &mut self,
2,400,000✔
94
        _from_port: PortHandle,
2,400,000✔
95
        _record_store: &ProcessorRecordStore,
2,400,000✔
96
        op: ProcessorOperation,
2,400,000✔
97
        fw: &mut dyn ProcessorChannelForwarder,
2,400,000✔
98
    ) -> Result<(), BoxedError> {
2,400,000✔
99
        self.count += 1;
2,400,000✔
100
        if self.count == self.err_on {
2,400,000✔
101
            if self.panic {
3✔
102
                panic!("Generated error");
1✔
103
            } else {
×
104
                return Err("Uknown".to_string().into());
2✔
105
            }
106
        }
2,399,997✔
107

2,399,997✔
108
        fw.send(op, DEFAULT_PORT_HANDLE);
2,399,997✔
109
        Ok(())
2,399,997✔
110
    }
2,399,999✔
111
}
×
112

×
113
#[tokio::test]
1✔
114
#[should_panic]
115
async fn test_run_dag_proc_err_panic() {
1✔
116
    let count: u64 = 1_000_000;
1✔
117

1✔
118
    let mut dag = Dag::new();
1✔
119
    let latch = Arc::new(AtomicBool::new(true));
1✔
120

1✔
121
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
122
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
123
    let sink_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
124

1✔
125
    dag.add_source(
1✔
126
        source_handle.clone(),
1✔
127
        Box::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
128
    );
1✔
129
    dag.add_processor(
1✔
130
        proc_handle.clone(),
1✔
131
        Box::new(ErrorProcessorFactory {
1✔
132
            err_on: 800_000,
1✔
133
            panic: true,
1✔
134
        }),
1✔
135
    );
1✔
136
    dag.add_sink(
1✔
137
        sink_handle.clone(),
1✔
138
        Box::new(CountingSinkFactory::new(count, latch)),
1✔
139
    );
1✔
140

1✔
141
    dag.connect(
1✔
142
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
1✔
143
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
144
    )
1✔
145
    .unwrap();
1✔
146

1✔
147
    dag.connect(
1✔
148
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
149
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
150
    )
1✔
151
    .unwrap();
1✔
152

×
153
    let (_temp_dir, checkpoint_factory, _) = create_checkpoint_factory_for_test(&[]).await;
1✔
154
    DagExecutor::new(dag, checkpoint_factory, ExecutorOptions::default())
1✔
155
        .unwrap()
1✔
156
        .start(Arc::new(AtomicBool::new(true)))
1✔
157
        .unwrap()
1✔
158
        .join()
1✔
159
        .unwrap();
1✔
160
}
×
161

×
162
#[tokio::test]
1✔
163
#[should_panic]
164
async fn test_run_dag_proc_err_2() {
1✔
165
    let count: u64 = 1_000_000;
1✔
166

1✔
167
    let mut dag = Dag::new();
1✔
168
    let latch = Arc::new(AtomicBool::new(true));
1✔
169

1✔
170
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
171
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
172
    let proc_err_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
173
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
174

1✔
175
    dag.add_source(
1✔
176
        source_handle.clone(),
1✔
177
        Box::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
178
    );
1✔
179
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
1✔
180

1✔
181
    dag.add_processor(
1✔
182
        proc_err_handle.clone(),
1✔
183
        Box::new(ErrorProcessorFactory {
1✔
184
            err_on: 800_000,
1✔
185
            panic: false,
1✔
186
        }),
1✔
187
    );
1✔
188

1✔
189
    dag.add_sink(
1✔
190
        sink_handle.clone(),
1✔
191
        Box::new(CountingSinkFactory::new(count, latch)),
1✔
192
    );
1✔
193

1✔
194
    dag.connect(
1✔
195
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
1✔
196
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
197
    )
1✔
198
    .unwrap();
1✔
199

1✔
200
    dag.connect(
1✔
201
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
202
        Endpoint::new(proc_err_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
203
    )
1✔
204
    .unwrap();
1✔
205

1✔
206
    dag.connect(
1✔
207
        Endpoint::new(proc_err_handle, DEFAULT_PORT_HANDLE),
1✔
208
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
209
    )
1✔
210
    .unwrap();
1✔
211

×
212
    let (_temp_dir, checkpoint_factory, _) = create_checkpoint_factory_for_test(&[]).await;
1✔
213
    DagExecutor::new(dag, checkpoint_factory, ExecutorOptions::default())
1✔
214
        .unwrap()
1✔
215
        .start(Arc::new(AtomicBool::new(true)))
1✔
216
        .unwrap()
1✔
217
        .join()
1✔
218
        .unwrap();
1✔
219
}
×
220

×
221
#[tokio::test]
1✔
222
#[should_panic]
223
async fn test_run_dag_proc_err_3() {
1✔
224
    let count: u64 = 1_000_000;
1✔
225

1✔
226
    let mut dag = Dag::new();
1✔
227
    let latch = Arc::new(AtomicBool::new(true));
1✔
228

1✔
229
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
230
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
231
    let proc_err_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
232
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
233

1✔
234
    dag.add_source(
1✔
235
        source_handle.clone(),
1✔
236
        Box::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
237
    );
1✔
238

1✔
239
    dag.add_processor(
1✔
240
        proc_err_handle.clone(),
1✔
241
        Box::new(ErrorProcessorFactory {
1✔
242
            err_on: 800_000,
1✔
243
            panic: false,
1✔
244
        }),
1✔
245
    );
1✔
246

1✔
247
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
1✔
248

1✔
249
    dag.add_sink(
1✔
250
        sink_handle.clone(),
1✔
251
        Box::new(CountingSinkFactory::new(count, latch)),
1✔
252
    );
1✔
253

1✔
254
    dag.connect(
1✔
255
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
1✔
256
        Endpoint::new(proc_err_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
257
    )
1✔
258
    .unwrap();
1✔
259

1✔
260
    dag.connect(
1✔
261
        Endpoint::new(proc_err_handle, DEFAULT_PORT_HANDLE),
1✔
262
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
263
    )
1✔
264
    .unwrap();
1✔
265

1✔
266
    dag.connect(
1✔
267
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
268
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
269
    )
1✔
270
    .unwrap();
1✔
271

×
272
    let (_temp_dir, checkpoint_factory, _) = create_checkpoint_factory_for_test(&[]).await;
1✔
273
    DagExecutor::new(dag, checkpoint_factory, ExecutorOptions::default())
1✔
274
        .unwrap()
1✔
275
        .start(Arc::new(AtomicBool::new(true)))
1✔
276
        .unwrap()
1✔
277
        .join()
1✔
278
        .unwrap();
1✔
279
}
×
280

×
281
// Tests for the case where an error is generated by a source
282

283
#[derive(Debug)]
×
284
pub(crate) struct ErrGeneratorSourceFactory {
285
    count: u64,
×
286
    err_at: u64,
287
}
288

289
impl ErrGeneratorSourceFactory {
290
    pub fn new(count: u64, err_at: u64) -> Self {
1✔
291
        Self { count, err_at }
1✔
292
    }
1✔
293
}
×
294

×
295
impl SourceFactory for ErrGeneratorSourceFactory {
296
    fn get_output_schema(&self, _port: &PortHandle) -> Result<Schema, BoxedError> {
1✔
297
        Ok(Schema::default()
1✔
298
            .field(
1✔
299
                FieldDefinition::new(
1✔
300
                    "id".to_string(),
1✔
301
                    FieldType::String,
1✔
302
                    false,
1✔
303
                    SourceDefinition::Dynamic,
1✔
304
                ),
1✔
305
                true,
1✔
306
            )
1✔
307
            .field(
1✔
308
                FieldDefinition::new(
1✔
309
                    "value".to_string(),
1✔
310
                    FieldType::String,
1✔
311
                    false,
1✔
312
                    SourceDefinition::Dynamic,
1✔
313
                ),
1✔
314
                false,
1✔
315
            )
1✔
316
            .clone())
1✔
317
    }
1✔
318

×
319
    fn get_output_port_name(&self, _port: &PortHandle) -> String {
×
320
        "error".to_string()
×
321
    }
×
322

×
323
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
3✔
324
        vec![OutputPortDef::new(
3✔
325
            GENERATOR_SOURCE_OUTPUT_PORT,
3✔
326
            OutputPortType::Stateless,
3✔
327
        )]
3✔
328
    }
3✔
329

×
330
    fn build(
1✔
331
        &self,
1✔
332
        _output_schemas: HashMap<PortHandle, Schema>,
1✔
333
    ) -> Result<Box<dyn Source>, BoxedError> {
1✔
334
        Ok(Box::new(ErrGeneratorSource {
1✔
335
            count: self.count,
1✔
336
            err_at: self.err_at,
1✔
337
        }))
1✔
338
    }
1✔
339
}
×
340

×
341
#[derive(Debug)]
×
342
pub(crate) struct ErrGeneratorSource {
×
343
    count: u64,
×
344
    err_at: u64,
345
}
346

×
347
impl Source for ErrGeneratorSource {
348
    fn can_start_from(&self, _last_checkpoint: (u64, u64)) -> Result<bool, BoxedError> {
×
349
        Ok(false)
×
350
    }
×
351

352
    fn start(
1✔
353
        &self,
1✔
354
        fw: &mut dyn SourceChannelForwarder,
1✔
355
        _checkpoint: Option<(u64, u64)>,
1✔
356
    ) -> Result<(), BoxedError> {
1✔
357
        for n in 1..(self.count + 1) {
200,000✔
358
            if n == self.err_at {
200,000✔
359
                return Err("Generated Error".to_string().into());
1✔
360
            }
199,999✔
361

199,999✔
362
            fw.send(
199,999✔
363
                IngestionMessage::new_op(
199,999✔
364
                    n,
199,999✔
365
                    0,
199,999✔
366
                    0,
199,999✔
367
                    Operation::Insert {
199,999✔
368
                        new: Record::new(vec![
199,999✔
369
                            Field::String(format!("key_{n}")),
199,999✔
370
                            Field::String(format!("value_{n}")),
199,999✔
371
                        ]),
199,999✔
372
                    },
199,999✔
373
                ),
199,999✔
374
                GENERATOR_SOURCE_OUTPUT_PORT,
199,999✔
375
            )?;
199,999✔
376
        }
×
377
        Ok(())
×
378
    }
1✔
379
}
×
380

×
381
#[tokio::test]
1✔
382
async fn test_run_dag_src_err() {
1✔
383
    let count: u64 = 1_000_000;
1✔
384

1✔
385
    let mut dag = Dag::new();
1✔
386
    let latch = Arc::new(AtomicBool::new(true));
1✔
387

1✔
388
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
389
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
390
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
391

1✔
392
    dag.add_source(
1✔
393
        source_handle.clone(),
1✔
394
        Box::new(ErrGeneratorSourceFactory::new(count, 200_000)),
1✔
395
    );
1✔
396
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
1✔
397
    dag.add_sink(
1✔
398
        sink_handle.clone(),
1✔
399
        Box::new(CountingSinkFactory::new(count, latch)),
1✔
400
    );
1✔
401

1✔
402
    dag.connect(
1✔
403
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
1✔
404
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
405
    )
1✔
406
    .unwrap();
1✔
407

1✔
408
    dag.connect(
1✔
409
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
410
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
411
    )
1✔
412
    .unwrap();
1✔
413

×
414
    let (_temp_dir, checkpoint_factory, _) = create_checkpoint_factory_for_test(&[]).await;
1✔
415
    DagExecutor::new(dag, checkpoint_factory, ExecutorOptions::default())
1✔
416
        .unwrap()
1✔
417
        .start(Arc::new(AtomicBool::new(true)))
1✔
418
        .unwrap();
1✔
419
    // join_handle.join().unwrap();
×
420
}
×
421

×
422
#[derive(Debug)]
×
423
pub(crate) struct ErrSinkFactory {
×
424
    err_at: u64,
425
    panic: bool,
426
}
427

×
428
impl ErrSinkFactory {
429
    pub fn new(err_at: u64, panic: bool) -> Self {
2✔
430
        Self { err_at, panic }
2✔
431
    }
2✔
432
}
433

434
impl SinkFactory for ErrSinkFactory {
×
435
    fn get_input_ports(&self) -> Vec<PortHandle> {
8✔
436
        vec![COUNTING_SINK_INPUT_PORT]
8✔
437
    }
8✔
438

439
    fn prepare(&self, _input_schemas: HashMap<PortHandle, Schema>) -> Result<(), BoxedError> {
2✔
440
        Ok(())
2✔
441
    }
2✔
442

×
443
    fn build(
2✔
444
        &self,
2✔
445
        _input_schemas: HashMap<PortHandle, Schema>,
2✔
446
    ) -> Result<Box<dyn Sink>, BoxedError> {
2✔
447
        Ok(Box::new(ErrSink {
2✔
448
            err_at: self.err_at,
2✔
449
            current: 0,
2✔
450
            panic: self.panic,
2✔
451
        }))
2✔
452
    }
2✔
453
}
×
454

×
455
#[derive(Debug)]
×
456
pub(crate) struct ErrSink {
×
457
    err_at: u64,
×
458
    current: u64,
×
459
    panic: bool,
×
460
}
×
461
impl Sink for ErrSink {
462
    fn commit(&mut self, _epoch_details: &Epoch) -> Result<(), BoxedError> {
72✔
463
        Ok(())
72✔
464
    }
72✔
465

466
    fn process(
400,000✔
467
        &mut self,
400,000✔
468
        _from_port: PortHandle,
400,000✔
469
        _record_store: &ProcessorRecordStore,
400,000✔
470
        _op: ProcessorOperation,
400,000✔
471
    ) -> Result<(), BoxedError> {
400,000✔
472
        self.current += 1;
400,000✔
473
        if self.current == self.err_at {
400,000✔
474
            if self.panic {
2✔
475
                panic!("Generated error");
1✔
476
            } else {
×
477
                return Err("Generated error".to_string().into());
1✔
478
            }
×
479
        }
399,998✔
480
        Ok(())
399,998✔
481
    }
399,999✔
482

×
483
    fn persist(&mut self, _queue: &Queue) -> Result<(), BoxedError> {
2✔
484
        Ok(())
2✔
485
    }
2✔
486

487
    fn on_source_snapshotting_done(&mut self, _connection_name: String) -> Result<(), BoxedError> {
×
488
        Ok(())
×
489
    }
×
490
}
491

×
492
#[tokio::test]
1✔
493
#[should_panic]
×
494
async fn test_run_dag_sink_err() {
1✔
495
    let count: u64 = 1_000_000;
1✔
496

1✔
497
    let mut dag = Dag::new();
1✔
498
    let latch = Arc::new(AtomicBool::new(true));
1✔
499

1✔
500
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
501
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
502
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
503

1✔
504
    dag.add_source(
1✔
505
        source_handle.clone(),
1✔
506
        Box::new(GeneratorSourceFactory::new(count, latch, false)),
1✔
507
    );
1✔
508
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
1✔
509
    dag.add_sink(
1✔
510
        sink_handle.clone(),
1✔
511
        Box::new(ErrSinkFactory::new(200_000, false)),
1✔
512
    );
1✔
513

1✔
514
    dag.connect(
1✔
515
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
1✔
516
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
517
    )
1✔
518
    .unwrap();
1✔
519

1✔
520
    dag.connect(
1✔
521
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
522
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
523
    )
1✔
524
    .unwrap();
1✔
525

×
526
    let (_temp_dir, checkpoint_factory, _) = create_checkpoint_factory_for_test(&[]).await;
1✔
527
    DagExecutor::new(dag, checkpoint_factory, ExecutorOptions::default())
1✔
528
        .unwrap()
1✔
529
        .start(Arc::new(AtomicBool::new(true)))
1✔
530
        .unwrap()
1✔
531
        .join()
1✔
532
        .unwrap();
1✔
533
}
534

×
535
#[tokio::test]
1✔
536
#[should_panic]
×
537
async fn test_run_dag_sink_err_panic() {
1✔
538
    let count: u64 = 1_000_000;
1✔
539

1✔
540
    let mut dag = Dag::new();
1✔
541
    let latch = Arc::new(AtomicBool::new(true));
1✔
542

1✔
543
    let source_handle = NodeHandle::new(None, 1.to_string());
1✔
544
    let proc_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
545
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
546

1✔
547
    dag.add_source(
1✔
548
        source_handle.clone(),
1✔
549
        Box::new(GeneratorSourceFactory::new(count, latch, false)),
1✔
550
    );
1✔
551
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
1✔
552
    dag.add_sink(
1✔
553
        sink_handle.clone(),
1✔
554
        Box::new(ErrSinkFactory::new(200_000, true)),
1✔
555
    );
1✔
556

1✔
557
    dag.connect(
1✔
558
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
1✔
559
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
560
    )
1✔
561
    .unwrap();
1✔
562

1✔
563
    dag.connect(
1✔
564
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
565
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
566
    )
1✔
567
    .unwrap();
1✔
568

×
569
    let (_temp_dir, checkpoint_factory, _) = create_checkpoint_factory_for_test(&[]).await;
1✔
570
    DagExecutor::new(dag, checkpoint_factory, ExecutorOptions::default())
1✔
571
        .unwrap()
1✔
572
        .start(Arc::new(AtomicBool::new(true)))
1✔
573
        .unwrap()
1✔
574
        .join()
1✔
575
        .unwrap();
1✔
576
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc