• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4112409903

pending completion
4112409903

Pull #818

github

GitHub
Merge 0e6d61bff into c160ec41f
Pull Request #818: chore: fix dag issues

212 of 212 new or added lines in 23 files covered. (100.0%)

23352 of 37718 relevant lines covered (61.91%)

31647.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.64
/dozer-core/src/dag/tests/dag_recordreader.rs
1
use crate::dag::channels::ProcessorChannelForwarder;
2
use crate::dag::errors::ExecutionError;
3
use crate::dag::executor::{DagExecutor, ExecutorOptions};
4
use crate::dag::node::{
5
    NodeHandle, OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory,
6
};
7
use crate::dag::record_store::RecordReader;
8
use crate::dag::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
9
use crate::dag::tests::sources::{
10
    GeneratorSourceFactory, NoPkGeneratorSourceFactory, GENERATOR_SOURCE_OUTPUT_PORT,
11
};
12
use crate::dag::{Dag, Endpoint};
13
use crate::storage::lmdb_storage::{LmdbEnvironmentManager, SharedTransaction};
14
use dozer_types::types::{Field, Operation, Schema};
15

16
use std::collections::HashMap;
17

18
use std::sync::atomic::AtomicBool;
19
use std::sync::Arc;
20
use std::time::Duration;
21

22
use crate::dag::epoch::Epoch;
23
use crate::dag::tests::app::NoneContext;
24
use tempdir::TempDir;
25

26
/// Unwraps a `Result`, panicking with the error's `to_string()` form on `Err`.
/// Used throughout these tests instead of `unwrap()` for readable failures.
macro_rules! chk {
    ($stmt:expr) => {
        match $stmt {
            Ok(v) => v,
            Err(e) => panic!("{}", e.to_string()),
        }
    };
}
31

32
/// Input port handle of the passthrough processor (arbitrary test value).
pub(crate) const PASSTHROUGH_PROCESSOR_INPUT_PORT: PortHandle = 50;
/// Output port handle of the passthrough processor (arbitrary test value).
pub(crate) const PASSTHROUGH_PROCESSOR_OUTPUT_PORT: PortHandle = 60;
34

35
/// Factory for `PassthroughProcessor`, a test processor that forwards every
/// operation from its input port to its output port unchanged.
#[derive(Debug)]
pub(crate) struct PassthroughProcessorFactory {}
×
37

38
impl PassthroughProcessorFactory {
    /// Creates a new, stateless factory instance.
    pub fn new() -> Self {
        Self {}
    }
}

// Provide `Default` alongside `new()` (clippy: `new_without_default`).
impl Default for PassthroughProcessorFactory {
    fn default() -> Self {
        Self::new()
    }
}
×
43

44
impl ProcessorFactory<NoneContext> for PassthroughProcessorFactory {
45
    fn get_output_schema(
1✔
46
        &self,
1✔
47
        _output_port: &PortHandle,
1✔
48
        input_schemas: &HashMap<PortHandle, (Schema, NoneContext)>,
1✔
49
    ) -> Result<(Schema, NoneContext), ExecutionError> {
1✔
50
        Ok(input_schemas
1✔
51
            .get(&PASSTHROUGH_PROCESSOR_INPUT_PORT)
1✔
52
            .unwrap()
1✔
53
            .clone())
1✔
54
    }
1✔
55

×
56
    fn get_input_ports(&self) -> Vec<PortHandle> {
2✔
57
        vec![PASSTHROUGH_PROCESSOR_INPUT_PORT]
2✔
58
    }
2✔
59
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
3✔
60
        vec![OutputPortDef::new(
3✔
61
            PASSTHROUGH_PROCESSOR_OUTPUT_PORT,
3✔
62
            OutputPortType::StatefulWithPrimaryKeyLookup {
3✔
63
                retr_old_records_for_deletes: true,
3✔
64
                retr_old_records_for_updates: true,
3✔
65
            },
3✔
66
        )]
3✔
67
    }
3✔
68

×
69
    fn prepare(
×
70
        &self,
×
71
        _input_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
72
        _output_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
73
    ) -> Result<(), ExecutionError> {
×
74
        Ok(())
×
75
    }
×
76

×
77
    fn build(
1✔
78
        &self,
1✔
79
        _input_schemas: HashMap<PortHandle, Schema>,
1✔
80
        _output_schemas: HashMap<PortHandle, Schema>,
1✔
81
    ) -> Result<Box<dyn Processor>, ExecutionError> {
1✔
82
        Ok(Box::new(PassthroughProcessor {}))
1✔
83
    }
1✔
84
}
×
85

86
/// Test processor that forwards every incoming operation, unmodified, to
/// `PASSTHROUGH_PROCESSOR_OUTPUT_PORT`.
#[derive(Debug)]
pub(crate) struct PassthroughProcessor {}
×
88

89
impl Processor for PassthroughProcessor {
90
    fn init(&mut self, _tx: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
1✔
91
        Ok(())
1✔
92
    }
1✔
93

×
94
    fn commit(
19✔
95
        &self,
19✔
96
        _epoch_details: &Epoch,
19✔
97
        _tx: &SharedTransaction,
19✔
98
    ) -> Result<(), ExecutionError> {
19✔
99
        Ok(())
19✔
100
    }
19✔
101

×
102
    fn process(
10,000✔
103
        &mut self,
10,000✔
104
        _from_port: PortHandle,
10,000✔
105
        op: Operation,
10,000✔
106
        fw: &mut dyn ProcessorChannelForwarder,
10,000✔
107
        _tx: &SharedTransaction,
10,000✔
108
        _readers: &HashMap<PortHandle, Box<dyn RecordReader>>,
10,000✔
109
    ) -> Result<(), ExecutionError> {
10,000✔
110
        fw.send(op, PASSTHROUGH_PROCESSOR_OUTPUT_PORT)
10,000✔
111
    }
10,000✔
112
}
×
113

114
/// Factory for `RecordReaderProcessor`, a test processor that verifies each
/// incoming record can be read back from the upstream record store.
#[derive(Debug)]
pub(crate) struct RecordReaderProcessorFactory {}
×
116

117
impl RecordReaderProcessorFactory {
    /// Creates a new, stateless factory instance.
    pub fn new() -> Self {
        Self {}
    }
}

// Provide `Default` alongside `new()` (clippy: `new_without_default`).
impl Default for RecordReaderProcessorFactory {
    fn default() -> Self {
        Self::new()
    }
}
×
122

123
/// Input port handle of the record-reader processors (arbitrary test value).
pub(crate) const RECORD_READER_PROCESSOR_INPUT_PORT: PortHandle = 70;
/// Output port handle of the record-reader processors (arbitrary test value).
pub(crate) const RECORD_READER_PROCESSOR_OUTPUT_PORT: PortHandle = 80;
125

126
impl ProcessorFactory<NoneContext> for RecordReaderProcessorFactory {
127
    fn get_output_schema(
2✔
128
        &self,
2✔
129
        _output_port: &PortHandle,
2✔
130
        input_schemas: &HashMap<PortHandle, (Schema, NoneContext)>,
2✔
131
    ) -> Result<(Schema, NoneContext), ExecutionError> {
2✔
132
        Ok(input_schemas
2✔
133
            .get(&RECORD_READER_PROCESSOR_INPUT_PORT)
2✔
134
            .unwrap()
2✔
135
            .clone())
2✔
136
    }
2✔
137

×
138
    fn get_input_ports(&self) -> Vec<PortHandle> {
4✔
139
        vec![RECORD_READER_PROCESSOR_INPUT_PORT]
4✔
140
    }
4✔
141
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
6✔
142
        vec![OutputPortDef::new(
6✔
143
            RECORD_READER_PROCESSOR_OUTPUT_PORT,
6✔
144
            OutputPortType::Stateless,
6✔
145
        )]
6✔
146
    }
6✔
147

×
148
    fn prepare(
×
149
        &self,
×
150
        _input_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
151
        _output_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
152
    ) -> Result<(), ExecutionError> {
×
153
        Ok(())
×
154
    }
×
155

×
156
    fn build(
2✔
157
        &self,
2✔
158
        _input_schemas: HashMap<PortHandle, Schema>,
2✔
159
        _output_schemas: HashMap<PortHandle, Schema>,
2✔
160
    ) -> Result<Box<dyn Processor>, ExecutionError> {
2✔
161
        Ok(Box::new(RecordReaderProcessor { ctr: 1 }))
2✔
162
    }
2✔
163
}
×
164

165
/// Test processor that, for every processed operation, looks up the record
/// keyed `key_{ctr}` in the upstream record store, asserts it exists, then
/// forwards the operation downstream.
#[derive(Debug)]
pub(crate) struct RecordReaderProcessor {
    // Running key counter; starts at 1 and increments once per processed op.
    ctr: u64,
}
169

170
impl Processor for RecordReaderProcessor {
171
    fn init(&mut self, _tx: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
2✔
172
        Ok(())
2✔
173
    }
2✔
174

×
175
    fn commit(
20✔
176
        &self,
20✔
177
        _epoch_details: &Epoch,
20✔
178
        _tx: &SharedTransaction,
20✔
179
    ) -> Result<(), ExecutionError> {
20✔
180
        Ok(())
20✔
181
    }
20✔
182

×
183
    fn process(
11,000✔
184
        &mut self,
11,000✔
185
        _from_port: PortHandle,
11,000✔
186
        op: Operation,
11,000✔
187
        fw: &mut dyn ProcessorChannelForwarder,
11,000✔
188
        _tx: &SharedTransaction,
11,000✔
189
        readers: &HashMap<PortHandle, Box<dyn RecordReader>>,
11,000✔
190
    ) -> Result<(), ExecutionError> {
11,000✔
191
        let v = readers
11,000✔
192
            .get(&RECORD_READER_PROCESSOR_INPUT_PORT)
11,000✔
193
            .unwrap()
11,000✔
194
            .get(
11,000✔
195
                Field::String(format!("key_{}", self.ctr))
11,000✔
196
                    .encode()
11,000✔
197
                    .as_slice(),
11,000✔
198
                1,
11,000✔
199
            )?;
11,000✔
200
        assert!(v.is_some());
11,000✔
201
        self.ctr += 1;
11,000✔
202

11,000✔
203
        fw.send(op, RECORD_READER_PROCESSOR_OUTPUT_PORT)
11,000✔
204
    }
11,000✔
205
}
×
206

207
#[test]
1✔
208
fn test_run_dag_record_reader() {
1✔
209
    const TOT: u64 = 10_000;
1✔
210

1✔
211
    let sync = Arc::new(AtomicBool::new(true));
1✔
212

1✔
213
    let src = GeneratorSourceFactory::new(TOT, sync.clone(), false);
1✔
214
    let passthrough = PassthroughProcessorFactory::new();
1✔
215
    let record_reader = RecordReaderProcessorFactory::new();
1✔
216
    let sink = CountingSinkFactory::new(TOT, sync);
1✔
217

1✔
218
    let mut dag = Dag::new();
1✔
219

1✔
220
    let source_id: NodeHandle = NodeHandle::new(None, 1.to_string());
1✔
221
    let passthrough_id: NodeHandle = NodeHandle::new(Some(1), 1.to_string());
1✔
222
    let record_reader_id: NodeHandle = NodeHandle::new(Some(1), 2.to_string());
1✔
223
    let sink_id: NodeHandle = NodeHandle::new(Some(1), 3.to_string());
1✔
224

1✔
225
    dag.add_source(source_id.clone(), Arc::new(src));
1✔
226
    dag.add_processor(passthrough_id.clone(), Arc::new(passthrough));
1✔
227
    dag.add_processor(record_reader_id.clone(), Arc::new(record_reader));
1✔
228
    dag.add_sink(sink_id.clone(), Arc::new(sink));
1✔
229

1✔
230
    assert!(dag
1✔
231
        .connect(
1✔
232
            Endpoint::new(source_id, GENERATOR_SOURCE_OUTPUT_PORT),
1✔
233
            Endpoint::new(passthrough_id.clone(), PASSTHROUGH_PROCESSOR_INPUT_PORT),
1✔
234
        )
1✔
235
        .is_ok());
1✔
236

×
237
    assert!(dag
1✔
238
        .connect(
1✔
239
            Endpoint::new(passthrough_id, PASSTHROUGH_PROCESSOR_OUTPUT_PORT),
1✔
240
            Endpoint::new(record_reader_id.clone(), RECORD_READER_PROCESSOR_INPUT_PORT),
1✔
241
        )
1✔
242
        .is_ok());
1✔
243

244
    assert!(dag
1✔
245
        .connect(
1✔
246
            Endpoint::new(record_reader_id, RECORD_READER_PROCESSOR_OUTPUT_PORT),
1✔
247
            Endpoint::new(sink_id, COUNTING_SINK_INPUT_PORT),
1✔
248
        )
1✔
249
        .is_ok());
1✔
250

251
    let options = ExecutorOptions {
1✔
252
        commit_sz: 1000,
1✔
253
        commit_time_threshold: Duration::from_millis(5),
1✔
254
        ..Default::default()
1✔
255
    };
1✔
256

1✔
257
    let tmp_dir = chk!(TempDir::new("test"));
1✔
258
    let mut executor = chk!(DagExecutor::new(
1✔
259
        dag,
×
260
        tmp_dir.path(),
×
261
        options,
×
262
        Arc::new(AtomicBool::new(true))
×
263
    ));
×
264

×
265
    chk!(executor.start());
×
266
    assert!(executor.join().is_ok());
1✔
267
}
1✔
268

×
269
#[test]
1✔
270
fn test_run_dag_record_reader_from_src() {
1✔
271
    const TOT: u64 = 1_000;
1✔
272

1✔
273
    let sync = Arc::new(AtomicBool::new(true));
1✔
274

1✔
275
    let src = GeneratorSourceFactory::new(TOT, sync.clone(), true);
1✔
276
    let record_reader = RecordReaderProcessorFactory::new();
1✔
277
    let sink = CountingSinkFactory::new(TOT, sync);
1✔
278

1✔
279
    let mut dag = Dag::new();
1✔
280

1✔
281
    let source_id: NodeHandle = NodeHandle::new(None, 1.to_string());
1✔
282
    let record_reader_id: NodeHandle = NodeHandle::new(Some(1), 1.to_string());
1✔
283
    let sink_id: NodeHandle = NodeHandle::new(Some(1), 2.to_string());
1✔
284

1✔
285
    dag.add_source(source_id.clone(), Arc::new(src));
1✔
286
    dag.add_processor(record_reader_id.clone(), Arc::new(record_reader));
1✔
287
    dag.add_sink(sink_id.clone(), Arc::new(sink));
1✔
288

1✔
289
    assert!(dag
1✔
290
        .connect(
1✔
291
            Endpoint::new(source_id, GENERATOR_SOURCE_OUTPUT_PORT),
1✔
292
            Endpoint::new(record_reader_id.clone(), RECORD_READER_PROCESSOR_INPUT_PORT),
1✔
293
        )
1✔
294
        .is_ok());
1✔
295

×
296
    assert!(dag
1✔
297
        .connect(
1✔
298
            Endpoint::new(record_reader_id, RECORD_READER_PROCESSOR_OUTPUT_PORT),
1✔
299
            Endpoint::new(sink_id, COUNTING_SINK_INPUT_PORT),
1✔
300
        )
1✔
301
        .is_ok());
1✔
302

×
303
    let tmp_dir = chk!(TempDir::new("test"));
1✔
304
    let mut executor = chk!(DagExecutor::new(
1✔
305
        dag,
×
306
        tmp_dir.path(),
×
307
        ExecutorOptions::default(),
×
308
        Arc::new(AtomicBool::new(true))
×
309
    ));
×
310

×
311
    chk!(executor.start());
×
312
    assert!(executor.join().is_ok());
1✔
313
}
1✔
314

×
315
/// Factory for `NoPkRecordReaderProcessor`, the variant of the record-reader
/// test processor that looks records up by auto-generated integer row key
/// instead of an explicit string primary key.
#[derive(Debug)]
pub(crate) struct NoPkRecordReaderProcessorFactory {}
×
317

×
318
impl NoPkRecordReaderProcessorFactory {
    /// Creates a new, stateless factory instance.
    pub fn new() -> Self {
        Self {}
    }
}

// Provide `Default` alongside `new()` (clippy: `new_without_default`).
impl Default for NoPkRecordReaderProcessorFactory {
    fn default() -> Self {
        Self::new()
    }
}
×
323

×
324
impl ProcessorFactory<NoneContext> for NoPkRecordReaderProcessorFactory {
325
    fn get_output_schema(
1✔
326
        &self,
1✔
327
        _output_port: &PortHandle,
1✔
328
        input_schemas: &HashMap<PortHandle, (Schema, NoneContext)>,
1✔
329
    ) -> Result<(Schema, NoneContext), ExecutionError> {
1✔
330
        Ok(input_schemas
1✔
331
            .get(&RECORD_READER_PROCESSOR_INPUT_PORT)
1✔
332
            .unwrap()
1✔
333
            .clone())
1✔
334
    }
1✔
335

×
336
    fn get_input_ports(&self) -> Vec<PortHandle> {
2✔
337
        vec![RECORD_READER_PROCESSOR_INPUT_PORT]
2✔
338
    }
2✔
339
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
3✔
340
        vec![OutputPortDef::new(
3✔
341
            RECORD_READER_PROCESSOR_OUTPUT_PORT,
3✔
342
            OutputPortType::Stateless,
3✔
343
        )]
3✔
344
    }
3✔
345

×
346
    fn prepare(
×
347
        &self,
×
348
        _input_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
349
        _output_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
350
    ) -> Result<(), ExecutionError> {
×
351
        Ok(())
×
352
    }
×
353

×
354
    fn build(
1✔
355
        &self,
1✔
356
        _input_schemas: HashMap<PortHandle, Schema>,
1✔
357
        _output_schemas: HashMap<PortHandle, Schema>,
1✔
358
    ) -> Result<Box<dyn Processor>, ExecutionError> {
1✔
359
        Ok(Box::new(NoPkRecordReaderProcessor { ctr: 1 }))
1✔
360
    }
1✔
361
}
×
362

×
363
/// Test processor that, for every processed operation, looks up the record
/// whose auto-generated integer row key equals the running counter, asserts
/// it exists, then forwards the operation downstream.
#[derive(Debug)]
pub(crate) struct NoPkRecordReaderProcessor {
    // Running row-key counter; starts at 1 and increments once per processed op.
    ctr: u64,
}
×
367

×
368
impl Processor for NoPkRecordReaderProcessor {
×
369
    fn init(&mut self, _tx: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
1✔
370
        Ok(())
1✔
371
    }
1✔
372

×
373
    fn commit(
1✔
374
        &self,
1✔
375
        _epoch_details: &Epoch,
1✔
376
        _tx: &SharedTransaction,
1✔
377
    ) -> Result<(), ExecutionError> {
1✔
378
        Ok(())
1✔
379
    }
1✔
380

×
381
    fn process(
1,000✔
382
        &mut self,
1,000✔
383
        _from_port: PortHandle,
1,000✔
384
        op: Operation,
1,000✔
385
        fw: &mut dyn ProcessorChannelForwarder,
1,000✔
386
        _tx: &SharedTransaction,
1,000✔
387
        readers: &HashMap<PortHandle, Box<dyn RecordReader>>,
1,000✔
388
    ) -> Result<(), ExecutionError> {
1,000✔
389
        let v = readers
1,000✔
390
            .get(&RECORD_READER_PROCESSOR_INPUT_PORT)
1,000✔
391
            .unwrap()
1,000✔
392
            .get(Field::UInt(self.ctr).encode().as_slice(), 1)?;
1,000✔
393
        assert!(v.is_some());
1,000✔
394
        self.ctr += 1;
1,000✔
395

1,000✔
396
        fw.send(op, RECORD_READER_PROCESSOR_OUTPUT_PORT)
1,000✔
397
    }
1,000✔
398
}
×
399

×
400
/// DAG using a source without a primary key (`NoPkGeneratorSourceFactory`):
/// records are stored under auto-generated integer row keys, and the
/// no-primary-key record-reader node must find each one by that key.
#[test]
fn test_run_dag_record_reader_from_rowkey_autogen_src() {
    const TOT: u64 = 1_000;

    let running_flag = Arc::new(AtomicBool::new(true));

    let source_factory = NoPkGeneratorSourceFactory::new(TOT, running_flag.clone(), true);
    let record_reader_factory = NoPkRecordReaderProcessorFactory::new();
    let sink_factory = CountingSinkFactory::new(TOT, running_flag);

    let source_id = NodeHandle::new(None, 1.to_string());
    let record_reader_id = NodeHandle::new(Some(1), 1.to_string());
    let sink_id = NodeHandle::new(Some(1), 2.to_string());

    let mut dag = Dag::new();
    dag.add_source(source_id.clone(), Arc::new(source_factory));
    dag.add_processor(record_reader_id.clone(), Arc::new(record_reader_factory));
    dag.add_sink(sink_id.clone(), Arc::new(sink_factory));

    assert!(dag
        .connect(
            Endpoint::new(source_id, GENERATOR_SOURCE_OUTPUT_PORT),
            Endpoint::new(record_reader_id.clone(), RECORD_READER_PROCESSOR_INPUT_PORT),
        )
        .is_ok());

    assert!(dag
        .connect(
            Endpoint::new(record_reader_id, RECORD_READER_PROCESSOR_OUTPUT_PORT),
            Endpoint::new(sink_id, COUNTING_SINK_INPUT_PORT),
        )
        .is_ok());

    let tmp_dir = chk!(TempDir::new("test"));
    let mut executor = chk!(DagExecutor::new(
        dag,
        tmp_dir.path(),
        ExecutorOptions::default(),
        Arc::new(AtomicBool::new(true))
    ));

    chk!(executor.start());
    assert!(executor.join().is_ok());
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc