getdozer / dozer · build 4360628700 (pending completion)
Triggered by a GitHub push: refactor: pipeline memory storage (#1135)

1152 of 1152 new or added lines in 40 files covered (100.0%)
27472 of 39301 relevant lines covered (69.9%)
28369.11 hits per line
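
For reference, the percentages above are just the ratio of covered to relevant lines. A minimal Rust sketch of that calculation, plugging in this build's summary figures (the function name and the standalone main are illustrative only, not part of the Dozer codebase):

// Sketch only: reproduces the summary percentages shown above.
fn coverage_percent(covered: u64, relevant: u64) -> f64 {
    covered as f64 / relevant as f64 * 100.0
}

fn main() {
    // Build-wide: 27472 of 39301 relevant lines covered.
    println!("{:.1}%", coverage_percent(27_472, 39_301)); // 69.9%
    // New or added lines in this change: 1152 of 1152.
    println!("{:.1}%", coverage_percent(1_152, 1_152)); // 100.0%
}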

Source File: /dozer-sql/src/pipeline/tests/builder_test.rs (82.33% of lines covered)

Coveralls marks the uncovered lines in this file as mostly struct definitions and closing braces, plus the can_start_from and on_source_snapshotting_done trait methods, which the test below never exercises.
use dozer_core::app::{App, AppPipeline};
use dozer_core::appsource::{AppSource, AppSourceManager};
use dozer_core::channels::SourceChannelForwarder;
use dozer_core::errors::ExecutionError;
use dozer_core::executor::{DagExecutor, ExecutorOptions};
use dozer_core::node::{
    OutputPortDef, OutputPortType, PortHandle, Sink, SinkFactory, Source, SourceFactory,
};
use dozer_core::storage::lmdb_storage::SharedTransaction;
use dozer_core::DEFAULT_PORT_HANDLE;
use dozer_types::ingestion_types::IngestionMessage;
use dozer_types::log::debug;
use dozer_types::node::SourceStates;
use dozer_types::ordered_float::OrderedFloat;
use dozer_types::types::{
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
};

use dozer_core::epoch::Epoch;

use std::collections::HashMap;
use std::fs;

use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use tempdir::TempDir;

use crate::pipeline::builder::{statement_to_pipeline, SchemaSQLContext};

/// Test Source
#[derive(Debug)]
pub struct TestSourceFactory {
    output_ports: Vec<PortHandle>,
}

impl TestSourceFactory {
    pub fn new(output_ports: Vec<PortHandle>) -> Self {
        Self { output_ports }
    }
}

impl SourceFactory<SchemaSQLContext> for TestSourceFactory {
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
        self.output_ports
            .iter()
            .map(|e| OutputPortDef::new(*e, OutputPortType::Stateless))
            .collect()
    }

    fn get_output_schema(
        &self,
        _port: &PortHandle,
    ) -> Result<(Schema, SchemaSQLContext), ExecutionError> {
        Ok((
            Schema::empty()
                .field(
                    FieldDefinition::new(
                        String::from("CustomerID"),
                        FieldType::Int,
                        false,
                        SourceDefinition::Dynamic,
                    ),
                    false,
                )
                .field(
                    FieldDefinition::new(
                        String::from("Country"),
                        FieldType::String,
                        false,
                        SourceDefinition::Dynamic,
                    ),
                    false,
                )
                .field(
                    FieldDefinition::new(
                        String::from("Spending"),
                        FieldType::Float,
                        false,
                        SourceDefinition::Dynamic,
                    ),
                    false,
                )
                .clone(),
            SchemaSQLContext::default(),
        ))
    }

    fn build(
        &self,
        _output_schemas: HashMap<PortHandle, Schema>,
    ) -> Result<Box<dyn Source>, ExecutionError> {
        Ok(Box::new(TestSource {}))
    }
}

#[derive(Debug)]
pub struct TestSource {}

impl Source for TestSource {
    fn can_start_from(&self, _last_checkpoint: (u64, u64)) -> Result<bool, ExecutionError> {
        Ok(false)
    }

    fn start(
        &self,
        fw: &mut dyn SourceChannelForwarder,
        _last_checkpoint: Option<(u64, u64)>,
    ) -> Result<(), ExecutionError> {
        for n in 0..10000 {
            fw.send(
                IngestionMessage::new_op(
                    n,
                    0,
                    Operation::Insert {
                        new: Record::new(
                            None,
                            vec![
                                Field::Int(0),
                                Field::String("Italy".to_string()),
                                Field::Float(OrderedFloat(5.5)),
                            ],
                            None,
                        ),
                    },
                ),
                DEFAULT_PORT_HANDLE,
            )
            .unwrap();
        }
        Ok(())
    }
}

#[derive(Debug)]
pub struct TestSinkFactory {
    input_ports: Vec<PortHandle>,
}

impl TestSinkFactory {
    pub fn new(input_ports: Vec<PortHandle>) -> Self {
        Self { input_ports }
    }
}

impl SinkFactory<SchemaSQLContext> for TestSinkFactory {
    fn get_input_ports(&self) -> Vec<PortHandle> {
        self.input_ports.clone()
    }

    fn build(
        &self,
        _input_schemas: HashMap<PortHandle, Schema>,
        _source_states: &SourceStates,
    ) -> Result<Box<dyn Sink>, ExecutionError> {
        Ok(Box::new(TestSink {}))
    }

    fn prepare(
        &self,
        _input_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
    ) -> Result<(), ExecutionError> {
        Ok(())
    }
}

#[derive(Debug)]
pub struct TestSink {}

impl Sink for TestSink {
    fn process(
        &mut self,
        _from_port: PortHandle,
        _op: Operation,
        _state: &SharedTransaction,
    ) -> Result<(), ExecutionError> {
        Ok(())
    }

    fn commit(&mut self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
        Ok(())
    }

    fn on_source_snapshotting_done(&mut self) -> Result<(), ExecutionError> {
        Ok(())
    }
}

#[test]
fn test_pipeline_builder() {
    let mut pipeline = AppPipeline::new();
    let context = statement_to_pipeline(
        "SELECT COUNT(Spending), users.Country \
        FROM users \
         WHERE Spending >= 1",
        &mut pipeline,
        Some("results".to_string()),
    )
    .unwrap();

    let table_info = context.output_tables_map.get("results").unwrap();

    let mut asm = AppSourceManager::new();
    asm.add(AppSource::new(
        "mem".to_string(),
        Arc::new(TestSourceFactory::new(vec![DEFAULT_PORT_HANDLE])),
        vec![("users".to_string(), DEFAULT_PORT_HANDLE)]
            .into_iter()
            .collect(),
    ))
    .unwrap();

    pipeline.add_sink(
        Arc::new(TestSinkFactory::new(vec![DEFAULT_PORT_HANDLE])),
        "sink",
    );
    pipeline
        .connect_nodes(
            &table_info.node,
            Some(table_info.port),
            "sink",
            Some(DEFAULT_PORT_HANDLE),
            true,
        )
        .unwrap();

    let mut app = App::new(asm);
    app.add_pipeline(pipeline);

    let dag = app.get_dag().unwrap();

    let tmp_dir = TempDir::new("example").unwrap_or_else(|_e| panic!("Unable to create temp dir"));
    if tmp_dir.path().exists() {
        fs::remove_dir_all(tmp_dir.path()).unwrap_or_else(|_e| panic!("Unable to remove old dir"));
    }
    fs::create_dir(tmp_dir.path()).unwrap_or_else(|_e| panic!("Unable to create temp dir"));

    use std::time::Instant;
    let now = Instant::now();

    let tmp_dir = TempDir::new("test").unwrap();
    DagExecutor::new(
        dag,
        tmp_dir.path().to_path_buf(),
        ExecutorOptions::default(),
    )
    .unwrap()
    .start(Arc::new(AtomicBool::new(true)))
    .unwrap()
    .join()
    .unwrap();

    let elapsed = now.elapsed();
    debug!("Elapsed: {:.2?}", elapsed);
}
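
Running something like cargo test -p dozer-sql test_pipeline_builder from the workspace root should exercise this file; the dozer-sql package name is assumed from the file path above, not confirmed by this report.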