• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 3965135367

pending completion
3965135367

Pull #680

github

GitHub
Merge 1add77327 into 56c0cf2b3
Pull Request #680: feat: Implement nested queries and CTE.

506 of 506 new or added lines in 18 files covered. (100.0%)

21999 of 33062 relevant lines covered (66.54%)

50489.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.42
/dozer-sql/src/pipeline/tests/builder_test.rs
1
use dozer_core::dag::app::App;
2
use dozer_core::dag::appsource::{AppSource, AppSourceManager};
3
use dozer_core::dag::channels::SourceChannelForwarder;
4
use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
5
use dozer_core::dag::errors::ExecutionError;
6
use dozer_core::dag::executor::{DagExecutor, ExecutorOptions};
7
use dozer_core::dag::node::{
8
    OutputPortDef, OutputPortType, PortHandle, Sink, SinkFactory, Source, SourceFactory,
9
};
10
use dozer_core::dag::record_store::RecordReader;
11
use dozer_core::storage::lmdb_storage::{LmdbEnvironmentManager, SharedTransaction};
12
use dozer_types::log::debug;
13
use dozer_types::ordered_float::OrderedFloat;
14
use dozer_types::types::{Field, FieldDefinition, FieldType, Operation, Record, Schema};
15

16
use dozer_core::dag::epoch::Epoch;
17

18
use std::collections::HashMap;
19
use std::fs;
20

21
use std::sync::atomic::AtomicBool;
22
use std::sync::Arc;
23
use tempdir::TempDir;
24

25
use crate::pipeline::builder::statement_to_pipeline;
26

27
/// Test Source
×
28
#[derive(Debug)]
×
29
pub struct TestSourceFactory {
30
    output_ports: Vec<PortHandle>,
31
}
32

33
impl TestSourceFactory {
×
34
    pub fn new(output_ports: Vec<PortHandle>) -> Self {
1✔
35
        Self { output_ports }
1✔
36
    }
1✔
37
}
38

39
impl SourceFactory for TestSourceFactory {
×
40
    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
3✔
41
        Ok(self
3✔
42
            .output_ports
3✔
43
            .iter()
3✔
44
            .map(|e| OutputPortDef::new(*e, OutputPortType::Stateless))
3✔
45
            .collect())
3✔
46
    }
3✔
47

×
48
    fn get_output_schema(&self, _port: &PortHandle) -> Result<Schema, ExecutionError> {
1✔
49
        Ok(Schema::empty()
1✔
50
            .field(
1✔
51
                FieldDefinition::new(String::from("CustomerID"), FieldType::Int, false),
1✔
52
                false,
1✔
53
            )
1✔
54
            .field(
1✔
55
                FieldDefinition::new(String::from("Country"), FieldType::String, false),
1✔
56
                false,
1✔
57
            )
1✔
58
            .field(
1✔
59
                FieldDefinition::new(String::from("Spending"), FieldType::Float, false),
1✔
60
                false,
1✔
61
            )
1✔
62
            .clone())
1✔
63
    }
1✔
64

×
65
    fn build(
1✔
66
        &self,
1✔
67
        _output_schemas: HashMap<PortHandle, Schema>,
1✔
68
    ) -> Result<Box<dyn Source>, ExecutionError> {
1✔
69
        Ok(Box::new(TestSource {}))
1✔
70
    }
1✔
71

×
72
    fn prepare(&self, _output_schemas: HashMap<PortHandle, Schema>) -> Result<(), ExecutionError> {
×
73
        Ok(())
×
74
    }
×
75
}
76

×
77
#[derive(Debug)]
×
78
pub struct TestSource {}
79

80
impl Source for TestSource {
×
81
    fn start(
1✔
82
        &self,
1✔
83
        fw: &mut dyn SourceChannelForwarder,
1✔
84
        _from_seq: Option<(u64, u64)>,
1✔
85
    ) -> Result<(), ExecutionError> {
1✔
86
        for n in 0..10000 {
10,001✔
87
            fw.send(
10,000✔
88
                n,
10,000✔
89
                0,
10,000✔
90
                Operation::Insert {
10,000✔
91
                    new: Record::new(
10,000✔
92
                        None,
10,000✔
93
                        vec![
10,000✔
94
                            Field::Int(0),
10,000✔
95
                            Field::String("Italy".to_string()),
10,000✔
96
                            Field::Float(OrderedFloat(5.5)),
10,000✔
97
                        ],
10,000✔
98
                        None,
10,000✔
99
                    ),
10,000✔
100
                },
10,000✔
101
                DEFAULT_PORT_HANDLE,
10,000✔
102
            )
10,000✔
103
            .unwrap();
10,000✔
104
        }
10,000✔
105
        Ok(())
1✔
106
    }
1✔
107
}
108

×
109
#[derive(Debug)]
×
110
pub struct TestSinkFactory {
111
    input_ports: Vec<PortHandle>,
112
}
113

114
impl TestSinkFactory {
×
115
    pub fn new(input_ports: Vec<PortHandle>) -> Self {
1✔
116
        Self { input_ports }
1✔
117
    }
1✔
118
}
119

120
impl SinkFactory for TestSinkFactory {
×
121
    fn get_input_ports(&self) -> Vec<PortHandle> {
2✔
122
        self.input_ports.clone()
2✔
123
    }
2✔
124

×
125
    fn set_input_schema(
1✔
126
        &self,
1✔
127
        _input_schemas: &HashMap<PortHandle, Schema>,
1✔
128
    ) -> Result<(), ExecutionError> {
1✔
129
        Ok(())
1✔
130
    }
1✔
131

×
132
    fn build(
1✔
133
        &self,
1✔
134
        _input_schemas: HashMap<PortHandle, Schema>,
1✔
135
    ) -> Result<Box<dyn Sink>, ExecutionError> {
1✔
136
        Ok(Box::new(TestSink {}))
1✔
137
    }
1✔
138

×
139
    fn prepare(&self, _input_schemas: HashMap<PortHandle, Schema>) -> Result<(), ExecutionError> {
×
140
        Ok(())
×
141
    }
×
142
}
143

×
144
#[derive(Debug)]
×
145
pub struct TestSink {}
146

147
impl Sink for TestSink {
×
148
    fn init(&mut self, _env: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
1✔
149
        debug!("SINK: Initialising TestSink");
1✔
150
        Ok(())
1✔
151
    }
1✔
152

×
153
    fn process(
10,000✔
154
        &mut self,
10,000✔
155
        _from_port: PortHandle,
10,000✔
156
        _op: Operation,
10,000✔
157
        _state: &SharedTransaction,
10,000✔
158
        _reader: &HashMap<PortHandle, RecordReader>,
10,000✔
159
    ) -> Result<(), ExecutionError> {
10,000✔
160
        Ok(())
10,000✔
161
    }
10,000✔
162

×
163
    fn commit(&mut self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
11✔
164
        Ok(())
11✔
165
    }
11✔
166
}
167

×
168
#[test]
1✔
169
fn test_pipeline_builder() {
1✔
170
    let (mut pipeline, (node, node_port)) = statement_to_pipeline(
1✔
171
        "SELECT COUNT(Spending), users.Country \
1✔
172
    FROM users \
1✔
173
    WHERE Spending >= 1",
1✔
174
    )
1✔
175
    .unwrap();
1✔
176

1✔
177
    let mut asm = AppSourceManager::new();
1✔
178
    asm.add(AppSource::new(
1✔
179
        "mem".to_string(),
1✔
180
        Arc::new(TestSourceFactory::new(vec![DEFAULT_PORT_HANDLE])),
1✔
181
        vec![("users".to_string(), DEFAULT_PORT_HANDLE)]
1✔
182
            .into_iter()
1✔
183
            .collect(),
1✔
184
    ))
1✔
185
    .unwrap();
1✔
186

1✔
187
    pipeline.add_sink(
1✔
188
        Arc::new(TestSinkFactory::new(vec![DEFAULT_PORT_HANDLE])),
1✔
189
        "sink",
1✔
190
    );
1✔
191
    pipeline
1✔
192
        .connect_nodes(&node, Some(node_port), "sink", Some(DEFAULT_PORT_HANDLE))
1✔
193
        .unwrap();
1✔
194

1✔
195
    let mut app = App::new(asm);
1✔
196
    app.add_pipeline(pipeline);
1✔
197

1✔
198
    let dag = app.get_dag().unwrap();
1✔
199

1✔
200
    let tmp_dir = TempDir::new("example").unwrap_or_else(|_e| panic!("Unable to create temp dir"));
1✔
201
    if tmp_dir.path().exists() {
1✔
202
        fs::remove_dir_all(tmp_dir.path()).unwrap_or_else(|_e| panic!("Unable to remove old dir"));
1✔
203
    }
1✔
204
    fs::create_dir(tmp_dir.path()).unwrap_or_else(|_e| panic!("Unable to create temp dir"));
1✔
205

1✔
206
    use std::time::Instant;
1✔
207
    let now = Instant::now();
1✔
208

1✔
209
    let tmp_dir = TempDir::new("test").unwrap();
1✔
210
    let mut executor = DagExecutor::new(
1✔
211
        &dag,
1✔
212
        tmp_dir.path(),
1✔
213
        ExecutorOptions::default(),
1✔
214
        Arc::new(AtomicBool::new(true)),
1✔
215
    )
1✔
216
    .unwrap();
1✔
217

1✔
218
    executor
1✔
219
        .start()
1✔
220
        .unwrap_or_else(|e| panic!("Unable to start the Executor: {}", e));
1✔
221
    assert!(executor.join().is_ok());
1✔
222

×
223
    let elapsed = now.elapsed();
1✔
224
    debug!("Elapsed: {:.2?}", elapsed);
1✔
225
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc