• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5725329819

pending completion
5725329819

push

github

web-flow
chore: Add `SourceFactory::get_output_port_name` to simplify ui graph generation (#1812)

132 of 132 new or added lines in 13 files covered. (100.0%)

45518 of 60084 relevant lines covered (75.76%)

39014.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.66
/dozer-sql/src/pipeline/tests/builder_test.rs
1
use dozer_core::app::{App, AppPipeline};
2
use dozer_core::appsource::{AppSourceManager, AppSourceMappings};
3
use dozer_core::channels::SourceChannelForwarder;
4
use dozer_core::executor::{DagExecutor, ExecutorOptions};
5
use dozer_core::executor_operation::ProcessorOperation;
6
use dozer_core::node::{
7
    OutputPortDef, OutputPortType, PortHandle, Sink, SinkFactory, Source, SourceFactory,
8
};
9
use dozer_core::DEFAULT_PORT_HANDLE;
10
use dozer_types::epoch::Epoch;
11
use dozer_types::errors::internal::BoxedError;
12
use dozer_types::ingestion_types::IngestionMessage;
13
use dozer_types::log::debug;
14
use dozer_types::ordered_float::OrderedFloat;
15
use dozer_types::types::{
16
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
17
};
18

19
use std::collections::HashMap;
20

21
use std::sync::atomic::AtomicBool;
22
use std::sync::Arc;
23

24
use crate::pipeline::builder::{statement_to_pipeline, SchemaSQLContext};
25

26
/// Test Source
27
#[derive(Debug)]
×
28
pub struct TestSourceFactory {
29
    output_ports: Vec<PortHandle>,
30
}
31

32
impl TestSourceFactory {
33
    pub fn new(output_ports: Vec<PortHandle>) -> Self {
1✔
34
        Self { output_ports }
1✔
35
    }
1✔
36
}
37

38
impl SourceFactory<SchemaSQLContext> for TestSourceFactory {
39
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
3✔
40
        self.output_ports
3✔
41
            .iter()
3✔
42
            .map(|e| OutputPortDef::new(*e, OutputPortType::Stateless))
3✔
43
            .collect()
3✔
44
    }
3✔
45

46
    fn get_output_schema(
1✔
47
        &self,
1✔
48
        _port: &PortHandle,
1✔
49
    ) -> Result<(Schema, SchemaSQLContext), BoxedError> {
1✔
50
        Ok((
1✔
51
            Schema::default()
1✔
52
                .field(
1✔
53
                    FieldDefinition::new(
1✔
54
                        String::from("CustomerID"),
1✔
55
                        FieldType::Int,
1✔
56
                        false,
1✔
57
                        SourceDefinition::Dynamic,
1✔
58
                    ),
1✔
59
                    false,
1✔
60
                )
1✔
61
                .field(
1✔
62
                    FieldDefinition::new(
1✔
63
                        String::from("Country"),
1✔
64
                        FieldType::String,
1✔
65
                        false,
1✔
66
                        SourceDefinition::Dynamic,
1✔
67
                    ),
1✔
68
                    false,
1✔
69
                )
1✔
70
                .field(
1✔
71
                    FieldDefinition::new(
1✔
72
                        String::from("Spending"),
1✔
73
                        FieldType::Float,
1✔
74
                        false,
1✔
75
                        SourceDefinition::Dynamic,
1✔
76
                    ),
1✔
77
                    false,
1✔
78
                )
1✔
79
                .clone(),
1✔
80
            SchemaSQLContext::default(),
1✔
81
        ))
1✔
82
    }
1✔
83

84
    fn get_output_port_name(&self, port: &PortHandle) -> String {
×
85
        format!("port_{}", port)
×
86
    }
×
87

×
88
    fn build(
1✔
89
        &self,
1✔
90
        _output_schemas: HashMap<PortHandle, Schema>,
1✔
91
    ) -> Result<Box<dyn Source>, BoxedError> {
1✔
92
        Ok(Box::new(TestSource {}))
1✔
93
    }
1✔
94
}
95

96
#[derive(Debug)]
×
97
pub struct TestSource {}
×
98

×
99
impl Source for TestSource {
100
    fn can_start_from(&self, _last_checkpoint: (u64, u64)) -> Result<bool, BoxedError> {
×
101
        Ok(false)
×
102
    }
×
103

×
104
    fn start(
1✔
105
        &self,
1✔
106
        fw: &mut dyn SourceChannelForwarder,
1✔
107
        _last_checkpoint: Option<(u64, u64)>,
1✔
108
    ) -> Result<(), BoxedError> {
1✔
109
        for n in 0..10000 {
10,001✔
110
            fw.send(
10,000✔
111
                IngestionMessage::new_op(
10,000✔
112
                    n,
10,000✔
113
                    0,
10,000✔
114
                    0,
10,000✔
115
                    Operation::Insert {
10,000✔
116
                        new: Record::new(vec![
10,000✔
117
                            Field::Int(0),
10,000✔
118
                            Field::String("Italy".to_string()),
10,000✔
119
                            Field::Float(OrderedFloat(5.5)),
10,000✔
120
                        ]),
10,000✔
121
                    },
10,000✔
122
                ),
10,000✔
123
                DEFAULT_PORT_HANDLE,
10,000✔
124
            )
10,000✔
125
            .unwrap();
10,000✔
126
        }
10,000✔
127
        Ok(())
1✔
128
    }
1✔
129
}
130

131
#[derive(Debug)]
×
132
pub struct TestSinkFactory {
133
    input_ports: Vec<PortHandle>,
×
134
}
×
135

×
136
impl TestSinkFactory {
137
    pub fn new(input_ports: Vec<PortHandle>) -> Self {
1✔
138
        Self { input_ports }
1✔
139
    }
1✔
140
}
×
141

×
142
impl SinkFactory<SchemaSQLContext> for TestSinkFactory {
143
    fn get_input_ports(&self) -> Vec<PortHandle> {
4✔
144
        self.input_ports.clone()
4✔
145
    }
4✔
146

×
147
    fn build(
1✔
148
        &self,
1✔
149
        _input_schemas: HashMap<PortHandle, Schema>,
1✔
150
    ) -> Result<Box<dyn Sink>, BoxedError> {
1✔
151
        Ok(Box::new(TestSink {}))
1✔
152
    }
1✔
153

×
154
    fn prepare(
1✔
155
        &self,
1✔
156
        _input_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
1✔
157
    ) -> Result<(), BoxedError> {
1✔
158
        Ok(())
1✔
159
    }
1✔
160
}
161

162
#[derive(Debug)]
×
163
pub struct TestSink {}
×
164

×
165
impl Sink for TestSink {
×
166
    fn process(
10,000✔
167
        &mut self,
10,000✔
168
        _from_port: PortHandle,
10,000✔
169
        _op: ProcessorOperation,
10,000✔
170
    ) -> Result<(), BoxedError> {
10,000✔
171
        Ok(())
10,000✔
172
    }
10,000✔
173

174
    fn commit(&mut self, _epoch_details: &Epoch) -> Result<(), BoxedError> {
2✔
175
        Ok(())
2✔
176
    }
2✔
177

178
    fn on_source_snapshotting_done(&mut self, _connection_name: String) -> Result<(), BoxedError> {
×
179
        Ok(())
×
180
    }
×
181
}
×
182

×
183
/// Builds a pipeline from a SQL statement, wires it between the test source and
/// the test sink, runs the resulting DAG until it joins, and logs the elapsed time.
///
/// The test makes no assertions on the data; it fails only if one of the
/// `unwrap` calls along the way panics.
#[test]
fn test_pipeline_builder() {
    let sql = "SELECT COUNT(Spending), users.Country \
        FROM users \
         WHERE Spending >= 1";

    let mut pipeline = AppPipeline::new();
    let query_ctx =
        statement_to_pipeline(sql, &mut pipeline, Some("results".to_string())).unwrap();

    // Node and port registered under the "results" output table name.
    let results_table = query_ctx.output_tables_map.get("results").unwrap();

    // Register the test source under connection "mem", mapping table "users"
    // to the default port.
    let source_factory = Box::new(TestSourceFactory::new(vec![DEFAULT_PORT_HANDLE]));
    let source_mappings = AppSourceMappings::new(
        "mem".to_string(),
        std::iter::once(("users".to_string(), DEFAULT_PORT_HANDLE)).collect(),
    );
    let mut sources = AppSourceManager::new();
    sources.add(source_factory, source_mappings).unwrap();

    // Attach the sink and connect the query output to its default port.
    let sink_factory = Box::new(TestSinkFactory::new(vec![DEFAULT_PORT_HANDLE]));
    pipeline.add_sink(sink_factory, "sink", None);
    pipeline.connect_nodes(
        &results_table.node,
        results_table.port,
        "sink",
        DEFAULT_PORT_HANDLE,
    );

    let mut app = App::new(sources);
    app.add_pipeline(pipeline);
    let dag = app.into_dag().unwrap();

    let started = std::time::Instant::now();

    DagExecutor::new(dag, ExecutorOptions::default())
        .unwrap()
        .start(Arc::new(AtomicBool::new(true)))
        .unwrap()
        .join()
        .unwrap();

    debug!("Elapsed: {:.2?}", started.elapsed());
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc