• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5888798292

17 Aug 2023 08:51AM UTC coverage: 76.025% (-1.4%) from 77.415%
5888798292

push

github

web-flow
feat: implement graph on live ui (#1847)

* feat: implement progress

* feat: implement enable progress flag

* feat: implement progress in live

* chore: fix clippy

* chore: always use telemetry metrics

* fix: Only run build once

---------

Co-authored-by: sagar <sagar@getdozer.io>
Co-authored-by: chubei <914745487@qq.com>

536 of 536 new or added lines in 21 files covered. (100.0%)

46101 of 60639 relevant lines covered (76.03%)

40410.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.92
/dozer-sql/src/pipeline/tests/builder_test.rs
1
use dozer_core::app::{App, AppPipeline};
2
use dozer_core::appsource::{AppSourceManager, AppSourceMappings};
3
use dozer_core::channels::SourceChannelForwarder;
4
use dozer_core::epoch::Epoch;
5
use dozer_core::executor::{DagExecutor, ExecutorOptions};
6
use dozer_core::executor_operation::ProcessorOperation;
7
use dozer_core::node::{
8
    OutputPortDef, OutputPortType, PortHandle, Sink, SinkFactory, Source, SourceFactory,
9
};
10
use dozer_core::processor_record::ProcessorRecordStore;
11
use dozer_core::DEFAULT_PORT_HANDLE;
12
use dozer_types::errors::internal::BoxedError;
13
use dozer_types::ingestion_types::IngestionMessage;
14
use dozer_types::log::debug;
15
use dozer_types::ordered_float::OrderedFloat;
16
use dozer_types::types::{
17
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
18
};
19

20
use std::collections::HashMap;
21

22
use std::sync::atomic::AtomicBool;
23
use std::sync::Arc;
24

25
use crate::pipeline::builder::{statement_to_pipeline, SchemaSQLContext};
26

27
/// Test source factory: advertises the configured output ports and builds [`TestSource`].
28
#[derive(Debug)]
×
29
pub struct TestSourceFactory {
30
    /// Port handles reported by `get_output_ports`.
    output_ports: Vec<PortHandle>,
×
31
}
32

33
impl TestSourceFactory {
34
    /// Creates a factory that exposes exactly the given output ports.
    pub fn new(output_ports: Vec<PortHandle>) -> Self {
1✔
35
        Self { output_ports }
1✔
36
    }
1✔
37
}
×
38

×
39
impl SourceFactory<SchemaSQLContext> for TestSourceFactory {
40
    /// Returns one `OutputPortDef` per configured port handle, each declared as
    /// `OutputPortType::Stateless`.
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
3✔
41
        self.output_ports
3✔
42
            .iter()
3✔
43
            .map(|e| OutputPortDef::new(*e, OutputPortType::Stateless))
3✔
44
            .collect()
3✔
45
    }
3✔
46

×
47
    /// Returns the same fixed schema for every port (the `_port` argument is ignored):
    /// `CustomerID: Int`, `Country: String`, `Spending: Float`, paired with a default
    /// `SchemaSQLContext`. Never returns an error.
    ///
    /// NOTE(review): the `false` passed to `FieldDefinition::new` is presumably the
    /// nullable flag and the `false` passed to `Schema::field` presumably the
    /// primary-key flag — confirm against dozer-types.
    fn get_output_schema(
1✔
48
        &self,
1✔
49
        _port: &PortHandle,
1✔
50
    ) -> Result<(Schema, SchemaSQLContext), BoxedError> {
1✔
51
        Ok((
1✔
52
            Schema::default()
1✔
53
                .field(
1✔
54
                    FieldDefinition::new(
1✔
55
                        String::from("CustomerID"),
1✔
56
                        FieldType::Int,
1✔
57
                        false,
1✔
58
                        SourceDefinition::Dynamic,
1✔
59
                    ),
1✔
60
                    false,
1✔
61
                )
1✔
62
                .field(
1✔
63
                    FieldDefinition::new(
1✔
64
                        String::from("Country"),
1✔
65
                        FieldType::String,
1✔
66
                        false,
1✔
67
                        SourceDefinition::Dynamic,
1✔
68
                    ),
1✔
69
                    false,
1✔
70
                )
1✔
71
                .field(
1✔
72
                    FieldDefinition::new(
1✔
73
                        String::from("Spending"),
1✔
74
                        FieldType::Float,
1✔
75
                        false,
1✔
76
                        SourceDefinition::Dynamic,
1✔
77
                    ),
1✔
78
                    false,
1✔
79
                )
1✔
80
                // NOTE(review): the chained `field` calls presumably yield a reference to
                // the builder, so `clone` produces the owned `Schema` — confirm.
                .clone(),
1✔
81
            SchemaSQLContext::default(),
1✔
82
        ))
1✔
83
    }
1✔
84

×
85
    /// Formats the port handle as `port_<handle>`.
    fn get_output_port_name(&self, port: &PortHandle) -> String {
×
86
        format!("port_{}", port)
×
87
    }
×
88

×
89
    /// Builds a boxed [`TestSource`]; the supplied output schemas are ignored.
    fn build(
1✔
90
        &self,
1✔
91
        _output_schemas: HashMap<PortHandle, Schema>,
1✔
92
    ) -> Result<Box<dyn Source>, BoxedError> {
1✔
93
        Ok(Box::new(TestSource {}))
1✔
94
    }
1✔
95
}
×
96

×
97
/// Field-less test source; its `start` implementation emits a fixed stream of inserts.
#[derive(Debug)]
×
98
pub struct TestSource {}
99

×
100
impl Source for TestSource {
101
    /// Always returns `Ok(false)`, whatever checkpoint is passed in.
    fn can_start_from(&self, _last_checkpoint: (u64, u64)) -> Result<bool, BoxedError> {
×
102
        Ok(false)
×
103
    }
×
104

×
105
    /// Sends 10,000 insert operations through `fw` on `DEFAULT_PORT_HANDLE`, then
    /// returns `Ok(())`. The `_last_checkpoint` argument is ignored.
    ///
    /// Every record carries the same values (`Int(0)`, `"Italy"`, `5.5`); only the first
    /// argument of `IngestionMessage::new_op` varies with the loop counter.
    ///
    /// # Panics
    ///
    /// Panics if `fw.send` returns an error, because the result is unwrapped rather
    /// than propagated.
    fn start(
1✔
106
        &self,
1✔
107
        fw: &mut dyn SourceChannelForwarder,
1✔
108
        _last_checkpoint: Option<(u64, u64)>,
1✔
109
    ) -> Result<(), BoxedError> {
1✔
110
        for n in 0..10000 {
10,001✔
111
            fw.send(
10,000✔
112
                // NOTE(review): the three leading numeric arguments are presumably
                // transaction / sequence identifiers — confirm against `IngestionMessage::new_op`.
                IngestionMessage::new_op(
10,000✔
113
                    n,
10,000✔
114
                    0,
10,000✔
115
                    0,
10,000✔
116
                    Operation::Insert {
10,000✔
117
                        new: Record::new(vec![
10,000✔
118
                            Field::Int(0),
10,000✔
119
                            Field::String("Italy".to_string()),
10,000✔
120
                            Field::Float(OrderedFloat(5.5)),
10,000✔
121
                        ]),
10,000✔
122
                    },
10,000✔
123
                ),
10,000✔
124
                DEFAULT_PORT_HANDLE,
10,000✔
125
            )
10,000✔
126
            .unwrap();
10,000✔
127
        }
10,000✔
128
        Ok(())
1✔
129
    }
1✔
130
}
×
131

×
132
/// Test sink factory: reports the configured input ports and builds [`TestSink`].
#[derive(Debug)]
×
133
pub struct TestSinkFactory {
134
    /// Port handles returned by `get_input_ports`.
    input_ports: Vec<PortHandle>,
×
135
}
136

137
impl TestSinkFactory {
138
    /// Creates a factory that accepts input on exactly the given ports.
    pub fn new(input_ports: Vec<PortHandle>) -> Self {
1✔
139
        Self { input_ports }
1✔
140
    }
1✔
141
}
×
142

×
143
impl SinkFactory<SchemaSQLContext> for TestSinkFactory {
144
    /// Returns a copy of the configured input port handles.
    fn get_input_ports(&self) -> Vec<PortHandle> {
4✔
145
        self.input_ports.clone()
4✔
146
    }
4✔
147

×
148
    /// Builds a boxed [`TestSink`]; the supplied input schemas are ignored.
    fn build(
1✔
149
        &self,
1✔
150
        _input_schemas: HashMap<PortHandle, Schema>,
1✔
151
    ) -> Result<Box<dyn Sink>, BoxedError> {
1✔
152
        Ok(Box::new(TestSink {}))
1✔
153
    }
1✔
154

×
155
    /// No-op: ignores the input schemas and always returns `Ok(())`.
    fn prepare(
1✔
156
        &self,
1✔
157
        _input_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
1✔
158
    ) -> Result<(), BoxedError> {
1✔
159
        Ok(())
1✔
160
    }
1✔
161
}
×
162

×
163
/// Field-less test sink; every `Sink` callback it implements is a no-op returning `Ok(())`.
#[derive(Debug)]
×
164
pub struct TestSink {}
165

×
166
impl Sink for TestSink {
167
    /// Discards the incoming operation and returns `Ok(())`; all arguments are ignored.
    fn process(
10,000✔
168
        &mut self,
10,000✔
169
        _from_port: PortHandle,
10,000✔
170
        _record_store: &ProcessorRecordStore,
10,000✔
171
        _op: ProcessorOperation,
10,000✔
172
    ) -> Result<(), BoxedError> {
10,000✔
173
        Ok(())
10,000✔
174
    }
10,000✔
175

×
176
    /// No-op commit: ignores the epoch and returns `Ok(())`.
    fn commit(&mut self, _epoch_details: &Epoch) -> Result<(), BoxedError> {
2✔
177
        Ok(())
2✔
178
    }
2✔
179

×
180
    /// No-op: ignores the connection name and returns `Ok(())`.
    fn on_source_snapshotting_done(&mut self, _connection_name: String) -> Result<(), BoxedError> {
×
181
        Ok(())
×
182
    }
×
183
}
×
184

×
185
/// End-to-end smoke test: builds a pipeline from a SQL statement, feeds it from
/// `TestSourceFactory`, drains it into `TestSinkFactory`, and runs the resulting DAG
/// until the executor joins.
///
/// The test contains no assertions on the produced records; it passes as long as no
/// step panics (every fallible call is unwrapped).
#[test]
1✔
186
fn test_pipeline_builder() {
1✔
187
    let mut pipeline = AppPipeline::new();
1✔
188
    // Translate the SQL into pipeline nodes; the output table is requested under the
    // name "results" and looked up by that name below.
    let context = statement_to_pipeline(
1✔
189
        "SELECT COUNT(Spending), users.Country \
1✔
190
        FROM users \
1✔
191
         WHERE Spending >= 1",
1✔
192
        &mut pipeline,
1✔
193
        Some("results".to_string()),
1✔
194
    )
1✔
195
    .unwrap();
1✔
196

1✔
197
    let table_info = context.output_tables_map.get("results").unwrap();
1✔
198

1✔
199
    // Register the test source and map the table name "users" to its default port.
    // NOTE(review): "mem" is presumably the connection name — confirm against
    // `AppSourceMappings::new`.
    let mut asm = AppSourceManager::new();
1✔
200
    asm.add(
1✔
201
        Box::new(TestSourceFactory::new(vec![DEFAULT_PORT_HANDLE])),
1✔
202
        AppSourceMappings::new(
1✔
203
            "mem".to_string(),
1✔
204
            vec![("users".to_string(), DEFAULT_PORT_HANDLE)]
1✔
205
                .into_iter()
1✔
206
                .collect(),
1✔
207
        ),
1✔
208
    )
1✔
209
    .unwrap();
1✔
210

1✔
211
    // Add the no-op sink as node "sink" and wire the query's output node/port into it.
    pipeline.add_sink(
1✔
212
        Box::new(TestSinkFactory::new(vec![DEFAULT_PORT_HANDLE])),
1✔
213
        "sink",
1✔
214
        None,
1✔
215
    );
1✔
216
    pipeline.connect_nodes(
1✔
217
        &table_info.node,
1✔
218
        table_info.port,
1✔
219
        "sink",
1✔
220
        DEFAULT_PORT_HANDLE,
1✔
221
    );
1✔
222

1✔
223
    let mut app = App::new(asm);
1✔
224
    app.add_pipeline(pipeline);
1✔
225

1✔
226
    let dag = app.into_dag().unwrap();
1✔
227

1✔
228
    // Time only the execution phase (executor construction, start, and join).
    let now = std::time::Instant::now();
1✔
229

1✔
230
    // NOTE(review): the `AtomicBool(true)` handed to `start` is presumably a
    // running/shutdown flag — confirm against `DagExecutor::start`.
    DagExecutor::new(dag, ExecutorOptions::default())
1✔
231
        .unwrap()
1✔
232
        .start(Arc::new(AtomicBool::new(true)))
1✔
233
        .unwrap()
1✔
234
        .join()
1✔
235
        .unwrap();
1✔
236

1✔
237
    // The elapsed time is only logged at debug level; it is not asserted on.
    let elapsed = now.elapsed();
1✔
238
    debug!("Elapsed: {:.2?}", elapsed);
1✔
239
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc