getdozer / dozer / build 4302087115 (push via GitHub, pending completion)

chore: Move `SnapshottingDone` out of `Operation` so processors don't have to know it. (#1103)

364 of 364 new or added lines in 33 files covered (100.0%)
28623 of 40224 relevant lines covered (71.16%)
56785.21 hits per line

Source File: /dozer-sql/src/pipeline/tests/builder_test.rs (84.43% covered)
use dozer_core::app::{App, AppPipeline};
use dozer_core::appsource::{AppSource, AppSourceManager};
use dozer_core::channels::SourceChannelForwarder;
use dozer_core::errors::ExecutionError;
use dozer_core::executor::{DagExecutor, ExecutorOptions};
use dozer_core::node::{
    OutputPortDef, OutputPortType, PortHandle, Sink, SinkFactory, Source, SourceFactory,
};
use dozer_core::record_store::RecordReader;
use dozer_core::storage::lmdb_storage::SharedTransaction;
use dozer_core::DEFAULT_PORT_HANDLE;
use dozer_types::ingestion_types::IngestionMessage;
use dozer_types::log::debug;
use dozer_types::node::SourceStates;
use dozer_types::ordered_float::OrderedFloat;
use dozer_types::types::{
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
};

use dozer_core::epoch::Epoch;

use std::collections::HashMap;
use std::fs;

use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use tempdir::TempDir;

use crate::pipeline::builder::{statement_to_pipeline, SchemaSQLContext};

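// Pipeline builder integration test fixtures: a mock source feeds records
// through a SQL aggregation whose output lands in a no-op sink.
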
/// Test Source
#[derive(Debug)]
pub struct TestSourceFactory {
    output_ports: Vec<PortHandle>,
}

impl TestSourceFactory {
    pub fn new(output_ports: Vec<PortHandle>) -> Self {
        Self { output_ports }
    }
}

impl SourceFactory<SchemaSQLContext> for TestSourceFactory {
    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
        Ok(self
            .output_ports
            .iter()
            .map(|e| OutputPortDef::new(*e, OutputPortType::Stateless))
            .collect())
    }

    fn get_output_schema(
        &self,
        _port: &PortHandle,
    ) -> Result<(Schema, SchemaSQLContext), ExecutionError> {
        Ok((
            Schema::empty()
                .field(
                    FieldDefinition::new(
                        String::from("CustomerID"),
                        FieldType::Int,
                        false,
                        SourceDefinition::Dynamic,
                    ),
                    false,
                )
                .field(
                    FieldDefinition::new(
                        String::from("Country"),
                        FieldType::String,
                        false,
                        SourceDefinition::Dynamic,
                    ),
                    false,
                )
                .field(
                    FieldDefinition::new(
                        String::from("Spending"),
                        FieldType::Float,
                        false,
                        SourceDefinition::Dynamic,
                    ),
                    false,
                )
                .clone(),
            SchemaSQLContext::default(),
        ))
    }

    fn build(
        &self,
        _output_schemas: HashMap<PortHandle, Schema>,
    ) -> Result<Box<dyn Source>, ExecutionError> {
        Ok(Box::new(TestSource {}))
    }
}

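/// Source that emits 10,000 `Insert` operations, each carrying a customer id, country, and spending amount.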
#[derive(Debug)]
pub struct TestSource {}

impl Source for TestSource {
    fn can_start_from(&self, _last_checkpoint: (u64, u64)) -> Result<bool, ExecutionError> {
        Ok(false)
    }

    fn start(
        &self,
        fw: &mut dyn SourceChannelForwarder,
        _last_checkpoint: Option<(u64, u64)>,
    ) -> Result<(), ExecutionError> {
        for n in 0..10000 {
            fw.send(
                IngestionMessage::new_op(
                    n,
                    0,
                    Operation::Insert {
                        new: Record::new(
                            None,
                            vec![
                                Field::Int(0),
                                Field::String("Italy".to_string()),
                                Field::Float(OrderedFloat(5.5)),
                            ],
                            None,
                        ),
                    },
                ),
                DEFAULT_PORT_HANDLE,
            )
            .unwrap();
        }
        Ok(())
    }
}

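/// Factory that builds the no-op `TestSink` for the configured input ports.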
#[derive(Debug)]
pub struct TestSinkFactory {
    input_ports: Vec<PortHandle>,
}

impl TestSinkFactory {
    pub fn new(input_ports: Vec<PortHandle>) -> Self {
        Self { input_ports }
    }
}

impl SinkFactory<SchemaSQLContext> for TestSinkFactory {
    fn get_input_ports(&self) -> Vec<PortHandle> {
        self.input_ports.clone()
    }

    fn build(
        &self,
        _input_schemas: HashMap<PortHandle, Schema>,
        _source_states: &SourceStates,
    ) -> Result<Box<dyn Sink>, ExecutionError> {
        Ok(Box::new(TestSink {}))
    }

    fn prepare(
        &self,
        _input_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
    ) -> Result<(), ExecutionError> {
        Ok(())
    }
}

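/// Sink that discards every operation and commits without persisting anything.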
#[derive(Debug)]
pub struct TestSink {}

impl Sink for TestSink {
    fn process(
        &mut self,
        _from_port: PortHandle,
        _op: Operation,
        _state: &SharedTransaction,
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
    ) -> Result<(), ExecutionError> {
        Ok(())
    }

    fn commit(&mut self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
        Ok(())
    }

    fn on_source_snapshotting_done(&mut self) -> Result<(), ExecutionError> {
        Ok(())
    }
}

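/// Builds a pipeline from a SQL statement, wires it between `TestSourceFactory`
/// and `TestSinkFactory`, and runs the resulting DAG to completion.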
#[test]
fn test_pipeline_builder() {
    let mut pipeline = AppPipeline::new();
    let context = statement_to_pipeline(
        "SELECT COUNT(Spending), users.Country \
        FROM users \
         WHERE Spending >= 1",
        &mut pipeline,
        Some("results".to_string()),
    )
    .unwrap();

    let table_info = context.output_tables_map.get("results").unwrap();

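    // Register the test source as the "users" table referenced by the SQL statement above.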
    let mut asm = AppSourceManager::new();
    asm.add(AppSource::new(
        "mem".to_string(),
        Arc::new(TestSourceFactory::new(vec![DEFAULT_PORT_HANDLE])),
        vec![("users".to_string(), DEFAULT_PORT_HANDLE)]
            .into_iter()
            .collect(),
    ))
    .unwrap();

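    // Attach the no-op sink and connect it to the output node of the SQL pipeline.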
    pipeline.add_sink(
        Arc::new(TestSinkFactory::new(vec![DEFAULT_PORT_HANDLE])),
        "sink",
    );
    pipeline
        .connect_nodes(
            &table_info.node,
            Some(table_info.port),
            "sink",
            Some(DEFAULT_PORT_HANDLE),
            true,
        )
        .unwrap();

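    // Assemble the application and materialize the execution DAG.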
    let mut app = App::new(asm);
    app.add_pipeline(pipeline);

    let dag = app.get_dag().unwrap();

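    // Recreate a scratch directory and start a timer; the executor below uses its own temp dir.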
    let tmp_dir = TempDir::new("example").unwrap_or_else(|_e| panic!("Unable to create temp dir"));
    if tmp_dir.path().exists() {
        fs::remove_dir_all(tmp_dir.path()).unwrap_or_else(|_e| panic!("Unable to remove old dir"));
    }
    fs::create_dir(tmp_dir.path()).unwrap_or_else(|_e| panic!("Unable to create temp dir"));

    use std::time::Instant;
    let now = Instant::now();

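    // Execute the DAG and block until every node has finished.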
    let tmp_dir = TempDir::new("test").unwrap();
    DagExecutor::new(
        &dag,
        tmp_dir.path().to_path_buf(),
        ExecutorOptions::default(),
    )
    .unwrap()
    .start(Arc::new(AtomicBool::new(true)))
    .unwrap()
    .join()
    .unwrap();

    let elapsed = now.elapsed();
    debug!("Elapsed: {:.2?}", elapsed);
}