• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4007820649

pending completion
4007820649

Pull #734

github

GitHub
Merge b71e66da1 into 6c0ac2b2c
Pull Request #734: Bump ahash from 0.8.2 to 0.8.3

23507 of 35166 relevant lines covered (66.85%)

40241.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.07
/dozer-sql/src/pipeline/tests/builder_test.rs
1
use dozer_core::dag::app::App;
2
use dozer_core::dag::appsource::{AppSource, AppSourceManager};
3
use dozer_core::dag::channels::SourceChannelForwarder;
4
use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
5
use dozer_core::dag::errors::ExecutionError;
6
use dozer_core::dag::executor::{DagExecutor, ExecutorOptions};
7
use dozer_core::dag::node::{
8
    OutputPortDef, OutputPortType, PortHandle, Sink, SinkFactory, Source, SourceFactory,
9
};
10
use dozer_core::dag::record_store::RecordReader;
11
use dozer_core::storage::lmdb_storage::{LmdbEnvironmentManager, SharedTransaction};
12
use dozer_types::log::debug;
13
use dozer_types::ordered_float::OrderedFloat;
14
use dozer_types::types::{
15
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
16
};
17

18
use dozer_core::dag::epoch::Epoch;
19

20
use std::collections::HashMap;
21
use std::fs;
22

23
use std::sync::atomic::AtomicBool;
24
use std::sync::Arc;
25
use tempdir::TempDir;
26

27
use crate::pipeline::builder::{statement_to_pipeline, SchemaSQLContext};
28

×
29
/// Test Source
30
#[derive(Debug)]
×
31
pub struct TestSourceFactory {
32
    output_ports: Vec<PortHandle>,
33
}
34

×
35
impl TestSourceFactory {
×
36
    pub fn new(output_ports: Vec<PortHandle>) -> Self {
1✔
37
        Self { output_ports }
1✔
38
    }
1✔
39
}
40

×
41
impl SourceFactory<SchemaSQLContext> for TestSourceFactory {
×
42
    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
3✔
43
        Ok(self
3✔
44
            .output_ports
3✔
45
            .iter()
3✔
46
            .map(|e| OutputPortDef::new(*e, OutputPortType::Stateless))
3✔
47
            .collect())
3✔
48
    }
3✔
49

×
50
    fn get_output_schema(
1✔
51
        &self,
1✔
52
        _port: &PortHandle,
1✔
53
    ) -> Result<(Schema, SchemaSQLContext), ExecutionError> {
1✔
54
        Ok((
1✔
55
            Schema::empty()
1✔
56
                .field(
1✔
57
                    FieldDefinition::new(
1✔
58
                        String::from("CustomerID"),
1✔
59
                        FieldType::Int,
1✔
60
                        false,
1✔
61
                        SourceDefinition::Dynamic,
1✔
62
                    ),
1✔
63
                    false,
1✔
64
                )
1✔
65
                .field(
1✔
66
                    FieldDefinition::new(
1✔
67
                        String::from("Country"),
1✔
68
                        FieldType::String,
1✔
69
                        false,
1✔
70
                        SourceDefinition::Dynamic,
1✔
71
                    ),
1✔
72
                    false,
1✔
73
                )
1✔
74
                .field(
1✔
75
                    FieldDefinition::new(
1✔
76
                        String::from("Spending"),
1✔
77
                        FieldType::Float,
1✔
78
                        false,
1✔
79
                        SourceDefinition::Dynamic,
1✔
80
                    ),
1✔
81
                    false,
1✔
82
                )
1✔
83
                .clone(),
1✔
84
            SchemaSQLContext::default(),
1✔
85
        ))
1✔
86
    }
1✔
87

88
    fn build(
1✔
89
        &self,
1✔
90
        _output_schemas: HashMap<PortHandle, Schema>,
1✔
91
    ) -> Result<Box<dyn Source>, ExecutionError> {
1✔
92
        Ok(Box::new(TestSource {}))
1✔
93
    }
1✔
94

×
95
    fn prepare(
×
96
        &self,
×
97
        _output_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
98
    ) -> Result<(), ExecutionError> {
×
99
        Ok(())
×
100
    }
×
101
}
×
102

×
103
#[derive(Debug)]
×
104
pub struct TestSource {}
×
105

×
106
impl Source for TestSource {
×
107
    fn start(
1✔
108
        &self,
1✔
109
        fw: &mut dyn SourceChannelForwarder,
1✔
110
        _from_seq: Option<(u64, u64)>,
1✔
111
    ) -> Result<(), ExecutionError> {
1✔
112
        for n in 0..10000 {
10,001✔
113
            fw.send(
10,000✔
114
                n,
10,000✔
115
                0,
10,000✔
116
                Operation::Insert {
10,000✔
117
                    new: Record::new(
10,000✔
118
                        None,
10,000✔
119
                        vec![
10,000✔
120
                            Field::Int(0),
10,000✔
121
                            Field::String("Italy".to_string()),
10,000✔
122
                            Field::Float(OrderedFloat(5.5)),
10,000✔
123
                        ],
10,000✔
124
                        None,
10,000✔
125
                    ),
10,000✔
126
                },
10,000✔
127
                DEFAULT_PORT_HANDLE,
10,000✔
128
            )
10,000✔
129
            .unwrap();
10,000✔
130
        }
10,000✔
131
        Ok(())
1✔
132
    }
1✔
133
}
134

×
135
#[derive(Debug)]
×
136
pub struct TestSinkFactory {
×
137
    input_ports: Vec<PortHandle>,
×
138
}
×
139

×
140
impl TestSinkFactory {
141
    pub fn new(input_ports: Vec<PortHandle>) -> Self {
1✔
142
        Self { input_ports }
1✔
143
    }
1✔
144
}
×
145

×
146
impl SinkFactory<SchemaSQLContext> for TestSinkFactory {
×
147
    fn get_input_ports(&self) -> Vec<PortHandle> {
2✔
148
        self.input_ports.clone()
2✔
149
    }
2✔
150

×
151
    fn set_input_schema(
1✔
152
        &self,
1✔
153
        _input_schemas: &HashMap<PortHandle, (Schema, SchemaSQLContext)>,
1✔
154
    ) -> Result<(), ExecutionError> {
1✔
155
        Ok(())
1✔
156
    }
1✔
157

158
    fn build(
1✔
159
        &self,
1✔
160
        _input_schemas: HashMap<PortHandle, Schema>,
1✔
161
    ) -> Result<Box<dyn Sink>, ExecutionError> {
1✔
162
        Ok(Box::new(TestSink {}))
1✔
163
    }
1✔
164

165
    fn prepare(
×
166
        &self,
×
167
        _input_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
168
    ) -> Result<(), ExecutionError> {
×
169
        Ok(())
×
170
    }
×
171
}
×
172

×
173
#[derive(Debug)]
×
174
pub struct TestSink {}
175

×
176
impl Sink for TestSink {
×
177
    fn init(&mut self, _env: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
1✔
178
        debug!("SINK: Initialising TestSink");
1✔
179
        Ok(())
1✔
180
    }
1✔
181

×
182
    fn process(
10,000✔
183
        &mut self,
10,000✔
184
        _from_port: PortHandle,
10,000✔
185
        _op: Operation,
10,000✔
186
        _state: &SharedTransaction,
10,000✔
187
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
10,000✔
188
    ) -> Result<(), ExecutionError> {
10,000✔
189
        Ok(())
10,000✔
190
    }
10,000✔
191

×
192
    fn commit(&mut self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
7✔
193
        Ok(())
7✔
194
    }
7✔
195
}
×
196

×
197
#[test]
1✔
198
fn test_pipeline_builder() {
1✔
199
    let (mut pipeline, (node, node_port)) = statement_to_pipeline(
1✔
200
        "SELECT COUNT(Spending), users.Country \
1✔
201
    FROM users \
1✔
202
    WHERE Spending >= 1",
1✔
203
    )
1✔
204
    .unwrap();
1✔
205

1✔
206
    let mut asm = AppSourceManager::new();
1✔
207
    asm.add(AppSource::new(
1✔
208
        "mem".to_string(),
1✔
209
        Arc::new(TestSourceFactory::new(vec![DEFAULT_PORT_HANDLE])),
1✔
210
        vec![("users".to_string(), DEFAULT_PORT_HANDLE)]
1✔
211
            .into_iter()
1✔
212
            .collect(),
1✔
213
    ))
1✔
214
    .unwrap();
1✔
215

1✔
216
    pipeline.add_sink(
1✔
217
        Arc::new(TestSinkFactory::new(vec![DEFAULT_PORT_HANDLE])),
1✔
218
        "sink",
1✔
219
    );
1✔
220
    pipeline
1✔
221
        .connect_nodes(&node, Some(node_port), "sink", Some(DEFAULT_PORT_HANDLE))
1✔
222
        .unwrap();
1✔
223

1✔
224
    let mut app = App::new(asm);
1✔
225
    app.add_pipeline(pipeline);
1✔
226

1✔
227
    let dag = app.get_dag().unwrap();
1✔
228

1✔
229
    let tmp_dir = TempDir::new("example").unwrap_or_else(|_e| panic!("Unable to create temp dir"));
1✔
230
    if tmp_dir.path().exists() {
1✔
231
        fs::remove_dir_all(tmp_dir.path()).unwrap_or_else(|_e| panic!("Unable to remove old dir"));
1✔
232
    }
1✔
233
    fs::create_dir(tmp_dir.path()).unwrap_or_else(|_e| panic!("Unable to create temp dir"));
1✔
234

1✔
235
    use std::time::Instant;
1✔
236
    let now = Instant::now();
1✔
237

1✔
238
    let tmp_dir = TempDir::new("test").unwrap();
1✔
239
    let mut executor = DagExecutor::new(
1✔
240
        &dag,
1✔
241
        tmp_dir.path(),
1✔
242
        ExecutorOptions::default(),
1✔
243
        Arc::new(AtomicBool::new(true)),
1✔
244
    )
1✔
245
    .unwrap();
1✔
246

1✔
247
    executor
1✔
248
        .start()
1✔
249
        .unwrap_or_else(|e| panic!("Unable to start the Executor: {}", e));
1✔
250
    assert!(executor.join().is_ok());
1✔
251

252
    let elapsed = now.elapsed();
1✔
253
    debug!("Elapsed: {:.2?}", elapsed);
1✔
254
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc