getdozer / dozer — build 5607543735 (pending completion)
Triggered by a push, via GitHub (committer: web-flow)
Commit: fix: Show warning when metadata collect failed (#1772)

6 of 6 new or added lines in 1 file covered (100.0%)
43000 of 56395 relevant lines covered (76.25%)
33899.95 hits per line
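
As a quick check on the summary arithmetic, the overall percentage is just covered lines divided by relevant lines. A minimal sketch in Rust (the two-decimal rounding is an assumption about how the page formats the figure):

fn coverage_percent(covered: u64, relevant: u64) -> f64 {
    covered as f64 / relevant as f64 * 100.0
}

fn main() {
    // 43000 of 56395 relevant lines covered (rounding to two decimals is assumed)
    println!("{:.2}%", coverage_percent(43_000, 56_395)); // prints "76.25%"
}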

Source file: /dozer-sql/src/pipeline/pipeline_builder/join_builder.rs — 47.06% covered
Judging from the per-line hit markers in the original view, the JoinSource::Operator branches and the whole insert_table_operator_to_pipeline function were not exercised in this build.
use std::sync::Arc;

use dozer_core::{
    app::{AppPipeline, PipelineEntryPoint},
    appsource::AppSourceId,
    DEFAULT_PORT_HANDLE,
};
use sqlparser::ast::TableWithJoins;

use crate::pipeline::{
    builder::{get_from_source, QueryContext, SchemaSQLContext},
    errors::PipelineError,
    product::{
        join::factory::{JoinProcessorFactory, LEFT_JOIN_PORT, RIGHT_JOIN_PORT},
        table::factory::get_name_or_alias,
    },
    table_operator::factory::TableOperatorProcessorFactory,
    window::factory::WindowProcessorFactory,
};

use super::from_builder::{
    is_an_entry_point, is_table_operator, ConnectionInfo, TableOperatorDescriptor,
};

/// The three kinds of inputs a join side can come from: a plain source table,
/// a table operator (TTL or a window), or the output of a previous join.
#[derive(Clone, Debug)]
enum JoinSource {
    Table(String),
    Operator(ConnectionInfo),
    Join(ConnectionInfo),
}

/// Builds a left-deep chain of join processors for the FROM clause: each
/// iteration joins the accumulated left side with the next relation, and the
/// new join processor becomes the left side of the following iteration.
pub(crate) fn insert_join_to_pipeline(
    from: &TableWithJoins,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    pipeline_idx: usize,
    query_context: &mut QueryContext,
) -> Result<ConnectionInfo, PipelineError> {
    let mut input_nodes = vec![];

    let left_table = &from.relation;
    let mut left_name_or_alias = Some(get_name_or_alias(left_table)?);
    let mut left_join_source =
        insert_join_source_to_pipeline(left_table.clone(), pipeline, pipeline_idx, query_context)?;

    for join in &from.joins {
        let right_table = &join.relation;
        let right_name_or_alias = Some(get_name_or_alias(right_table)?);
        let right_join_source = insert_join_source_to_pipeline(
            right_table.clone(),
            pipeline,
            pipeline_idx,
            query_context,
        )?;

        let join_processor_name = format!("join_{}", query_context.get_next_processor_id());
        let join_processor_factory = JoinProcessorFactory::new(
            join_processor_name.clone(),
            left_name_or_alias.clone(),
            right_name_or_alias,
            join.join_operator.clone(),
        );

        // Source tables that are pipeline entry points are attached to the join
        // processor directly; other tables are recorded in input_nodes and
        // connected later.
        let mut pipeline_entry_points = vec![];
        if let JoinSource::Table(ref source_table) = left_join_source {
            if is_an_entry_point(source_table, &mut query_context.pipeline_map, pipeline_idx) {
                let entry_point = PipelineEntryPoint::new(
                    AppSourceId::new(source_table.clone(), None),
                    LEFT_JOIN_PORT,
                );

                pipeline_entry_points.push(entry_point);
                query_context.used_sources.push(source_table.to_string());
            } else {
                input_nodes.push((
                    source_table.to_string(),
                    join_processor_name.clone(),
                    LEFT_JOIN_PORT,
                ));
            }
        }

        if let JoinSource::Table(ref source_table) = right_join_source.clone() {
            if is_an_entry_point(source_table, &mut query_context.pipeline_map, pipeline_idx) {
                let entry_point = PipelineEntryPoint::new(
                    AppSourceId::new(source_table.clone(), None),
                    RIGHT_JOIN_PORT,
                );

                pipeline_entry_points.push(entry_point);
                query_context.used_sources.push(source_table.to_string());
            } else {
                input_nodes.push((
                    source_table.to_string(),
                    join_processor_name.clone(),
                    RIGHT_JOIN_PORT,
                ));
            }
        }

        pipeline.add_processor(
            Arc::new(join_processor_factory),
            &join_processor_name,
            pipeline_entry_points,
        );

        match left_join_source {
            JoinSource::Table(_) => {}
            JoinSource::Operator(ref connection_info) => pipeline.connect_nodes(
                &connection_info.output_node.0,
                Some(connection_info.output_node.1),
                &join_processor_name,
                Some(LEFT_JOIN_PORT),
                true,
            ),
            JoinSource::Join(ref connection_info) => pipeline.connect_nodes(
                &connection_info.output_node.0,
                Some(connection_info.output_node.1),
                &join_processor_name,
                Some(LEFT_JOIN_PORT),
                true,
            ),
        }

        match right_join_source {
            JoinSource::Table(_) => {}
            JoinSource::Operator(connection_info) => pipeline.connect_nodes(
                &connection_info.output_node.0,
                Some(connection_info.output_node.1),
                &join_processor_name,
                Some(RIGHT_JOIN_PORT),
                true,
            ),
            JoinSource::Join(connection_info) => pipeline.connect_nodes(
                &connection_info.output_node.0,
                Some(connection_info.output_node.1),
                &join_processor_name,
                Some(RIGHT_JOIN_PORT),
                true,
            ),
        }

        // TODO: refactor join source name and aliasing logic
        left_name_or_alias = None;
        left_join_source = JoinSource::Join(ConnectionInfo {
            input_nodes: input_nodes.clone(),
            output_node: (join_processor_name, DEFAULT_PORT_HANDLE),
        });
    }

    // A FROM clause that never produced a join (a bare table or table operator)
    // is an error for this builder.
    match left_join_source {
        JoinSource::Table(_) => Err(PipelineError::InvalidJoin(
            "No JOIN operator found".to_string(),
        )),
        JoinSource::Operator(_) => Err(PipelineError::InvalidJoin(
            "No JOIN operator found".to_string(),
        )),
        JoinSource::Join(connection_info) => Ok(connection_info),
    }
}

// TODO: refactor this
/// Resolves a single TableFactor into a JoinSource, inserting whatever
/// processors it needs into the pipeline along the way.
fn insert_join_source_to_pipeline(
    source: sqlparser::ast::TableFactor,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    pipeline_idx: usize,
    query_context: &mut QueryContext,
) -> Result<JoinSource, PipelineError> {
    let join_source = if let Some(table_operator) = is_table_operator(&source)? {
        let connection_info = insert_table_operator_to_pipeline(
            &table_operator,
            pipeline,
            pipeline_idx,
            query_context,
        )?;
        JoinSource::Operator(connection_info)
    } else if is_nested_join(&source) {
        return Err(PipelineError::InvalidJoin(
            "Nested JOINs are not supported".to_string(),
        ));
    } else {
        let name_or_alias = get_from_source(&source, pipeline, query_context, pipeline_idx)?;
        JoinSource::Table(name_or_alias.0)
    };
    Ok(join_source)
}

/// Inserts the processor for a TTL or windowing (TUMBLE/HOP) table operator;
/// any other operator name is rejected.
fn insert_table_operator_to_pipeline(
    table_operator: &TableOperatorDescriptor,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    pipeline_idx: usize,
    query_context: &mut QueryContext,
) -> Result<ConnectionInfo, PipelineError> {
    let mut input_nodes = vec![];

    if table_operator.name.to_uppercase() == "TTL" {
        let processor_name = format!(
            "TOP_{0}_{1}",
            table_operator.name,
            query_context.get_next_processor_id()
        );
        let processor =
            TableOperatorProcessorFactory::new(processor_name.clone(), table_operator.clone());

        let source_name = processor
            .get_source_name()
            .map_err(PipelineError::TableOperatorError)?;

        let mut entry_points = vec![];

        if is_an_entry_point(&source_name, &mut query_context.pipeline_map, pipeline_idx) {
            let entry_point = PipelineEntryPoint::new(
                AppSourceId::new(source_name.clone(), None),
                DEFAULT_PORT_HANDLE,
            );

            entry_points.push(entry_point);
            query_context.used_sources.push(source_name);
        } else {
            input_nodes.push((source_name, processor_name.clone(), DEFAULT_PORT_HANDLE));
        }

        pipeline.add_processor(Arc::new(processor), &processor_name, entry_points);

        Ok(ConnectionInfo {
            input_nodes,
            output_node: (processor_name, DEFAULT_PORT_HANDLE),
        })
    } else if table_operator.name.to_uppercase() == "TUMBLE"
        || table_operator.name.to_uppercase() == "HOP"
    {
        // for now, we only support window operators
        let window_processor_name = format!("window_{}", query_context.get_next_processor_id());
        let window_processor_factory =
            WindowProcessorFactory::new(window_processor_name.clone(), table_operator.clone());
        let window_source_name = window_processor_factory.get_source_name()?;
        let mut window_entry_points = vec![];

        if is_an_entry_point(
            &window_source_name,
            &mut query_context.pipeline_map,
            pipeline_idx,
        ) {
            let entry_point = PipelineEntryPoint::new(
                AppSourceId::new(window_source_name.clone(), None),
                DEFAULT_PORT_HANDLE,
            );

            window_entry_points.push(entry_point);
            query_context.used_sources.push(window_source_name);
        } else {
            input_nodes.push((
                window_source_name,
                window_processor_name.clone(),
                DEFAULT_PORT_HANDLE,
            ));
        }

        pipeline.add_processor(
            Arc::new(window_processor_factory),
            &window_processor_name,
            window_entry_points,
        );

        Ok(ConnectionInfo {
            input_nodes,
            output_node: (window_processor_name, DEFAULT_PORT_HANDLE),
        })
    } else {
        Err(PipelineError::UnsupportedTableOperator(
            table_operator.name.clone(),
        ))
    }
}

fn is_nested_join(left_table: &sqlparser::ast::TableFactor) -> bool {
    matches!(left_table, sqlparser::ast::TableFactor::NestedJoin { .. })
}