• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6008856021

29 Aug 2023 06:44AM UTC coverage: 76.756% (-1.0%) from 77.736%
6008856021

push

github

web-flow
chore: Remove unused generic type parameter in `dozer-core` (#1929)

330 of 330 new or added lines in 38 files covered. (100.0%)

48977 of 63809 relevant lines covered (76.76%)

48470.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.26
/dozer-sql/src/pipeline/tests/builder_test.rs
1
use dozer_core::app::{App, AppPipeline};
2
use dozer_core::appsource::{AppSourceManager, AppSourceMappings};
3
use dozer_core::channels::SourceChannelForwarder;
4
use dozer_core::checkpoint::create_checkpoint_factory_for_test;
5
use dozer_core::dozer_log::storage::Queue;
6
use dozer_core::epoch::Epoch;
7
use dozer_core::executor::{DagExecutor, ExecutorOptions};
8
use dozer_core::executor_operation::ProcessorOperation;
9
use dozer_core::node::{
10
    OutputPortDef, OutputPortType, PortHandle, Sink, SinkFactory, Source, SourceFactory,
11
};
12
use dozer_core::processor_record::ProcessorRecordStore;
13
use dozer_core::DEFAULT_PORT_HANDLE;
14
use dozer_types::errors::internal::BoxedError;
15
use dozer_types::ingestion_types::IngestionMessage;
16
use dozer_types::log::debug;
17
use dozer_types::ordered_float::OrderedFloat;
18
use dozer_types::types::{
19
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
20
};
21

22
use std::collections::HashMap;
23

24
use std::sync::atomic::AtomicBool;
25
use std::sync::Arc;
26

27
use crate::pipeline::builder::statement_to_pipeline;
28

29
/// Test Source
30
#[derive(Debug)]
×
31
pub struct TestSourceFactory {
32
    output_ports: Vec<PortHandle>,
33
}
34

35
impl TestSourceFactory {
36
    pub fn new(output_ports: Vec<PortHandle>) -> Self {
1✔
37
        Self { output_ports }
1✔
38
    }
1✔
39
}
40

41
impl SourceFactory for TestSourceFactory {
42
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
3✔
43
        self.output_ports
3✔
44
            .iter()
3✔
45
            .map(|e| OutputPortDef::new(*e, OutputPortType::Stateless))
3✔
46
            .collect()
3✔
47
    }
3✔
48

49
    fn get_output_schema(&self, _port: &PortHandle) -> Result<Schema, BoxedError> {
1✔
50
        Ok(Schema::default()
1✔
51
            .field(
1✔
52
                FieldDefinition::new(
1✔
53
                    String::from("CustomerID"),
1✔
54
                    FieldType::Int,
1✔
55
                    false,
1✔
56
                    SourceDefinition::Dynamic,
1✔
57
                ),
1✔
58
                false,
1✔
59
            )
1✔
60
            .field(
1✔
61
                FieldDefinition::new(
1✔
62
                    String::from("Country"),
1✔
63
                    FieldType::String,
1✔
64
                    false,
1✔
65
                    SourceDefinition::Dynamic,
1✔
66
                ),
1✔
67
                false,
1✔
68
            )
1✔
69
            .field(
1✔
70
                FieldDefinition::new(
1✔
71
                    String::from("Spending"),
1✔
72
                    FieldType::Float,
1✔
73
                    false,
1✔
74
                    SourceDefinition::Dynamic,
1✔
75
                ),
1✔
76
                false,
1✔
77
            )
1✔
78
            .clone())
1✔
79
    }
1✔
80

×
81
    fn get_output_port_name(&self, port: &PortHandle) -> String {
×
82
        format!("port_{}", port)
×
83
    }
×
84

×
85
    fn build(
1✔
86
        &self,
1✔
87
        _output_schemas: HashMap<PortHandle, Schema>,
1✔
88
    ) -> Result<Box<dyn Source>, BoxedError> {
1✔
89
        Ok(Box::new(TestSource {}))
1✔
90
    }
1✔
91
}
×
92

×
93
#[derive(Debug)]
×
94
pub struct TestSource {}
×
95

×
96
impl Source for TestSource {
×
97
    fn can_start_from(&self, _last_checkpoint: (u64, u64)) -> Result<bool, BoxedError> {
×
98
        Ok(false)
×
99
    }
×
100

101
    fn start(
1✔
102
        &self,
1✔
103
        fw: &mut dyn SourceChannelForwarder,
1✔
104
        _last_checkpoint: Option<(u64, u64)>,
1✔
105
    ) -> Result<(), BoxedError> {
1✔
106
        for n in 0..10000 {
10,001✔
107
            fw.send(
10,000✔
108
                IngestionMessage::new_op(
10,000✔
109
                    n,
10,000✔
110
                    0,
10,000✔
111
                    0,
10,000✔
112
                    Operation::Insert {
10,000✔
113
                        new: Record::new(vec![
10,000✔
114
                            Field::Int(0),
10,000✔
115
                            Field::String("Italy".to_string()),
10,000✔
116
                            Field::Float(OrderedFloat(5.5)),
10,000✔
117
                        ]),
10,000✔
118
                    },
10,000✔
119
                ),
10,000✔
120
                DEFAULT_PORT_HANDLE,
10,000✔
121
            )
10,000✔
122
            .unwrap();
10,000✔
123
        }
10,000✔
124
        Ok(())
1✔
125
    }
1✔
126
}
×
127

×
128
#[derive(Debug)]
×
129
pub struct TestSinkFactory {
×
130
    input_ports: Vec<PortHandle>,
×
131
}
×
132

133
impl TestSinkFactory {
134
    pub fn new(input_ports: Vec<PortHandle>) -> Self {
1✔
135
        Self { input_ports }
1✔
136
    }
1✔
137
}
138

139
impl SinkFactory for TestSinkFactory {
140
    fn get_input_ports(&self) -> Vec<PortHandle> {
4✔
141
        self.input_ports.clone()
4✔
142
    }
4✔
143

144
    fn build(
1✔
145
        &self,
1✔
146
        _input_schemas: HashMap<PortHandle, Schema>,
1✔
147
    ) -> Result<Box<dyn Sink>, BoxedError> {
1✔
148
        Ok(Box::new(TestSink {}))
1✔
149
    }
1✔
150

×
151
    fn prepare(&self, _input_schemas: HashMap<PortHandle, Schema>) -> Result<(), BoxedError> {
1✔
152
        Ok(())
1✔
153
    }
1✔
154
}
×
155

×
156
#[derive(Debug)]
×
157
pub struct TestSink {}
×
158

×
159
impl Sink for TestSink {
×
160
    fn process(
10,000✔
161
        &mut self,
10,000✔
162
        _from_port: PortHandle,
10,000✔
163
        _record_store: &ProcessorRecordStore,
10,000✔
164
        _op: ProcessorOperation,
10,000✔
165
    ) -> Result<(), BoxedError> {
10,000✔
166
        Ok(())
10,000✔
167
    }
10,000✔
168

169
    fn commit(&mut self, _epoch_details: &Epoch) -> Result<(), BoxedError> {
1✔
170
        Ok(())
1✔
171
    }
1✔
172

×
173
    fn persist(&mut self, _queue: &Queue) -> Result<(), BoxedError> {
×
174
        Ok(())
×
175
    }
×
176

×
177
    fn on_source_snapshotting_done(&mut self, _connection_name: String) -> Result<(), BoxedError> {
×
178
        Ok(())
×
179
    }
×
180
}
×
181

182
#[tokio::test]
1✔
183
async fn test_pipeline_builder() {
1✔
184
    let mut pipeline = AppPipeline::new_with_default_flags();
1✔
185
    let context = statement_to_pipeline(
1✔
186
        "SELECT COUNT(Spending), users.Country \
1✔
187
        FROM users \
1✔
188
         WHERE Spending >= 1",
1✔
189
        &mut pipeline,
1✔
190
        Some("results".to_string()),
1✔
191
    )
1✔
192
    .unwrap();
1✔
193

1✔
194
    let table_info = context.output_tables_map.get("results").unwrap();
1✔
195

1✔
196
    let mut asm = AppSourceManager::new();
1✔
197
    asm.add(
1✔
198
        Box::new(TestSourceFactory::new(vec![DEFAULT_PORT_HANDLE])),
1✔
199
        AppSourceMappings::new(
1✔
200
            "mem".to_string(),
1✔
201
            vec![("users".to_string(), DEFAULT_PORT_HANDLE)]
1✔
202
                .into_iter()
1✔
203
                .collect(),
1✔
204
        ),
1✔
205
    )
1✔
206
    .unwrap();
1✔
207

1✔
208
    pipeline.add_sink(
1✔
209
        Box::new(TestSinkFactory::new(vec![DEFAULT_PORT_HANDLE])),
1✔
210
        "sink",
1✔
211
        None,
1✔
212
    );
1✔
213
    pipeline.connect_nodes(
1✔
214
        &table_info.node,
1✔
215
        table_info.port,
1✔
216
        "sink",
1✔
217
        DEFAULT_PORT_HANDLE,
1✔
218
    );
1✔
219

1✔
220
    let mut app = App::new(asm);
1✔
221
    app.add_pipeline(pipeline);
1✔
222

1✔
223
    let dag = app.into_dag().unwrap();
1✔
224

1✔
225
    let now = std::time::Instant::now();
1✔
226

×
227
    let (_temp_dir, checkpoint_factory, _) = create_checkpoint_factory_for_test(&[]).await;
1✔
228
    DagExecutor::new(dag, checkpoint_factory, ExecutorOptions::default())
1✔
229
        .unwrap()
1✔
230
        .start(Arc::new(AtomicBool::new(true)))
1✔
231
        .unwrap()
1✔
232
        .join()
1✔
233
        .unwrap();
1✔
234

1✔
235
    let elapsed = now.elapsed();
1✔
236
    debug!("Elapsed: {:.2?}", elapsed);
1✔
237
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc