• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4284179721

pending completion
4284179721

push

github

GitHub
fix: select * wildcard (#1080)

27683 of 39180 relevant lines covered (70.66%)

52493.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.19
/dozer-sql/src/pipeline/tests/builder_test.rs
1
use dozer_core::app::{App, AppPipeline};
2
use dozer_core::appsource::{AppSource, AppSourceManager};
3
use dozer_core::channels::SourceChannelForwarder;
4
use dozer_core::errors::ExecutionError;
5
use dozer_core::executor::{DagExecutor, ExecutorOptions};
6
use dozer_core::node::{
7
    OutputPortDef, OutputPortType, PortHandle, Sink, SinkFactory, Source, SourceFactory,
8
};
9
use dozer_core::record_store::RecordReader;
10
use dozer_core::storage::lmdb_storage::SharedTransaction;
11
use dozer_core::DEFAULT_PORT_HANDLE;
12
use dozer_types::log::debug;
13
use dozer_types::node::SourceStates;
14
use dozer_types::ordered_float::OrderedFloat;
15
use dozer_types::types::{
16
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
17
};
18

19
use dozer_core::epoch::Epoch;
20

21
use std::collections::HashMap;
22
use std::fs;
23

24
use std::sync::atomic::AtomicBool;
25
use std::sync::Arc;
26
use tempdir::TempDir;
27

28
use crate::pipeline::builder::{statement_to_pipeline, SchemaSQLContext};
29

30
/// Test Source
×
31
#[derive(Debug)]
×
32
pub struct TestSourceFactory {
33
    output_ports: Vec<PortHandle>,
34
}
35

36
impl TestSourceFactory {
×
37
    pub fn new(output_ports: Vec<PortHandle>) -> Self {
1✔
38
        Self { output_ports }
1✔
39
    }
1✔
40
}
41

42
impl SourceFactory<SchemaSQLContext> for TestSourceFactory {
×
43
    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
3✔
44
        Ok(self
3✔
45
            .output_ports
3✔
46
            .iter()
3✔
47
            .map(|e| OutputPortDef::new(*e, OutputPortType::Stateless))
3✔
48
            .collect())
3✔
49
    }
3✔
50

×
51
    fn get_output_schema(
1✔
52
        &self,
1✔
53
        _port: &PortHandle,
1✔
54
    ) -> Result<(Schema, SchemaSQLContext), ExecutionError> {
1✔
55
        Ok((
1✔
56
            Schema::empty()
1✔
57
                .field(
1✔
58
                    FieldDefinition::new(
1✔
59
                        String::from("CustomerID"),
1✔
60
                        FieldType::Int,
1✔
61
                        false,
1✔
62
                        SourceDefinition::Dynamic,
1✔
63
                    ),
1✔
64
                    false,
1✔
65
                )
1✔
66
                .field(
1✔
67
                    FieldDefinition::new(
1✔
68
                        String::from("Country"),
1✔
69
                        FieldType::String,
1✔
70
                        false,
1✔
71
                        SourceDefinition::Dynamic,
1✔
72
                    ),
1✔
73
                    false,
1✔
74
                )
1✔
75
                .field(
1✔
76
                    FieldDefinition::new(
1✔
77
                        String::from("Spending"),
1✔
78
                        FieldType::Float,
1✔
79
                        false,
1✔
80
                        SourceDefinition::Dynamic,
1✔
81
                    ),
1✔
82
                    false,
1✔
83
                )
1✔
84
                .clone(),
1✔
85
            SchemaSQLContext::default(),
1✔
86
        ))
1✔
87
    }
1✔
88

×
89
    fn build(
1✔
90
        &self,
1✔
91
        _output_schemas: HashMap<PortHandle, Schema>,
1✔
92
    ) -> Result<Box<dyn Source>, ExecutionError> {
1✔
93
        Ok(Box::new(TestSource {}))
1✔
94
    }
1✔
95
}
96

×
97
#[derive(Debug)]
×
98
pub struct TestSource {}
99

100
impl Source for TestSource {
×
101
    fn can_start_from(&self, _last_checkpoint: (u64, u64)) -> Result<bool, ExecutionError> {
×
102
        Ok(false)
×
103
    }
×
104

×
105
    fn start(
1✔
106
        &self,
1✔
107
        fw: &mut dyn SourceChannelForwarder,
1✔
108
        _last_checkpoint: Option<(u64, u64)>,
1✔
109
    ) -> Result<(), ExecutionError> {
1✔
110
        for n in 0..10000 {
10,001✔
111
            fw.send(
10,000✔
112
                n,
10,000✔
113
                0,
10,000✔
114
                Operation::Insert {
10,000✔
115
                    new: Record::new(
10,000✔
116
                        None,
10,000✔
117
                        vec![
10,000✔
118
                            Field::Int(0),
10,000✔
119
                            Field::String("Italy".to_string()),
10,000✔
120
                            Field::Float(OrderedFloat(5.5)),
10,000✔
121
                        ],
10,000✔
122
                        None,
10,000✔
123
                    ),
10,000✔
124
                },
10,000✔
125
                DEFAULT_PORT_HANDLE,
10,000✔
126
            )
10,000✔
127
            .unwrap();
10,000✔
128
        }
10,000✔
129
        Ok(())
1✔
130
    }
1✔
131
}
132

×
133
#[derive(Debug)]
×
134
pub struct TestSinkFactory {
135
    input_ports: Vec<PortHandle>,
136
}
137

138
impl TestSinkFactory {
×
139
    pub fn new(input_ports: Vec<PortHandle>) -> Self {
1✔
140
        Self { input_ports }
1✔
141
    }
1✔
142
}
143

144
impl SinkFactory<SchemaSQLContext> for TestSinkFactory {
×
145
    fn get_input_ports(&self) -> Vec<PortHandle> {
4✔
146
        self.input_ports.clone()
4✔
147
    }
4✔
148

×
149
    fn build(
1✔
150
        &self,
1✔
151
        _input_schemas: HashMap<PortHandle, Schema>,
1✔
152
        _source_states: &SourceStates,
1✔
153
    ) -> Result<Box<dyn Sink>, ExecutionError> {
1✔
154
        Ok(Box::new(TestSink {}))
1✔
155
    }
1✔
156

×
157
    fn prepare(
1✔
158
        &self,
1✔
159
        _input_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
1✔
160
    ) -> Result<(), ExecutionError> {
1✔
161
        Ok(())
1✔
162
    }
1✔
163
}
×
164

165
#[derive(Debug)]
×
166
pub struct TestSink {}
167

×
168
impl Sink for TestSink {
×
169
    fn process(
10,000✔
170
        &mut self,
10,000✔
171
        _from_port: PortHandle,
10,000✔
172
        _op: Operation,
10,000✔
173
        _state: &SharedTransaction,
10,000✔
174
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
10,000✔
175
    ) -> Result<(), ExecutionError> {
10,000✔
176
        Ok(())
10,000✔
177
    }
10,000✔
178

×
179
    fn commit(&mut self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
1✔
180
        Ok(())
1✔
181
    }
1✔
182
}
×
183

×
184
/// Builds a pipeline from a SQL statement, wires it between the in-memory test
/// source and the no-op test sink, and runs the resulting DAG to completion.
///
/// The test makes no assertions on the produced records; it passes when every
/// step succeeds without panicking.
#[test]
fn test_pipeline_builder() {
    let sql = "SELECT COUNT(Spending), users.Country \
        FROM users \
         WHERE Spending >= 1";
    let sink_name = "sink";

    // Translate the query into pipeline nodes and look up its output table.
    let mut pipeline = AppPipeline::new();
    let context =
        statement_to_pipeline(sql, &mut pipeline, Some("results".to_string())).unwrap();
    let table_info = context.output_tables_map.get("results").unwrap();

    // Register the test source, exposing its default port as table `users`.
    let mut asm = AppSourceManager::new();
    let users_source = AppSource::new(
        "mem".to_string(),
        Arc::new(TestSourceFactory::new(vec![DEFAULT_PORT_HANDLE])),
        std::iter::once(("users".to_string(), DEFAULT_PORT_HANDLE)).collect(),
    );
    asm.add(users_source).unwrap();

    // Attach the sink to the query's output port.
    pipeline.add_sink(
        Arc::new(TestSinkFactory::new(vec![DEFAULT_PORT_HANDLE])),
        sink_name,
    );
    pipeline
        .connect_nodes(
            &table_info.node,
            Some(table_info.port),
            sink_name,
            Some(DEFAULT_PORT_HANDLE),
            true,
        )
        .unwrap();

    let mut app = App::new(asm);
    app.add_pipeline(pipeline);
    let dag = app.get_dag().unwrap();

    // Scratch directory that is wiped and recreated before the run; the
    // executor below is pointed at a second, separate temp dir.
    let example_dir =
        TempDir::new("example").unwrap_or_else(|_e| panic!("Unable to create temp dir"));
    let example_path = example_dir.path();
    if example_path.exists() {
        fs::remove_dir_all(example_path).unwrap_or_else(|_e| panic!("Unable to remove old dir"));
    }
    fs::create_dir(example_path).unwrap_or_else(|_e| panic!("Unable to create temp dir"));

    let started_at = std::time::Instant::now();

    let storage_dir = TempDir::new("test").unwrap();
    let storage_path = storage_dir.path().to_path_buf();
    let running = Arc::new(AtomicBool::new(true));
    DagExecutor::new(&dag, storage_path, ExecutorOptions::default())
        .unwrap()
        .start(running)
        .unwrap()
        .join()
        .unwrap();

    debug!("Elapsed: {:.2?}", started_at.elapsed());
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc