• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5709656380

pending completion
5709656380

push

github

web-flow
Version bump (#1808)

45512 of 59772 relevant lines covered (76.14%)

39312.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.27
/dozer-sql/src/pipeline/pipeline_builder/join_builder.rs
1
use dozer_core::{
2
    app::{AppPipeline, PipelineEntryPoint},
3
    DEFAULT_PORT_HANDLE,
4
};
5
use sqlparser::ast::TableWithJoins;
6

7
use crate::pipeline::{
8
    builder::{get_from_source, QueryContext, SchemaSQLContext},
9
    errors::PipelineError,
10
    product::{
11
        join::factory::{JoinProcessorFactory, LEFT_JOIN_PORT, RIGHT_JOIN_PORT},
12
        table::factory::get_name_or_alias,
13
    },
14
    table_operator::factory::TableOperatorProcessorFactory,
15
    window::factory::WindowProcessorFactory,
16
};
17

18
use super::from_builder::{
19
    is_an_entry_point, is_table_operator, ConnectionInfo, TableOperatorDescriptor,
20
};
21

22
/// The upstream feeding one side of a join processor.
///
/// Built by `insert_join_source_to_pipeline` and consumed by
/// `insert_join_to_pipeline` to decide how to wire the join's input port.
#[derive(Clone, Debug)]
enum JoinSource {
    // A plain named table (or alias); may become a pipeline entry point.
    Table(String),
    // A table operator (e.g. TTL/TUMBLE/HOP) already inserted as a processor.
    Operator(ConnectionInfo),
    // The output of a previously built join processor (left-deep chaining).
    Join(ConnectionInfo),
}
28

29
pub(crate) fn insert_join_to_pipeline(
131✔
30
    from: &TableWithJoins,
131✔
31
    pipeline: &mut AppPipeline<SchemaSQLContext>,
131✔
32
    pipeline_idx: usize,
131✔
33
    query_context: &mut QueryContext,
131✔
34
) -> Result<ConnectionInfo, PipelineError> {
131✔
35
    let mut input_nodes = vec![];
131✔
36

131✔
37
    let left_table = &from.relation;
131✔
38
    let mut left_name_or_alias = Some(get_name_or_alias(left_table)?);
131✔
39
    let mut left_join_source =
131✔
40
        insert_join_source_to_pipeline(left_table.clone(), pipeline, pipeline_idx, query_context)?;
131✔
41

×
42
    for join in &from.joins {
310✔
43
        let right_table = &join.relation;
179✔
44
        let right_name_or_alias = Some(get_name_or_alias(right_table)?);
179✔
45
        let right_join_source = insert_join_source_to_pipeline(
179✔
46
            right_table.clone(),
179✔
47
            pipeline,
179✔
48
            pipeline_idx,
179✔
49
            query_context,
179✔
50
        )?;
179✔
51

×
52
        let join_processor_name = format!("join_{}", query_context.get_next_processor_id());
179✔
53
        let join_processor_factory = JoinProcessorFactory::new(
179✔
54
            join_processor_name.clone(),
179✔
55
            left_name_or_alias.clone(),
179✔
56
            right_name_or_alias,
179✔
57
            join.join_operator.clone(),
179✔
58
        );
179✔
59

179✔
60
        let mut pipeline_entry_points = vec![];
179✔
61
        if let JoinSource::Table(ref source_table) = left_join_source {
179✔
62
            if is_an_entry_point(source_table, &mut query_context.pipeline_map, pipeline_idx) {
131✔
63
                let entry_point = PipelineEntryPoint::new(source_table.clone(), LEFT_JOIN_PORT);
127✔
64

127✔
65
                pipeline_entry_points.push(entry_point);
127✔
66
                query_context.used_sources.push(source_table.to_string());
127✔
67
            } else {
127✔
68
                input_nodes.push((
4✔
69
                    source_table.to_string(),
4✔
70
                    join_processor_name.clone(),
4✔
71
                    LEFT_JOIN_PORT,
4✔
72
                ));
4✔
73
            }
4✔
74
        }
48✔
75

×
76
        if let JoinSource::Table(ref source_table) = right_join_source.clone() {
179✔
77
            if is_an_entry_point(source_table, &mut query_context.pipeline_map, pipeline_idx) {
179✔
78
                let entry_point = PipelineEntryPoint::new(source_table.clone(), RIGHT_JOIN_PORT);
170✔
79

170✔
80
                pipeline_entry_points.push(entry_point);
170✔
81
                query_context.used_sources.push(source_table.to_string());
170✔
82
            } else {
170✔
83
                input_nodes.push((
9✔
84
                    source_table.to_string(),
9✔
85
                    join_processor_name.clone(),
9✔
86
                    RIGHT_JOIN_PORT,
9✔
87
                ));
9✔
88
            }
9✔
89
        }
×
90

×
91
        pipeline.add_processor(
179✔
92
            Box::new(join_processor_factory),
179✔
93
            &join_processor_name,
179✔
94
            pipeline_entry_points,
179✔
95
        );
179✔
96

179✔
97
        match left_join_source {
179✔
98
            JoinSource::Table(_) => {}
131✔
99
            JoinSource::Operator(ref connection_info) => pipeline.connect_nodes(
×
100
                &connection_info.output_node.0,
×
101
                connection_info.output_node.1,
×
102
                &join_processor_name,
×
103
                LEFT_JOIN_PORT,
×
104
            ),
×
105
            JoinSource::Join(ref connection_info) => pipeline.connect_nodes(
48✔
106
                &connection_info.output_node.0,
48✔
107
                connection_info.output_node.1,
48✔
108
                &join_processor_name,
48✔
109
                LEFT_JOIN_PORT,
48✔
110
            ),
48✔
111
        }
×
112

×
113
        match right_join_source {
179✔
114
            JoinSource::Table(_) => {}
179✔
115
            JoinSource::Operator(connection_info) => pipeline.connect_nodes(
×
116
                &connection_info.output_node.0,
×
117
                connection_info.output_node.1,
×
118
                &join_processor_name,
×
119
                RIGHT_JOIN_PORT,
×
120
            ),
×
121
            JoinSource::Join(connection_info) => pipeline.connect_nodes(
×
122
                &connection_info.output_node.0,
×
123
                connection_info.output_node.1,
×
124
                &join_processor_name,
×
125
                RIGHT_JOIN_PORT,
×
126
            ),
×
127
        }
×
128

×
129
        // TODO: refactor join source name and aliasing logic
×
130
        left_name_or_alias = None;
179✔
131
        left_join_source = JoinSource::Join(ConnectionInfo {
179✔
132
            input_nodes: input_nodes.clone(),
179✔
133
            output_node: (join_processor_name, DEFAULT_PORT_HANDLE),
179✔
134
        });
179✔
135
    }
×
136

×
137
    match left_join_source {
131✔
138
        JoinSource::Table(_) => Err(PipelineError::InvalidJoin(
×
139
            "No JOIN operator found".to_string(),
×
140
        )),
×
141
        JoinSource::Operator(_) => Err(PipelineError::InvalidJoin(
×
142
            "No JOIN operator found".to_string(),
×
143
        )),
×
144
        JoinSource::Join(connection_info) => Ok(connection_info),
131✔
145
    }
×
146
}
131✔
147

×
148
// TODO: refactor this
149
fn insert_join_source_to_pipeline(
310✔
150
    source: sqlparser::ast::TableFactor,
310✔
151
    pipeline: &mut AppPipeline<SchemaSQLContext>,
310✔
152
    pipeline_idx: usize,
310✔
153
    query_context: &mut QueryContext,
310✔
154
) -> Result<JoinSource, PipelineError> {
310✔
155
    let join_source = if let Some(table_operator) = is_table_operator(&source)? {
310✔
156
        let connection_info = insert_table_operator_to_pipeline(
×
157
            &table_operator,
×
158
            pipeline,
×
159
            pipeline_idx,
×
160
            query_context,
×
161
        )?;
×
162
        JoinSource::Operator(connection_info)
×
163
    } else if is_nested_join(&source) {
310✔
164
        return Err(PipelineError::InvalidJoin(
×
165
            "Nested JOINs are not supported".to_string(),
×
166
        ));
×
167
    } else {
×
168
        let name_or_alias = get_from_source(&source, pipeline, query_context, pipeline_idx)?;
310✔
169
        JoinSource::Table(name_or_alias.0)
310✔
170
    };
×
171
    Ok(join_source)
310✔
172
}
310✔
173

×
174
fn insert_table_operator_to_pipeline(
×
175
    table_operator: &TableOperatorDescriptor,
×
176
    pipeline: &mut AppPipeline<SchemaSQLContext>,
×
177
    pipeline_idx: usize,
×
178
    query_context: &mut QueryContext,
×
179
) -> Result<ConnectionInfo, PipelineError> {
×
180
    let mut input_nodes = vec![];
×
181

×
182
    if table_operator.name.to_uppercase() == "TTL" {
×
183
        let processor_name = format!(
×
184
            "TOP_{0}_{1}",
×
185
            table_operator.name,
×
186
            query_context.get_next_processor_id()
×
187
        );
×
188
        let processor =
×
189
            TableOperatorProcessorFactory::new(processor_name.clone(), table_operator.clone());
×
190

×
191
        let source_name = processor
×
192
            .get_source_name()
×
193
            .map_err(PipelineError::TableOperatorError)?;
×
194

×
195
        let mut entry_points = vec![];
×
196

×
197
        if is_an_entry_point(&source_name, &mut query_context.pipeline_map, pipeline_idx) {
×
198
            let entry_point = PipelineEntryPoint::new(source_name.clone(), DEFAULT_PORT_HANDLE);
×
199

×
200
            entry_points.push(entry_point);
×
201
            query_context.used_sources.push(source_name);
×
202
        } else {
×
203
            input_nodes.push((source_name, processor_name.clone(), DEFAULT_PORT_HANDLE));
×
204
        }
×
205

×
206
        pipeline.add_processor(Box::new(processor), &processor_name, entry_points);
×
207

×
208
        Ok(ConnectionInfo {
×
209
            input_nodes,
×
210
            output_node: (processor_name, DEFAULT_PORT_HANDLE),
×
211
        })
×
212
    } else if table_operator.name.to_uppercase() == "TUMBLE"
×
213
        || table_operator.name.to_uppercase() == "HOP"
×
214
    {
×
215
        // for now, we only support window operators
×
216
        let window_processor_name = format!("window_{}", query_context.get_next_processor_id());
×
217
        let window_processor_factory =
×
218
            WindowProcessorFactory::new(window_processor_name.clone(), table_operator.clone());
×
219
        let window_source_name = window_processor_factory.get_source_name()?;
×
220
        let mut window_entry_points = vec![];
×
221

×
222
        if is_an_entry_point(
×
223
            &window_source_name,
×
224
            &mut query_context.pipeline_map,
×
225
            pipeline_idx,
×
226
        ) {
×
227
            let entry_point =
×
228
                PipelineEntryPoint::new(window_source_name.clone(), DEFAULT_PORT_HANDLE);
×
229

×
230
            window_entry_points.push(entry_point);
×
231
            query_context.used_sources.push(window_source_name);
×
232
        } else {
×
233
            input_nodes.push((
×
234
                window_source_name,
×
235
                window_processor_name.clone(),
×
236
                DEFAULT_PORT_HANDLE,
×
237
            ));
×
238
        }
×
239

×
240
        pipeline.add_processor(
×
241
            Box::new(window_processor_factory),
×
242
            &window_processor_name,
×
243
            window_entry_points,
×
244
        );
×
245

×
246
        Ok(ConnectionInfo {
×
247
            input_nodes,
×
248
            output_node: (window_processor_name, DEFAULT_PORT_HANDLE),
×
249
        })
×
250
    } else {
×
251
        Err(PipelineError::UnsupportedTableOperator(
×
252
            table_operator.name.clone(),
×
253
        ))
×
254
    }
×
255
}
×
256

×
257
fn is_nested_join(left_table: &sqlparser::ast::TableFactor) -> bool {
310✔
258
    matches!(left_table, sqlparser::ast::TableFactor::NestedJoin { .. })
310✔
259
}
310✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc