• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5630015928

pending completion
5630015928

push

github

web-flow
Bump version (#1779)

42841 of 55898 relevant lines covered (76.64%)

32850.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.34
/dozer-sql/src/pipeline/pipeline_builder/join_builder.rs
1
use std::sync::Arc;
2

3
use dozer_core::{
4
    app::{AppPipeline, PipelineEntryPoint},
5
    appsource::AppSourceId,
6
    DEFAULT_PORT_HANDLE,
7
};
8
use sqlparser::ast::TableWithJoins;
9

10
use crate::pipeline::{
11
    builder::{get_from_source, QueryContext, SchemaSQLContext},
12
    errors::PipelineError,
13
    product::{
14
        join::factory::{JoinProcessorFactory, LEFT_JOIN_PORT, RIGHT_JOIN_PORT},
15
        table::factory::get_name_or_alias,
16
    },
17
    table_operator::factory::TableOperatorProcessorFactory,
18
    window::factory::WindowProcessorFactory,
19
};
20

21
use super::from_builder::{
22
    is_an_entry_point, is_table_operator, ConnectionInfo, TableOperatorDescriptor,
23
};
24

25
#[derive(Clone, Debug)]
175✔
26
enum JoinSource {
27
    Table(String),
28
    Operator(ConnectionInfo),
29
    Join(ConnectionInfo),
30
}
31

32
pub(crate) fn insert_join_to_pipeline(
127✔
33
    from: &TableWithJoins,
127✔
34
    pipeline: &mut AppPipeline<SchemaSQLContext>,
127✔
35
    pipeline_idx: usize,
127✔
36
    query_context: &mut QueryContext,
127✔
37
) -> Result<ConnectionInfo, PipelineError> {
127✔
38
    let mut input_nodes = vec![];
127✔
39

127✔
40
    let left_table = &from.relation;
127✔
41
    let mut left_name_or_alias = Some(get_name_or_alias(left_table)?);
127✔
42
    let mut left_join_source =
127✔
43
        insert_join_source_to_pipeline(left_table.clone(), pipeline, pipeline_idx, query_context)?;
127✔
44

45
    for join in &from.joins {
306✔
46
        let right_table = &join.relation;
175✔
47
        let right_name_or_alias = Some(get_name_or_alias(right_table)?);
175✔
48
        let right_join_source = insert_join_source_to_pipeline(
175✔
49
            right_table.clone(),
175✔
50
            pipeline,
175✔
51
            pipeline_idx,
175✔
52
            query_context,
175✔
53
        )?;
175✔
54

55
        let join_processor_name = format!("join_{}", query_context.get_next_processor_id());
175✔
56
        let join_processor_factory = JoinProcessorFactory::new(
175✔
57
            join_processor_name.clone(),
175✔
58
            left_name_or_alias.clone(),
175✔
59
            right_name_or_alias,
175✔
60
            join.join_operator.clone(),
175✔
61
        );
175✔
62

175✔
63
        let mut pipeline_entry_points = vec![];
175✔
64
        if let JoinSource::Table(ref source_table) = left_join_source {
175✔
65
            if is_an_entry_point(source_table, &mut query_context.pipeline_map, pipeline_idx) {
127✔
66
                let entry_point = PipelineEntryPoint::new(
127✔
67
                    AppSourceId::new(source_table.clone(), None),
127✔
68
                    LEFT_JOIN_PORT,
127✔
69
                );
127✔
70

127✔
71
                pipeline_entry_points.push(entry_point);
127✔
72
                query_context.used_sources.push(source_table.to_string());
127✔
73
            } else {
127✔
74
                input_nodes.push((
×
75
                    source_table.to_string(),
×
76
                    join_processor_name.clone(),
×
77
                    LEFT_JOIN_PORT,
×
78
                ));
×
79
            }
×
80
        }
48✔
81

82
        if let JoinSource::Table(ref source_table) = right_join_source.clone() {
175✔
83
            if is_an_entry_point(source_table, &mut query_context.pipeline_map, pipeline_idx) {
175✔
84
                let entry_point = PipelineEntryPoint::new(
170✔
85
                    AppSourceId::new(source_table.clone(), None),
170✔
86
                    RIGHT_JOIN_PORT,
170✔
87
                );
170✔
88

170✔
89
                pipeline_entry_points.push(entry_point);
170✔
90
                query_context.used_sources.push(source_table.to_string());
170✔
91
            } else {
170✔
92
                input_nodes.push((
5✔
93
                    source_table.to_string(),
5✔
94
                    join_processor_name.clone(),
5✔
95
                    RIGHT_JOIN_PORT,
5✔
96
                ));
5✔
97
            }
5✔
98
        }
×
99

100
        pipeline.add_processor(
175✔
101
            Arc::new(join_processor_factory),
175✔
102
            &join_processor_name,
175✔
103
            pipeline_entry_points,
175✔
104
        );
175✔
105

175✔
106
        match left_join_source {
175✔
107
            JoinSource::Table(_) => {}
127✔
108
            JoinSource::Operator(ref connection_info) => pipeline.connect_nodes(
×
109
                &connection_info.output_node.0,
×
110
                Some(connection_info.output_node.1),
×
111
                &join_processor_name,
×
112
                Some(LEFT_JOIN_PORT),
×
113
                true,
×
114
            ),
×
115
            JoinSource::Join(ref connection_info) => pipeline.connect_nodes(
48✔
116
                &connection_info.output_node.0,
48✔
117
                Some(connection_info.output_node.1),
48✔
118
                &join_processor_name,
48✔
119
                Some(LEFT_JOIN_PORT),
48✔
120
                true,
48✔
121
            ),
48✔
122
        }
123

124
        match right_join_source {
175✔
125
            JoinSource::Table(_) => {}
179✔
126
            JoinSource::Operator(connection_info) => pipeline.connect_nodes(
×
127
                &connection_info.output_node.0,
×
128
                Some(connection_info.output_node.1),
×
129
                &join_processor_name,
×
130
                Some(RIGHT_JOIN_PORT),
×
131
                true,
×
132
            ),
×
133
            JoinSource::Join(connection_info) => pipeline.connect_nodes(
×
134
                &connection_info.output_node.0,
×
135
                Some(connection_info.output_node.1),
×
136
                &join_processor_name,
×
137
                Some(RIGHT_JOIN_PORT),
×
138
                true,
×
139
            ),
×
140
        }
141

142
        // TODO: refactor join source name and aliasing logic
143
        left_name_or_alias = None;
179✔
144
        left_join_source = JoinSource::Join(ConnectionInfo {
179✔
145
            input_nodes: input_nodes.clone(),
179✔
146
            output_node: (join_processor_name, DEFAULT_PORT_HANDLE),
179✔
147
        });
179✔
148
    }
149

150
    match left_join_source {
131✔
151
        JoinSource::Table(_) => Err(PipelineError::InvalidJoin(
×
152
            "No JOIN operator found".to_string(),
×
153
        )),
×
154
        JoinSource::Operator(_) => Err(PipelineError::InvalidJoin(
×
155
            "No JOIN operator found".to_string(),
×
156
        )),
×
157
        JoinSource::Join(connection_info) => Ok(connection_info),
131✔
158
    }
159
}
131✔
160

161
// TODO: refactor this
162
fn insert_join_source_to_pipeline(
310✔
163
    source: sqlparser::ast::TableFactor,
310✔
164
    pipeline: &mut AppPipeline<SchemaSQLContext>,
310✔
165
    pipeline_idx: usize,
310✔
166
    query_context: &mut QueryContext,
310✔
167
) -> Result<JoinSource, PipelineError> {
310✔
168
    let join_source = if let Some(table_operator) = is_table_operator(&source)? {
310✔
169
        let connection_info = insert_table_operator_to_pipeline(
×
170
            &table_operator,
×
171
            pipeline,
×
172
            pipeline_idx,
×
173
            query_context,
×
174
        )?;
×
175
        JoinSource::Operator(connection_info)
×
176
    } else if is_nested_join(&source) {
310✔
177
        return Err(PipelineError::InvalidJoin(
×
178
            "Nested JOINs are not supported".to_string(),
×
179
        ));
×
180
    } else {
181
        let name_or_alias = get_from_source(&source, pipeline, query_context, pipeline_idx)?;
310✔
182
        JoinSource::Table(name_or_alias.0)
310✔
183
    };
184
    Ok(join_source)
310✔
185
}
310✔
186

187
fn insert_table_operator_to_pipeline(
×
188
    table_operator: &TableOperatorDescriptor,
×
189
    pipeline: &mut AppPipeline<SchemaSQLContext>,
×
190
    pipeline_idx: usize,
×
191
    query_context: &mut QueryContext,
×
192
) -> Result<ConnectionInfo, PipelineError> {
×
193
    let mut input_nodes = vec![];
×
194

×
195
    if table_operator.name.to_uppercase() == "TTL" {
×
196
        let processor_name = format!(
×
197
            "TOP_{0}_{1}",
×
198
            table_operator.name,
×
199
            query_context.get_next_processor_id()
×
200
        );
×
201
        let processor =
×
202
            TableOperatorProcessorFactory::new(processor_name.clone(), table_operator.clone());
×
203

204
        let source_name = processor
×
205
            .get_source_name()
×
206
            .map_err(PipelineError::TableOperatorError)?;
×
207

208
        let mut entry_points = vec![];
×
209

×
210
        if is_an_entry_point(&source_name, &mut query_context.pipeline_map, pipeline_idx) {
×
211
            let entry_point = PipelineEntryPoint::new(
×
212
                AppSourceId::new(source_name.clone(), None),
×
213
                DEFAULT_PORT_HANDLE,
×
214
            );
×
215

×
216
            entry_points.push(entry_point);
×
217
            query_context.used_sources.push(source_name);
×
218
        } else {
×
219
            input_nodes.push((source_name, processor_name.clone(), DEFAULT_PORT_HANDLE));
×
220
        }
×
221

222
        pipeline.add_processor(Arc::new(processor), &processor_name, entry_points);
×
223

×
224
        Ok(ConnectionInfo {
×
225
            input_nodes,
×
226
            output_node: (processor_name, DEFAULT_PORT_HANDLE),
×
227
        })
×
228
    } else if table_operator.name.to_uppercase() == "TUMBLE"
×
229
        || table_operator.name.to_uppercase() == "HOP"
×
230
    {
231
        // for now, we only support window operators
232
        let window_processor_name = format!("window_{}", query_context.get_next_processor_id());
×
233
        let window_processor_factory =
×
234
            WindowProcessorFactory::new(window_processor_name.clone(), table_operator.clone());
×
235
        let window_source_name = window_processor_factory.get_source_name()?;
×
236
        let mut window_entry_points = vec![];
×
237

×
238
        if is_an_entry_point(
×
239
            &window_source_name,
×
240
            &mut query_context.pipeline_map,
×
241
            pipeline_idx,
×
242
        ) {
×
243
            let entry_point = PipelineEntryPoint::new(
×
244
                AppSourceId::new(window_source_name.clone(), None),
×
245
                DEFAULT_PORT_HANDLE,
×
246
            );
×
247

×
248
            window_entry_points.push(entry_point);
×
249
            query_context.used_sources.push(window_source_name);
×
250
        } else {
×
251
            input_nodes.push((
×
252
                window_source_name,
×
253
                window_processor_name.clone(),
×
254
                DEFAULT_PORT_HANDLE,
×
255
            ));
×
256
        }
×
257

258
        pipeline.add_processor(
×
259
            Arc::new(window_processor_factory),
×
260
            &window_processor_name,
×
261
            window_entry_points,
×
262
        );
×
263

×
264
        Ok(ConnectionInfo {
×
265
            input_nodes,
×
266
            output_node: (window_processor_name, DEFAULT_PORT_HANDLE),
×
267
        })
×
268
    } else {
269
        Err(PipelineError::UnsupportedTableOperator(
×
270
            table_operator.name.clone(),
×
271
        ))
×
272
    }
273
}
×
274

275
/// Returns `true` when `left_table` is the `NestedJoin` variant of `TableFactor`.
fn is_nested_join(left_table: &sqlparser::ast::TableFactor) -> bool {
    use sqlparser::ast::TableFactor::NestedJoin;

    matches!(left_table, NestedJoin { .. })
}
310✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc