• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5888798292

17 Aug 2023 08:51AM UTC coverage: 76.025% (-1.4%) from 77.415%
5888798292

push

github

web-flow
feat: implement graph on live ui (#1847)

* feat: implement progress

* feat: implement enable progress flag

* feat: implement progress in live

* chore: fix clippy

* chore: always use telemetry metrics

* fix: Only run build once

---------

Co-authored-by: sagar <sagar@getdozer.io>
Co-authored-by: chubei <914745487@qq.com>

536 of 536 new or added lines in 21 files covered. (100.0%)

46101 of 60639 relevant lines covered (76.03%)

40410.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.43
/dozer-core/src/tests/dag_base_errors.rs
1
use crate::channels::{ProcessorChannelForwarder, SourceChannelForwarder};
2
use crate::epoch::Epoch;
3
use crate::executor::{DagExecutor, ExecutorOptions};
4
use crate::executor_operation::ProcessorOperation;
5
use crate::node::{
6
    OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Sink, SinkFactory,
7
    Source, SourceFactory,
8
};
9
use crate::processor_record::ProcessorRecordStore;
10
use crate::tests::dag_base_run::NoopProcessorFactory;
11
use crate::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
12
use crate::tests::sources::{GeneratorSourceFactory, GENERATOR_SOURCE_OUTPUT_PORT};
13
use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
14
use dozer_types::errors::internal::BoxedError;
15
use dozer_types::ingestion_types::IngestionMessage;
16
use dozer_types::node::NodeHandle;
17
use dozer_types::types::{
18
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
19
};
20

21
use std::collections::HashMap;
22
use std::panic;
23

24
use std::sync::atomic::AtomicBool;
25
use std::sync::Arc;
26

27
use crate::tests::app::NoneContext;
28

29
// Tests for errors raised by a processor (either a returned error or a panic).
30

31
/// Factory for [`ErrorProcessor`], a test processor that fails on purpose.
#[derive(Debug)]
struct ErrorProcessorFactory {
    /// 1-based index of the operation on which the built processor fails.
    err_on: u64,
    /// When `true` the built processor panics; otherwise it returns an error.
    panic: bool,
}
36

37
impl ProcessorFactory<NoneContext> for ErrorProcessorFactory {
38
    fn type_name(&self) -> String {
×
39
        "Error".to_owned()
×
40
    }
×
41

×
42
    fn get_output_schema(
3✔
43
        &self,
3✔
44
        _output_port: &PortHandle,
3✔
45
        input_schemas: &HashMap<PortHandle, (Schema, NoneContext)>,
3✔
46
    ) -> Result<(Schema, NoneContext), BoxedError> {
3✔
47
        Ok(input_schemas.get(&DEFAULT_PORT_HANDLE).unwrap().clone())
3✔
48
    }
3✔
49

×
50
    fn get_input_ports(&self) -> Vec<PortHandle> {
12✔
51
        vec![DEFAULT_PORT_HANDLE]
12✔
52
    }
12✔
53

×
54
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
9✔
55
        vec![OutputPortDef::new(
9✔
56
            DEFAULT_PORT_HANDLE,
9✔
57
            OutputPortType::Stateless,
9✔
58
        )]
9✔
59
    }
9✔
60

×
61
    fn build(
3✔
62
        &self,
3✔
63
        _input_schemas: HashMap<PortHandle, Schema>,
3✔
64
        _output_schemas: HashMap<PortHandle, Schema>,
3✔
65
        _record_store: &ProcessorRecordStore,
3✔
66
    ) -> Result<Box<dyn Processor>, BoxedError> {
3✔
67
        Ok(Box::new(ErrorProcessor {
3✔
68
            err_on: self.err_on,
3✔
69
            count: 0,
3✔
70
            panic: self.panic,
3✔
71
        }))
3✔
72
    }
3✔
73

×
74
    fn id(&self) -> String {
×
75
        "Error".to_owned()
×
76
    }
×
77
}
×
78

×
79
/// Test processor that forwards operations until it reaches a configured
/// call number, at which point it either panics or returns an error.
#[derive(Debug)]
struct ErrorProcessor {
    /// Value of `count` at which `process` fails.
    err_on: u64,
    /// Number of `process` calls seen so far.
    count: u64,
    /// Failure mode: `true` panics, `false` returns an error.
    panic: bool,
}
85

86
impl Processor for ErrorProcessor {
87
    fn commit(&self, _epoch: &Epoch) -> Result<(), BoxedError> {
307✔
88
        Ok(())
307✔
89
    }
307✔
90

×
91
    fn process(
2,400,000✔
92
        &mut self,
2,400,000✔
93
        _from_port: PortHandle,
2,400,000✔
94
        _record_store: &ProcessorRecordStore,
2,400,000✔
95
        op: ProcessorOperation,
2,400,000✔
96
        fw: &mut dyn ProcessorChannelForwarder,
2,400,000✔
97
    ) -> Result<(), BoxedError> {
2,400,000✔
98
        self.count += 1;
2,400,000✔
99
        if self.count == self.err_on {
2,400,000✔
100
            if self.panic {
3✔
101
                panic!("Generated error");
1✔
102
            } else {
×
103
                return Err("Uknown".to_string().into());
2✔
104
            }
×
105
        }
2,399,997✔
106

2,399,997✔
107
        fw.send(op, DEFAULT_PORT_HANDLE);
2,399,997✔
108
        Ok(())
2,399,997✔
109
    }
2,399,999✔
110
}
×
111

×
112
/// Pipeline: generator source -> panicking `ErrorProcessor` -> counting sink.
///
/// The processor is configured to panic on its 800,000th operation, and the
/// test is expected to panic as a result.
#[test]
#[should_panic]
fn test_run_dag_proc_err_panic() {
    let total_records: u64 = 1_000_000;
    let latch = Arc::new(AtomicBool::new(true));

    let source_handle = NodeHandle::new(None, "1".to_string());
    let proc_handle = NodeHandle::new(Some(1), "1".to_string());
    let sink_handle = NodeHandle::new(Some(1), "2".to_string());

    // Node factories, built up front so the wiring below stays readable.
    let source_factory = GeneratorSourceFactory::new(total_records, Arc::clone(&latch), false);
    let failing_factory = ErrorProcessorFactory {
        err_on: 800_000,
        panic: true,
    };
    let sink_factory = CountingSinkFactory::new(total_records, latch);

    let mut dag = Dag::new();
    dag.add_source(source_handle.clone(), Box::new(source_factory));
    dag.add_processor(proc_handle.clone(), Box::new(failing_factory));
    dag.add_sink(sink_handle.clone(), Box::new(sink_factory));

    // source -> processor
    dag.connect(
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
    )
    .unwrap();

    // processor -> sink
    dag.connect(
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
    )
    .unwrap();

    let running = Arc::new(AtomicBool::new(true));
    DagExecutor::new(dag, ExecutorOptions::default())
        .unwrap()
        .start(running)
        .unwrap()
        .join()
        .unwrap();
}
1✔
159

×
160
/// Pipeline: generator source -> no-op processor -> erroring `ErrorProcessor`
/// -> counting sink.
///
/// The error processor returns an error (no panic) on its 800,000th
/// operation; the test is expected to panic, presumably when that error
/// surfaces through `join().unwrap()`.
#[test]
#[should_panic]
fn test_run_dag_proc_err_2() {
    let total_records: u64 = 1_000_000;
    let latch = Arc::new(AtomicBool::new(true));

    let source_handle = NodeHandle::new(None, "1".to_string());
    let proc_handle = NodeHandle::new(Some(1), "1".to_string());
    let proc_err_handle = NodeHandle::new(Some(1), "2".to_string());
    let sink_handle = NodeHandle::new(Some(1), "3".to_string());

    let source_factory = GeneratorSourceFactory::new(total_records, Arc::clone(&latch), false);
    let failing_factory = ErrorProcessorFactory {
        err_on: 800_000,
        panic: false,
    };
    let sink_factory = CountingSinkFactory::new(total_records, latch);

    let mut dag = Dag::new();
    dag.add_source(source_handle.clone(), Box::new(source_factory));
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
    dag.add_processor(proc_err_handle.clone(), Box::new(failing_factory));
    dag.add_sink(sink_handle.clone(), Box::new(sink_factory));

    // source -> no-op processor
    dag.connect(
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
    )
    .unwrap();

    // no-op processor -> error processor
    dag.connect(
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
        Endpoint::new(proc_err_handle.clone(), DEFAULT_PORT_HANDLE),
    )
    .unwrap();

    // error processor -> sink
    dag.connect(
        Endpoint::new(proc_err_handle, DEFAULT_PORT_HANDLE),
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
    )
    .unwrap();

    let running = Arc::new(AtomicBool::new(true));
    DagExecutor::new(dag, ExecutorOptions::default())
        .unwrap()
        .start(running)
        .unwrap()
        .join()
        .unwrap();
}
1✔
217

×
218
/// Pipeline: generator source -> erroring `ErrorProcessor` -> no-op processor
/// -> counting sink.
///
/// Same as `test_run_dag_proc_err_2` but with the failing processor placed
/// before the no-op one. The test is expected to panic, presumably when the
/// returned error surfaces through `join().unwrap()`.
#[test]
#[should_panic]
fn test_run_dag_proc_err_3() {
    let total_records: u64 = 1_000_000;
    let latch = Arc::new(AtomicBool::new(true));

    let source_handle = NodeHandle::new(None, "1".to_string());
    let proc_handle = NodeHandle::new(Some(1), "1".to_string());
    let proc_err_handle = NodeHandle::new(Some(1), "2".to_string());
    let sink_handle = NodeHandle::new(Some(1), "3".to_string());

    let source_factory = GeneratorSourceFactory::new(total_records, Arc::clone(&latch), false);
    let failing_factory = ErrorProcessorFactory {
        err_on: 800_000,
        panic: false,
    };
    let sink_factory = CountingSinkFactory::new(total_records, latch);

    let mut dag = Dag::new();
    dag.add_source(source_handle.clone(), Box::new(source_factory));
    dag.add_processor(proc_err_handle.clone(), Box::new(failing_factory));
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
    dag.add_sink(sink_handle.clone(), Box::new(sink_factory));

    // source -> error processor
    dag.connect(
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
        Endpoint::new(proc_err_handle.clone(), DEFAULT_PORT_HANDLE),
    )
    .unwrap();

    // error processor -> no-op processor
    dag.connect(
        Endpoint::new(proc_err_handle, DEFAULT_PORT_HANDLE),
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
    )
    .unwrap();

    // no-op processor -> sink
    dag.connect(
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
    )
    .unwrap();

    let running = Arc::new(AtomicBool::new(true));
    DagExecutor::new(dag, ExecutorOptions::default())
        .unwrap()
        .start(running)
        .unwrap()
        .join()
        .unwrap();
}
1✔
276

×
277
// Tests for errors raised by a source.
×
278

×
279
/// Factory for [`ErrGeneratorSource`], a test source that fails part-way
/// through generating its records.
#[derive(Debug)]
pub(crate) struct ErrGeneratorSourceFactory {
    /// Upper bound (inclusive) of the sequence numbers the source iterates over.
    count: u64,
    /// Sequence number at which the source returns an error instead of a record.
    err_at: u64,
}

impl ErrGeneratorSourceFactory {
    /// Creates a factory whose sources iterate up to `count` and fail at `err_at`.
    pub fn new(count: u64, err_at: u64) -> Self {
        Self { count, err_at }
    }
}
290

291
impl SourceFactory<NoneContext> for ErrGeneratorSourceFactory {
292
    fn get_output_schema(&self, _port: &PortHandle) -> Result<(Schema, NoneContext), BoxedError> {
1✔
293
        Ok((
1✔
294
            Schema::default()
1✔
295
                .field(
1✔
296
                    FieldDefinition::new(
1✔
297
                        "id".to_string(),
1✔
298
                        FieldType::String,
1✔
299
                        false,
1✔
300
                        SourceDefinition::Dynamic,
1✔
301
                    ),
1✔
302
                    true,
1✔
303
                )
1✔
304
                .field(
1✔
305
                    FieldDefinition::new(
1✔
306
                        "value".to_string(),
1✔
307
                        FieldType::String,
1✔
308
                        false,
1✔
309
                        SourceDefinition::Dynamic,
1✔
310
                    ),
1✔
311
                    false,
1✔
312
                )
1✔
313
                .clone(),
1✔
314
            NoneContext {},
1✔
315
        ))
1✔
316
    }
1✔
317

×
318
    fn get_output_port_name(&self, _port: &PortHandle) -> String {
×
319
        "error".to_string()
×
320
    }
×
321

×
322
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
3✔
323
        vec![OutputPortDef::new(
3✔
324
            GENERATOR_SOURCE_OUTPUT_PORT,
3✔
325
            OutputPortType::Stateless,
3✔
326
        )]
3✔
327
    }
3✔
328

×
329
    fn build(
1✔
330
        &self,
1✔
331
        _output_schemas: HashMap<PortHandle, Schema>,
1✔
332
    ) -> Result<Box<dyn Source>, BoxedError> {
1✔
333
        Ok(Box::new(ErrGeneratorSource {
1✔
334
            count: self.count,
1✔
335
            err_at: self.err_at,
1✔
336
        }))
1✔
337
    }
1✔
338
}
×
339

×
340
/// Test source that emits insert operations and then fails at a configured
/// sequence number.
#[derive(Debug)]
pub(crate) struct ErrGeneratorSource {
    /// Upper bound (inclusive) of the sequence numbers iterated by `start`.
    count: u64,
    /// Sequence number at which `start` returns an error.
    err_at: u64,
}
345

346
impl Source for ErrGeneratorSource {
×
347
    fn can_start_from(&self, _last_checkpoint: (u64, u64)) -> Result<bool, BoxedError> {
×
348
        Ok(false)
×
349
    }
×
350

351
    fn start(
1✔
352
        &self,
1✔
353
        fw: &mut dyn SourceChannelForwarder,
1✔
354
        _checkpoint: Option<(u64, u64)>,
1✔
355
    ) -> Result<(), BoxedError> {
1✔
356
        for n in 1..(self.count + 1) {
200,000✔
357
            if n == self.err_at {
200,000✔
358
                return Err("Generated Error".to_string().into());
1✔
359
            }
199,999✔
360

199,999✔
361
            fw.send(
199,999✔
362
                IngestionMessage::new_op(
199,999✔
363
                    n,
199,999✔
364
                    0,
199,999✔
365
                    0,
199,999✔
366
                    Operation::Insert {
199,999✔
367
                        new: Record::new(vec![
199,999✔
368
                            Field::String(format!("key_{n}")),
199,999✔
369
                            Field::String(format!("value_{n}")),
199,999✔
370
                        ]),
199,999✔
371
                    },
199,999✔
372
                ),
199,999✔
373
                GENERATOR_SOURCE_OUTPUT_PORT,
199,999✔
374
            )?;
199,999✔
375
        }
×
376
        Ok(())
×
377
    }
1✔
378
}
×
379

×
380
/// Pipeline: erroring generator source -> no-op processor -> counting sink.
///
/// The source is configured to fail at sequence number 200,000. The test only
/// checks that the DAG can be built and the executor started.
#[test]
fn test_run_dag_src_err() {
    let total_records: u64 = 1_000_000;
    let latch = Arc::new(AtomicBool::new(true));

    let source_handle = NodeHandle::new(None, "1".to_string());
    let proc_handle = NodeHandle::new(Some(1), "1".to_string());
    let sink_handle = NodeHandle::new(Some(1), "3".to_string());

    let source_factory = ErrGeneratorSourceFactory::new(total_records, 200_000);
    let sink_factory = CountingSinkFactory::new(total_records, latch);

    let mut dag = Dag::new();
    dag.add_source(source_handle.clone(), Box::new(source_factory));
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
    dag.add_sink(sink_handle.clone(), Box::new(sink_factory));

    // source -> no-op processor
    dag.connect(
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
    )
    .unwrap();

    // no-op processor -> sink
    dag.connect(
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
    )
    .unwrap();

    let running = Arc::new(AtomicBool::new(true));
    // NOTE(review): the handle is deliberately not joined (the `join().unwrap()`
    // call was disabled in this test), so the source error itself is never
    // asserted. Confirm whether joining is safe before re-enabling it.
    let _join_handle = DagExecutor::new(dag, ExecutorOptions::default())
        .unwrap()
        .start(running)
        .unwrap();
}
1✔
419

×
420
/// Factory for [`ErrSink`], a test sink that fails on purpose.
#[derive(Debug)]
pub(crate) struct ErrSinkFactory {
    /// 1-based index of the operation on which the built sink fails.
    err_at: u64,
    /// When `true` the built sink panics; otherwise it returns an error.
    panic: bool,
}

impl ErrSinkFactory {
    /// Creates a factory whose sinks fail on operation number `err_at`,
    /// panicking if `panic` is set and returning an error otherwise.
    pub fn new(err_at: u64, panic: bool) -> Self {
        Self { err_at, panic }
    }
}
431

432
impl SinkFactory<NoneContext> for ErrSinkFactory {
433
    fn get_input_ports(&self) -> Vec<PortHandle> {
8✔
434
        vec![COUNTING_SINK_INPUT_PORT]
8✔
435
    }
8✔
436

×
437
    fn prepare(
2✔
438
        &self,
2✔
439
        _input_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
2✔
440
    ) -> Result<(), BoxedError> {
2✔
441
        Ok(())
2✔
442
    }
2✔
443

444
    fn build(
2✔
445
        &self,
2✔
446
        _input_schemas: HashMap<PortHandle, Schema>,
2✔
447
    ) -> Result<Box<dyn Sink>, BoxedError> {
2✔
448
        Ok(Box::new(ErrSink {
2✔
449
            err_at: self.err_at,
2✔
450
            current: 0,
2✔
451
            panic: self.panic,
2✔
452
        }))
2✔
453
    }
2✔
454
}
×
455

×
456
/// Test sink that accepts operations until it reaches a configured call
/// number, at which point it either panics or returns an error.
#[derive(Debug)]
pub(crate) struct ErrSink {
    /// Value of `current` at which `process` fails.
    err_at: u64,
    /// Number of `process` calls seen so far.
    current: u64,
    /// Failure mode: `true` panics, `false` returns an error.
    panic: bool,
}
462
impl Sink for ErrSink {
463
    fn commit(&mut self, _epoch_details: &Epoch) -> Result<(), BoxedError> {
78✔
464
        Ok(())
78✔
465
    }
78✔
466

467
    fn process(
400,000✔
468
        &mut self,
400,000✔
469
        _from_port: PortHandle,
400,000✔
470
        _record_store: &ProcessorRecordStore,
400,000✔
471
        _op: ProcessorOperation,
400,000✔
472
    ) -> Result<(), BoxedError> {
400,000✔
473
        self.current += 1;
400,000✔
474
        if self.current == self.err_at {
400,000✔
475
            if self.panic {
2✔
476
                panic!("Generated error");
1✔
477
            } else {
×
478
                return Err("Generated error".to_string().into());
1✔
479
            }
×
480
        }
399,998✔
481
        Ok(())
399,998✔
482
    }
399,999✔
483

×
484
    fn on_source_snapshotting_done(&mut self, _connection_name: String) -> Result<(), BoxedError> {
×
485
        Ok(())
×
486
    }
×
487
}
×
488

×
489
/// Pipeline: generator source -> no-op processor -> erroring `ErrSink`.
///
/// The sink returns an error (no panic) on its 200,000th operation; the test
/// is expected to panic, presumably when that error surfaces through
/// `join().unwrap()`.
#[test]
#[should_panic]
fn test_run_dag_sink_err() {
    let total_records: u64 = 1_000_000;
    let latch = Arc::new(AtomicBool::new(true));

    let source_handle = NodeHandle::new(None, "1".to_string());
    let proc_handle = NodeHandle::new(Some(1), "1".to_string());
    let sink_handle = NodeHandle::new(Some(1), "3".to_string());

    let source_factory = GeneratorSourceFactory::new(total_records, latch, false);
    let failing_sink_factory = ErrSinkFactory::new(200_000, false);

    let mut dag = Dag::new();
    dag.add_source(source_handle.clone(), Box::new(source_factory));
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
    dag.add_sink(sink_handle.clone(), Box::new(failing_sink_factory));

    // source -> no-op processor
    dag.connect(
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
    )
    .unwrap();

    // no-op processor -> error sink
    dag.connect(
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
    )
    .unwrap();

    let running = Arc::new(AtomicBool::new(true));
    DagExecutor::new(dag, ExecutorOptions::default())
        .unwrap()
        .start(running)
        .unwrap()
        .join()
        .unwrap();
}
1✔
530

×
531
/// Pipeline: generator source -> no-op processor -> panicking `ErrSink`.
///
/// The sink is configured to panic on its 200,000th operation, and the test
/// is expected to panic as a result.
#[test]
#[should_panic]
fn test_run_dag_sink_err_panic() {
    let total_records: u64 = 1_000_000;
    let latch = Arc::new(AtomicBool::new(true));

    let source_handle = NodeHandle::new(None, "1".to_string());
    let proc_handle = NodeHandle::new(Some(1), "1".to_string());
    let sink_handle = NodeHandle::new(Some(1), "3".to_string());

    let source_factory = GeneratorSourceFactory::new(total_records, latch, false);
    let panicking_sink_factory = ErrSinkFactory::new(200_000, true);

    let mut dag = Dag::new();
    dag.add_source(source_handle.clone(), Box::new(source_factory));
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
    dag.add_sink(sink_handle.clone(), Box::new(panicking_sink_factory));

    // source -> no-op processor
    dag.connect(
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
    )
    .unwrap();

    // no-op processor -> panicking sink
    dag.connect(
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
    )
    .unwrap();

    let running = Arc::new(AtomicBool::new(true));
    DagExecutor::new(dag, ExecutorOptions::default())
        .unwrap()
        .start(running)
        .unwrap()
        .join()
        .unwrap();
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc