• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4370280743

pending completion
4370280743

push

github

GitHub
Bump async-trait from 0.1.65 to 0.1.66 (#1179)

27808 of 38702 relevant lines covered (71.85%)

25323.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.63
/dozer-sql/src/pipeline/tests/builder_test.rs
1
use dozer_core::app::{App, AppPipeline};
2
use dozer_core::appsource::{AppSource, AppSourceManager};
3
use dozer_core::channels::SourceChannelForwarder;
4
use dozer_core::errors::ExecutionError;
5
use dozer_core::executor::{DagExecutor, ExecutorOptions};
6
use dozer_core::node::{
7
    OutputPortDef, OutputPortType, PortHandle, Sink, SinkFactory, Source, SourceFactory,
8
};
9
use dozer_core::storage::lmdb_storage::SharedTransaction;
10
use dozer_core::DEFAULT_PORT_HANDLE;
11
use dozer_types::ingestion_types::IngestionMessage;
12
use dozer_types::log::debug;
13
use dozer_types::ordered_float::OrderedFloat;
14
use dozer_types::types::{
15
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
16
};
17

18
use std::collections::HashMap;
19
use std::fs;
20

21
use std::sync::atomic::AtomicBool;
22
use std::sync::Arc;
23
use tempdir::TempDir;
24

25
use crate::pipeline::builder::{statement_to_pipeline, SchemaSQLContext};
26

27
/// Test Source
28
#[derive(Debug)]
×
29
pub struct TestSourceFactory {
30
    output_ports: Vec<PortHandle>,
31
}
×
32

33
impl TestSourceFactory {
34
    pub fn new(output_ports: Vec<PortHandle>) -> Self {
1✔
35
        Self { output_ports }
1✔
36
    }
1✔
37
}
×
38

×
39
impl SourceFactory<SchemaSQLContext> for TestSourceFactory {
×
40
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
3✔
41
        self.output_ports
3✔
42
            .iter()
3✔
43
            .map(|e| OutputPortDef::new(*e, OutputPortType::Stateless))
3✔
44
            .collect()
3✔
45
    }
3✔
46

×
47
    fn get_output_schema(
1✔
48
        &self,
1✔
49
        _port: &PortHandle,
1✔
50
    ) -> Result<(Schema, SchemaSQLContext), ExecutionError> {
1✔
51
        Ok((
1✔
52
            Schema::empty()
1✔
53
                .field(
1✔
54
                    FieldDefinition::new(
1✔
55
                        String::from("CustomerID"),
1✔
56
                        FieldType::Int,
1✔
57
                        false,
1✔
58
                        SourceDefinition::Dynamic,
1✔
59
                    ),
1✔
60
                    false,
1✔
61
                )
1✔
62
                .field(
1✔
63
                    FieldDefinition::new(
1✔
64
                        String::from("Country"),
1✔
65
                        FieldType::String,
1✔
66
                        false,
1✔
67
                        SourceDefinition::Dynamic,
1✔
68
                    ),
1✔
69
                    false,
1✔
70
                )
1✔
71
                .field(
1✔
72
                    FieldDefinition::new(
1✔
73
                        String::from("Spending"),
1✔
74
                        FieldType::Float,
1✔
75
                        false,
1✔
76
                        SourceDefinition::Dynamic,
1✔
77
                    ),
1✔
78
                    false,
1✔
79
                )
1✔
80
                .clone(),
1✔
81
            SchemaSQLContext::default(),
1✔
82
        ))
1✔
83
    }
1✔
84

×
85
    fn build(
1✔
86
        &self,
1✔
87
        _output_schemas: HashMap<PortHandle, Schema>,
1✔
88
    ) -> Result<Box<dyn Source>, ExecutionError> {
1✔
89
        Ok(Box::new(TestSource {}))
1✔
90
    }
1✔
91
}
×
92

×
93
#[derive(Debug)]
×
94
pub struct TestSource {}
95

96
impl Source for TestSource {
×
97
    fn can_start_from(&self, _last_checkpoint: (u64, u64)) -> Result<bool, ExecutionError> {
×
98
        Ok(false)
×
99
    }
×
100

×
101
    fn start(
1✔
102
        &self,
1✔
103
        fw: &mut dyn SourceChannelForwarder,
1✔
104
        _last_checkpoint: Option<(u64, u64)>,
1✔
105
    ) -> Result<(), ExecutionError> {
1✔
106
        for n in 0..10000 {
10,001✔
107
            fw.send(
10,000✔
108
                IngestionMessage::new_op(
10,000✔
109
                    n,
10,000✔
110
                    0,
10,000✔
111
                    Operation::Insert {
10,000✔
112
                        new: Record::new(
10,000✔
113
                            None,
10,000✔
114
                            vec![
10,000✔
115
                                Field::Int(0),
10,000✔
116
                                Field::String("Italy".to_string()),
10,000✔
117
                                Field::Float(OrderedFloat(5.5)),
10,000✔
118
                            ],
10,000✔
119
                            None,
10,000✔
120
                        ),
10,000✔
121
                    },
10,000✔
122
                ),
10,000✔
123
                DEFAULT_PORT_HANDLE,
10,000✔
124
            )
10,000✔
125
            .unwrap();
10,000✔
126
        }
10,000✔
127
        Ok(())
1✔
128
    }
1✔
129
}
×
130

×
131
#[derive(Debug)]
×
132
pub struct TestSinkFactory {
133
    input_ports: Vec<PortHandle>,
134
}
×
135

136
impl TestSinkFactory {
137
    pub fn new(input_ports: Vec<PortHandle>) -> Self {
1✔
138
        Self { input_ports }
1✔
139
    }
1✔
140
}
×
141

×
142
impl SinkFactory<SchemaSQLContext> for TestSinkFactory {
×
143
    fn get_input_ports(&self) -> Vec<PortHandle> {
4✔
144
        self.input_ports.clone()
4✔
145
    }
4✔
146

×
147
    fn build(
1✔
148
        &self,
1✔
149
        _input_schemas: HashMap<PortHandle, Schema>,
1✔
150
    ) -> Result<Box<dyn Sink>, ExecutionError> {
1✔
151
        Ok(Box::new(TestSink {}))
1✔
152
    }
1✔
153

×
154
    fn prepare(
1✔
155
        &self,
1✔
156
        _input_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
1✔
157
    ) -> Result<(), ExecutionError> {
1✔
158
        Ok(())
1✔
159
    }
1✔
160
}
×
161

×
162
#[derive(Debug)]
×
163
pub struct TestSink {}
×
164

165
impl Sink for TestSink {
166
    fn process(
10,000✔
167
        &mut self,
10,000✔
168
        _from_port: PortHandle,
10,000✔
169
        _op: Operation,
10,000✔
170
        _state: &SharedTransaction,
10,000✔
171
    ) -> Result<(), ExecutionError> {
10,000✔
172
        Ok(())
10,000✔
173
    }
10,000✔
174

×
175
    fn commit(&mut self, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
1✔
176
        Ok(())
1✔
177
    }
1✔
178

179
    fn on_source_snapshotting_done(&mut self) -> Result<(), ExecutionError> {
×
180
        Ok(())
×
181
    }
×
182
}
183

×
184
/// Builds a pipeline from a SQL statement, wires it between `TestSourceFactory`
/// and `TestSinkFactory`, and runs the resulting DAG until the executor joins.
///
/// The test makes no assertions about the sink's output; it passes when every
/// construction step and the executor run complete without error.
#[test]
fn test_pipeline_builder() {
    // Node name shared by `add_sink` and `connect_nodes`.
    const SINK_NODE: &str = "sink";

    let mut pipeline = AppPipeline::new();
    let context = statement_to_pipeline(
        "SELECT COUNT(Spending), users.Country \
        FROM users \
         WHERE Spending >= 1",
        &mut pipeline,
        Some("results".to_string()),
    )
    .unwrap();

    // Where the SQL output named "results" ended up inside the pipeline.
    let table_info = context.output_tables_map.get("results").unwrap();

    // Register the in-memory source and expose its default port as `users`.
    let mut asm = AppSourceManager::new();
    asm.add(AppSource::new(
        "mem".to_string(),
        Arc::new(TestSourceFactory::new(vec![DEFAULT_PORT_HANDLE])),
        std::iter::once(("users".to_string(), DEFAULT_PORT_HANDLE)).collect(),
    ))
    .unwrap();

    // Attach the sink and connect the SQL output to it.
    pipeline.add_sink(
        Arc::new(TestSinkFactory::new(vec![DEFAULT_PORT_HANDLE])),
        SINK_NODE,
    );
    pipeline
        .connect_nodes(
            &table_info.node,
            Some(table_info.port),
            SINK_NODE,
            Some(DEFAULT_PORT_HANDLE),
            true,
        )
        .unwrap();

    let mut app = App::new(asm);
    app.add_pipeline(pipeline);

    let dag = app.get_dag().unwrap();

    // NOTE(review): this directory is removed and re-created here but is never
    // handed to the executor, which gets its own `test` temp dir below. It
    // looks like leftover setup — confirm before removing.
    let scratch_dir =
        TempDir::new("example").unwrap_or_else(|_e| panic!("Unable to create temp dir"));
    if scratch_dir.path().exists() {
        fs::remove_dir_all(scratch_dir.path())
            .unwrap_or_else(|_e| panic!("Unable to remove old dir"));
    }
    fs::create_dir(scratch_dir.path()).unwrap_or_else(|_e| panic!("Unable to create temp dir"));

    let started_at = std::time::Instant::now();

    // Directory actually passed to the executor.
    let executor_dir = TempDir::new("test").unwrap();
    DagExecutor::new(
        dag,
        executor_dir.path().to_path_buf(),
        ExecutorOptions::default(),
    )
    .unwrap()
    .start(Arc::new(AtomicBool::new(true)))
    .unwrap()
    .join()
    .unwrap();

    let took = started_at.elapsed();
    debug!("Elapsed: {:.2?}", took);
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc