getdozer / dozer · build 5978430793

25 Aug 2023 04:54PM UTC · coverage: 75.575% (-0.7%) from 76.279%

Build type: push · CI service: github · Committed by: web-flow
Bump ordered-float from 3.4.0 to 3.9.1 (#1919)

Bumps [ordered-float](https://github.com/reem/rust-ordered-float) from 3.4.0 to 3.9.1.
- [Release notes](https://github.com/reem/rust-ordered-float/releases)
- [Commits](https://github.com/reem/rust-ordered-float/compare/v3.4.0...v3.9.1)

---
updated-dependencies:
- dependency-name: ordered-float
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
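
For context, ordered-float provides wrapper types that give floating-point values a total order, so they can be sorted or used as map keys. The snippet below is a minimal, self-contained illustration of that wrapper; it is a hypothetical standalone example, not code from this repository:

use ordered_float::OrderedFloat;

fn main() {
    // OrderedFloat implements Ord and Hash, unlike the primitive float types,
    // so wrapped values can be sorted or used as keys in ordered collections.
    let mut values = vec![OrderedFloat(3.5_f64), OrderedFloat(1.25), OrderedFloat(2.0)];
    values.sort(); // total order, sorted ascending
    println!("{:?}", values);
}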

47272 of 62550 relevant lines covered (75.57%)

49425.38 hits per line

Source File: /dozer-sql/src/pipeline/pipeline_builder/join_builder.rs (43.56% covered)
use dozer_core::{
    app::{AppPipeline, PipelineEntryPoint},
    DEFAULT_PORT_HANDLE,
};
use sqlparser::ast::TableWithJoins;

use crate::pipeline::{
    builder::{get_from_source, QueryContext, SchemaSQLContext},
    errors::PipelineError,
    product::{
        join::factory::{JoinProcessorFactory, LEFT_JOIN_PORT, RIGHT_JOIN_PORT},
        table::factory::get_name_or_alias,
    },
    table_operator::factory::TableOperatorProcessorFactory,
    window::factory::WindowProcessorFactory,
};

use super::from_builder::{
    is_an_entry_point, is_table_operator, ConnectionInfo, TableOperatorDescriptor,
};

#[derive(Clone, Debug)]
enum JoinSource {
    Table(String),
    Operator(ConnectionInfo),
    Join(ConnectionInfo),
}

pub(crate) fn insert_join_to_pipeline(
    from: &TableWithJoins,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    pipeline_idx: usize,
    query_context: &mut QueryContext,
) -> Result<ConnectionInfo, PipelineError> {
    let mut input_nodes = vec![];

    let left_table = &from.relation;
    let mut left_name_or_alias = Some(get_name_or_alias(left_table)?);
    let mut left_join_source =
        insert_join_source_to_pipeline(left_table.clone(), pipeline, pipeline_idx, query_context)?;

    for join in &from.joins {
        let right_table = &join.relation;
        let right_name_or_alias = Some(get_name_or_alias(right_table)?);
        let right_join_source = insert_join_source_to_pipeline(
            right_table.clone(),
            pipeline,
            pipeline_idx,
            query_context,
        )?;

        let join_processor_name = format!("join_{}", query_context.get_next_processor_id());
        let join_processor_factory = JoinProcessorFactory::new(
            join_processor_name.clone(),
            left_name_or_alias.clone(),
            right_name_or_alias,
            join.join_operator.clone(),
            pipeline.flags().enable_probabilistic_optimizations.in_joins,
        );

        let mut pipeline_entry_points = vec![];
        if let JoinSource::Table(ref source_table) = left_join_source {
            if is_an_entry_point(source_table, &mut query_context.pipeline_map, pipeline_idx) {
                let entry_point = PipelineEntryPoint::new(source_table.clone(), LEFT_JOIN_PORT);

                pipeline_entry_points.push(entry_point);
                query_context.used_sources.push(source_table.to_string());
            } else {
                input_nodes.push((
                    source_table.to_string(),
                    join_processor_name.clone(),
                    LEFT_JOIN_PORT,
                ));
            }
        }

        if let JoinSource::Table(ref source_table) = right_join_source.clone() {
            if is_an_entry_point(source_table, &mut query_context.pipeline_map, pipeline_idx) {
                let entry_point = PipelineEntryPoint::new(source_table.clone(), RIGHT_JOIN_PORT);

                pipeline_entry_points.push(entry_point);
                query_context.used_sources.push(source_table.to_string());
            } else {
                input_nodes.push((
                    source_table.to_string(),
                    join_processor_name.clone(),
                    RIGHT_JOIN_PORT,
                ));
            }
        }

        pipeline.add_processor(
            Box::new(join_processor_factory),
            &join_processor_name,
            pipeline_entry_points,
        );

        match left_join_source {
            JoinSource::Table(_) => {}
            JoinSource::Operator(ref connection_info) => pipeline.connect_nodes(
                &connection_info.output_node.0,
                connection_info.output_node.1,
                &join_processor_name,
                LEFT_JOIN_PORT,
            ),
            JoinSource::Join(ref connection_info) => pipeline.connect_nodes(
                &connection_info.output_node.0,
                connection_info.output_node.1,
                &join_processor_name,
                LEFT_JOIN_PORT,
            ),
        }

        match right_join_source {
            JoinSource::Table(_) => {}
            JoinSource::Operator(connection_info) => pipeline.connect_nodes(
                &connection_info.output_node.0,
                connection_info.output_node.1,
                &join_processor_name,
                RIGHT_JOIN_PORT,
            ),
            JoinSource::Join(connection_info) => pipeline.connect_nodes(
                &connection_info.output_node.0,
                connection_info.output_node.1,
                &join_processor_name,
                RIGHT_JOIN_PORT,
            ),
        }

        // TODO: refactor join source name and aliasing logic
        left_name_or_alias = None;
        left_join_source = JoinSource::Join(ConnectionInfo {
            input_nodes: input_nodes.clone(),
            output_node: (join_processor_name, DEFAULT_PORT_HANDLE),
        });
    }

    match left_join_source {
        JoinSource::Table(_) => Err(PipelineError::InvalidJoin(
            "No JOIN operator found".to_string(),
        )),
        JoinSource::Operator(_) => Err(PipelineError::InvalidJoin(
            "No JOIN operator found".to_string(),
        )),
        JoinSource::Join(connection_info) => Ok(connection_info),
    }
}

// TODO: refactor this
fn insert_join_source_to_pipeline(
    source: sqlparser::ast::TableFactor,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    pipeline_idx: usize,
    query_context: &mut QueryContext,
) -> Result<JoinSource, PipelineError> {
    let join_source = if let Some(table_operator) = is_table_operator(&source)? {
        let connection_info = insert_table_operator_to_pipeline(
            &table_operator,
            pipeline,
            pipeline_idx,
            query_context,
        )?;
        JoinSource::Operator(connection_info)
    } else if is_nested_join(&source) {
        return Err(PipelineError::InvalidJoin(
            "Nested JOINs are not supported".to_string(),
        ));
    } else {
        let name_or_alias = get_from_source(&source, pipeline, query_context, pipeline_idx)?;
        JoinSource::Table(name_or_alias.0)
    };
    Ok(join_source)
}

fn insert_table_operator_to_pipeline(
    table_operator: &TableOperatorDescriptor,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    pipeline_idx: usize,
    query_context: &mut QueryContext,
) -> Result<ConnectionInfo, PipelineError> {
    let mut input_nodes = vec![];

    if table_operator.name.to_uppercase() == "TTL" {
        let processor_name = format!(
            "TOP_{0}_{1}",
            table_operator.name,
            query_context.get_next_processor_id()
        );
        let processor =
            TableOperatorProcessorFactory::new(processor_name.clone(), table_operator.clone());

        let source_name = processor
            .get_source_name()
            .map_err(PipelineError::TableOperatorError)?;

        let mut entry_points = vec![];

        if is_an_entry_point(&source_name, &mut query_context.pipeline_map, pipeline_idx) {
            let entry_point = PipelineEntryPoint::new(source_name.clone(), DEFAULT_PORT_HANDLE);

            entry_points.push(entry_point);
            query_context.used_sources.push(source_name);
        } else {
            input_nodes.push((source_name, processor_name.clone(), DEFAULT_PORT_HANDLE));
        }

        pipeline.add_processor(Box::new(processor), &processor_name, entry_points);

        Ok(ConnectionInfo {
            input_nodes,
            output_node: (processor_name, DEFAULT_PORT_HANDLE),
        })
    } else if table_operator.name.to_uppercase() == "TUMBLE"
        || table_operator.name.to_uppercase() == "HOP"
    {
        // for now, we only support window operators
        let window_processor_name = format!("window_{}", query_context.get_next_processor_id());
        let window_processor_factory =
            WindowProcessorFactory::new(window_processor_name.clone(), table_operator.clone());
        let window_source_name = window_processor_factory.get_source_name()?;
        let mut window_entry_points = vec![];

        if is_an_entry_point(
            &window_source_name,
            &mut query_context.pipeline_map,
            pipeline_idx,
        ) {
            let entry_point =
                PipelineEntryPoint::new(window_source_name.clone(), DEFAULT_PORT_HANDLE);

            window_entry_points.push(entry_point);
            query_context.used_sources.push(window_source_name);
        } else {
            input_nodes.push((
                window_source_name,
                window_processor_name.clone(),
                DEFAULT_PORT_HANDLE,
            ));
        }

        pipeline.add_processor(
            Box::new(window_processor_factory),
            &window_processor_name,
            window_entry_points,
        );

        Ok(ConnectionInfo {
            input_nodes,
            output_node: (window_processor_name, DEFAULT_PORT_HANDLE),
        })
    } else {
        Err(PipelineError::UnsupportedTableOperator(
            table_operator.name.clone(),
        ))
    }
}

fn is_nested_join(left_table: &sqlparser::ast::TableFactor) -> bool {
    matches!(left_table, sqlparser::ast::TableFactor::NestedJoin { .. })
}
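
As a reading aid: insert_join_to_pipeline folds the FROM clause into a left-deep chain of join processors. The first relation becomes the left input of the first join, and each subsequent join takes the previous join's output on LEFT_JOIN_PORT while the newly encountered relation feeds RIGHT_JOIN_PORT. The sketch below is a minimal, self-contained illustration of that fold using hypothetical toy types (ToyPlan, build_left_deep); it is not dozer code and does not use dozer's API.

// Toy sketch of the left-deep fold pattern used by the join builder above.
// ToyPlan and build_left_deep are made-up names for illustration only.

#[derive(Debug)]
enum ToyPlan {
    Source(String),
    Join(Box<ToyPlan>, Box<ToyPlan>),
}

// Fold the first relation plus the list of joined relations into a
// left-deep join tree: each step wraps the previous result as the left
// child and the next relation as the right child.
fn build_left_deep(first: &str, joined: &[&str]) -> ToyPlan {
    let mut left = ToyPlan::Source(first.to_string());
    for right in joined {
        left = ToyPlan::Join(
            Box::new(left),                               // previous result -> left input
            Box::new(ToyPlan::Source(right.to_string())), // new relation -> right input
        );
    }
    left
}

fn main() {
    // Roughly: SELECT ... FROM a JOIN b ON ... JOIN c ON ...
    let plan = build_left_deep("a", &["b", "c"]);
    println!("{:?}", plan); // Join(Join(Source("a"), Source("b")), Source("c"))
}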