• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5927446903

21 Aug 2023 02:14PM UTC coverage: 76.127%. First build
5927446903

Pull #1854

github

chubei
chore: Only retry `Error::Storage` in `Queue`. Return `SendError` instead of `String`
Pull Request #1854: feat: Write record store checkpoints

811 of 811 new or added lines in 34 files covered. (100.0%)

46477 of 61052 relevant lines covered (76.13%)

40747.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.91
/dozer-sql/src/pipeline/tests/builder_test.rs
1
use dozer_core::app::{App, AppPipeline};
2
use dozer_core::appsource::{AppSourceManager, AppSourceMappings};
3
use dozer_core::channels::SourceChannelForwarder;
4
use dozer_core::checkpoint::create_checkpoint_factory_for_test;
5
use dozer_core::dozer_log::storage::Queue;
6
use dozer_core::epoch::Epoch;
7
use dozer_core::executor::{DagExecutor, ExecutorOptions};
8
use dozer_core::executor_operation::ProcessorOperation;
9
use dozer_core::node::{
10
    OutputPortDef, OutputPortType, PortHandle, Sink, SinkFactory, Source, SourceFactory,
11
};
12
use dozer_core::processor_record::ProcessorRecordStore;
13
use dozer_core::DEFAULT_PORT_HANDLE;
14
use dozer_types::errors::internal::BoxedError;
15
use dozer_types::ingestion_types::IngestionMessage;
16
use dozer_types::log::debug;
17
use dozer_types::ordered_float::OrderedFloat;
18
use dozer_types::types::{
19
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
20
};
21

22
use std::collections::HashMap;
23

24
use std::sync::atomic::AtomicBool;
25
use std::sync::Arc;
26

27
use crate::pipeline::builder::{statement_to_pipeline, SchemaSQLContext};
28

29
/// Source factory used by the pipeline builder test.
///
/// It only stores the output port handles it was constructed with; the
/// `SourceFactory` implementation for this type turns them into port
/// definitions and builds a [`TestSource`].
30
#[derive(Debug)]
×
31
pub struct TestSourceFactory {
32
    // Port handles advertised by `get_output_ports`.
    output_ports: Vec<PortHandle>,
33
}
34

35
impl TestSourceFactory {
36
    /// Creates a factory that stores `output_ports` unchanged.
    pub fn new(output_ports: Vec<PortHandle>) -> Self {
1✔
37
        Self { output_ports }
1✔
38
    }
1✔
39
}
40

41
// `SourceFactory` implementation backing the pipeline builder test: fixed
// schema, stateless ports, and a `TestSource` as the built source.
impl SourceFactory<SchemaSQLContext> for TestSourceFactory {
42
    /// Returns one `OutputPortDef` per stored port handle, each created with
    /// `OutputPortType::Stateless`.
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
3✔
43
        self.output_ports
3✔
44
            .iter()
3✔
45
            .map(|e| OutputPortDef::new(*e, OutputPortType::Stateless))
3✔
46
            .collect()
3✔
47
    }
3✔
48

49
    /// Returns the same schema for every port (`_port` is ignored), paired
    /// with a default `SchemaSQLContext`.
    ///
    /// The schema has three fields, in this order: `CustomerID` (`Int`),
    /// `Country` (`String`) and `Spending` (`Float`), all declared with
    /// `SourceDefinition::Dynamic`. This function always returns `Ok`.
    fn get_output_schema(
1✔
50
        &self,
1✔
51
        _port: &PortHandle,
1✔
52
    ) -> Result<(Schema, SchemaSQLContext), BoxedError> {
1✔
53
        Ok((
1✔
54
            Schema::default()
1✔
55
                .field(
1✔
56
                    FieldDefinition::new(
1✔
57
                        String::from("CustomerID"),
1✔
58
                        FieldType::Int,
1✔
59
                        // NOTE(review): presumably the "nullable" flag of
                        // `FieldDefinition::new` - confirm against its signature.
                        false,
1✔
60
                        SourceDefinition::Dynamic,
1✔
61
                    ),
1✔
62
                    // NOTE(review): presumably marks whether the field is part
                    // of the primary key - confirm against `Schema::field`.
                    false,
1✔
63
                )
1✔
64
                .field(
1✔
65
                    FieldDefinition::new(
1✔
66
                        String::from("Country"),
1✔
67
                        FieldType::String,
1✔
68
                        false,
1✔
69
                        SourceDefinition::Dynamic,
1✔
70
                    ),
1✔
71
                    false,
1✔
72
                )
1✔
73
                .field(
1✔
74
                    FieldDefinition::new(
1✔
75
                        String::from("Spending"),
1✔
76
                        FieldType::Float,
1✔
77
                        false,
1✔
78
                        SourceDefinition::Dynamic,
1✔
79
                    ),
1✔
80
                    false,
1✔
81
                )
1✔
82
                // NOTE(review): the builder chain presumably yields a
                // reference, hence the clone to obtain an owned `Schema` - confirm.
                .clone(),
1✔
83
            SchemaSQLContext::default(),
1✔
84
        ))
1✔
85
    }
1✔
86

87
    /// Formats the port handle as `port_<handle>`.
    fn get_output_port_name(&self, port: &PortHandle) -> String {
×
88
        format!("port_{}", port)
×
89
    }
×
90

91
    /// Builds a boxed [`TestSource`]; the supplied output schemas are ignored
    /// and this function always returns `Ok`.
    fn build(
1✔
92
        &self,
1✔
93
        _output_schemas: HashMap<PortHandle, Schema>,
1✔
94
    ) -> Result<Box<dyn Source>, BoxedError> {
1✔
95
        Ok(Box::new(TestSource {}))
1✔
96
    }
1✔
97
}
98

99
/// Field-less source built by `TestSourceFactory::build`; its `Source`
/// implementation emits a fixed batch of insert operations.
#[derive(Debug)]
×
100
pub struct TestSource {}
101

102
impl Source for TestSource {
103
    /// Always returns `Ok(false)`; the checkpoint argument is ignored.
    fn can_start_from(&self, _last_checkpoint: (u64, u64)) -> Result<bool, BoxedError> {
×
104
        Ok(false)
×
105
    }
×
106

107
    /// Sends 10000 insert operations through `fw` on `DEFAULT_PORT_HANDLE`,
    /// then returns `Ok(())`.
    ///
    /// Every message carries the same record (`Int(0)`, `"Italy"`, `5.5`);
    /// only the first argument of `IngestionMessage::new_op` varies, taking
    /// the loop counter `n`. `_last_checkpoint` is ignored.
    ///
    /// # Panics
    ///
    /// Panics if any `fw.send` call returns an error, because the result is
    /// unwrapped instead of being propagated.
    fn start(
1✔
108
        &self,
1✔
109
        fw: &mut dyn SourceChannelForwarder,
1✔
110
        _last_checkpoint: Option<(u64, u64)>,
1✔
111
    ) -> Result<(), BoxedError> {
1✔
112
        for n in 0..10000 {
10,001✔
113
            fw.send(
10,000✔
114
                IngestionMessage::new_op(
10,000✔
115
                    // NOTE(review): `n` and the two zeros are presumably
                    // transaction / sequence identifiers - confirm against
                    // `IngestionMessage::new_op`.
                    n,
10,000✔
116
                    0,
10,000✔
117
                    0,
10,000✔
118
                    Operation::Insert {
10,000✔
119
                        // Field order matches the schema returned by
                        // `TestSourceFactory::get_output_schema`:
                        // CustomerID, Country, Spending.
                        new: Record::new(vec![
10,000✔
120
                            Field::Int(0),
10,000✔
121
                            Field::String("Italy".to_string()),
10,000✔
122
                            Field::Float(OrderedFloat(5.5)),
10,000✔
123
                        ]),
10,000✔
124
                    },
10,000✔
125
                ),
10,000✔
126
                DEFAULT_PORT_HANDLE,
10,000✔
127
            )
10,000✔
128
            .unwrap();
10,000✔
129
        }
10,000✔
130
        Ok(())
1✔
131
    }
1✔
132
}
133

134
/// Sink factory used by the pipeline builder test.
///
/// It only stores the input port handles it was constructed with; its
/// `SinkFactory` implementation builds a [`TestSink`].
#[derive(Debug)]
×
135
pub struct TestSinkFactory {
136
    // Port handles returned (cloned) by `get_input_ports`.
    input_ports: Vec<PortHandle>,
137
}
138

139
impl TestSinkFactory {
140
    /// Creates a factory that stores `input_ports` unchanged.
    pub fn new(input_ports: Vec<PortHandle>) -> Self {
1✔
141
        Self { input_ports }
1✔
142
    }
1✔
143
}
144

145
// `SinkFactory` implementation backing the pipeline builder test; nothing
// here inspects the schemas it is handed.
impl SinkFactory<SchemaSQLContext> for TestSinkFactory {
146
    /// Returns a clone of the stored input port handles.
    fn get_input_ports(&self) -> Vec<PortHandle> {
4✔
147
        self.input_ports.clone()
4✔
148
    }
4✔
149

150
    /// Builds a boxed [`TestSink`]; the supplied input schemas are ignored
    /// and this function always returns `Ok`.
    fn build(
1✔
151
        &self,
1✔
152
        _input_schemas: HashMap<PortHandle, Schema>,
1✔
153
    ) -> Result<Box<dyn Sink>, BoxedError> {
1✔
154
        Ok(Box::new(TestSink {}))
1✔
155
    }
1✔
156

157
    /// No-op: ignores the input schemas and always returns `Ok(())`.
    fn prepare(
1✔
158
        &self,
1✔
159
        _input_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
1✔
160
    ) -> Result<(), BoxedError> {
1✔
161
        Ok(())
1✔
162
    }
1✔
163
}
164

165
/// Field-less sink built by `TestSinkFactory::build`; every method of its
/// `Sink` implementation ignores its arguments and returns `Ok(())`.
#[derive(Debug)]
×
166
pub struct TestSink {}
167

168
// Every method below discards its arguments and reports success, so the sink
// accepts whatever the pipeline delivers without recording it.
impl Sink for TestSink {
169
    /// Discards the incoming operation and returns `Ok(())`.
    fn process(
10,000✔
170
        &mut self,
10,000✔
171
        _from_port: PortHandle,
10,000✔
172
        _record_store: &ProcessorRecordStore,
10,000✔
173
        _op: ProcessorOperation,
10,000✔
174
    ) -> Result<(), BoxedError> {
10,000✔
175
        Ok(())
10,000✔
176
    }
10,000✔
177

178
    /// No-op: ignores the epoch and returns `Ok(())`.
    fn commit(&mut self, _epoch_details: &Epoch) -> Result<(), BoxedError> {
3✔
179
        Ok(())
3✔
180
    }
3✔
181

182
    /// No-op: ignores the queue and returns `Ok(())`.
    fn persist(&mut self, _queue: &Queue) -> Result<(), BoxedError> {
×
183
        Ok(())
×
184
    }
×
185

186
    /// No-op: ignores the connection name and returns `Ok(())`.
    fn on_source_snapshotting_done(&mut self, _connection_name: String) -> Result<(), BoxedError> {
×
187
        Ok(())
×
188
    }
×
189
}
190

191
/// End-to-end smoke test: builds a pipeline from a SQL statement, wires it
/// between `TestSourceFactory` and `TestSinkFactory`, runs the resulting DAG
/// to completion and logs the elapsed time.
///
/// The test contains no assertions; it passes as long as none of the
/// `unwrap` calls panics.
#[tokio::test]
1✔
192
async fn test_pipeline_builder() {
1✔
193
    let mut pipeline = AppPipeline::new();
1✔
194
    // Translate the SQL statement into processors added to `pipeline`.
    // NOTE(review): the third argument presumably names the output table; the
    // lookup under the same "results" key below suggests so - confirm.
    let context = statement_to_pipeline(
1✔
195
        "SELECT COUNT(Spending), users.Country \
1✔
196
        FROM users \
1✔
197
         WHERE Spending >= 1",
1✔
198
        &mut pipeline,
1✔
199
        Some("results".to_string()),
1✔
200
    )
1✔
201
    .unwrap();
1✔
202

1✔
203
    // Node and port that produce the "results" output; used below to attach the sink.
    let table_info = context.output_tables_map.get("results").unwrap();
1✔
204

1✔
205
    // Register the test source under the name "mem", mapping the "users"
    // table referenced by the SQL to the source's default port.
    let mut asm = AppSourceManager::new();
1✔
206
    asm.add(
1✔
207
        Box::new(TestSourceFactory::new(vec![DEFAULT_PORT_HANDLE])),
1✔
208
        AppSourceMappings::new(
1✔
209
            "mem".to_string(),
1✔
210
            vec![("users".to_string(), DEFAULT_PORT_HANDLE)]
1✔
211
                .into_iter()
1✔
212
                .collect(),
1✔
213
        ),
1✔
214
    )
1✔
215
    .unwrap();
1✔
216

1✔
217
    // Add the test sink as node "sink" and connect the query output to its
    // default input port.
    pipeline.add_sink(
1✔
218
        Box::new(TestSinkFactory::new(vec![DEFAULT_PORT_HANDLE])),
1✔
219
        "sink",
1✔
220
        None,
1✔
221
    );
1✔
222
    pipeline.connect_nodes(
1✔
223
        &table_info.node,
1✔
224
        table_info.port,
1✔
225
        "sink",
1✔
226
        DEFAULT_PORT_HANDLE,
1✔
227
    );
1✔
228

1✔
229
    let mut app = App::new(asm);
1✔
230
    app.add_pipeline(pipeline);
1✔
231

1✔
232
    // Combine sources and pipeline into the DAG handed to the executor.
    let dag = app.into_dag().unwrap();
1✔
233

1✔
234
    // Timer for the whole run; note that it also covers checkpoint factory creation.
    let now = std::time::Instant::now();
1✔
235

236
    // `_temp_dir` is bound to a name (not `_`) so it stays alive until the end
    // of the test. NOTE(review): presumably a temporary directory guard backing
    // the checkpoint storage - confirm.
    let (_temp_dir, checkpoint_factory, _) = create_checkpoint_factory_for_test(&[]).await;
1✔
237
    // Start the executor and block on `join()` until it finishes.
    // NOTE(review): the `AtomicBool` is presumably a "keep running" flag - confirm.
    DagExecutor::new(dag, checkpoint_factory, ExecutorOptions::default())
1✔
238
        .unwrap()
1✔
239
        .start(Arc::new(AtomicBool::new(true)))
1✔
240
        .unwrap()
1✔
241
        .join()
1✔
242
        .unwrap();
1✔
243

1✔
244
    let elapsed = now.elapsed();
1✔
245
    debug!("Elapsed: {:.2?}", elapsed);
1✔
246
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc