• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4763381855

pending completion
4763381855

Pull #1461

github

GitHub
Merge 50bf72be2 into c58df4a0b
Pull Request #1461: feat: Make secondary index configurable

135 of 135 new or added lines in 6 files covered. (100.0%)

34877 of 44466 relevant lines covered (78.44%)

11367.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.4
/dozer-sql/src/pipeline/pipeline_builder/join_builder.rs
1
use std::sync::Arc;
2

3
use dozer_core::{
4
    app::{AppPipeline, PipelineEntryPoint},
5
    appsource::AppSourceId,
6
    DEFAULT_PORT_HANDLE,
7
};
8
use sqlparser::ast::TableWithJoins;
9

10
use crate::pipeline::{
11
    builder::{get_from_source, QueryContext, SchemaSQLContext},
12
    errors::PipelineError,
13
    product::{
14
        join::factory::{JoinProcessorFactory, LEFT_JOIN_PORT, RIGHT_JOIN_PORT},
15
        table::factory::get_name_or_alias,
16
    },
17
    window::factory::WindowProcessorFactory,
18
};
19

20
use super::from_builder::{is_an_entry_point, is_table_operator, ConnectionInfo, TableOperator};
21

22
#[derive(Clone, Debug)]
222✔
23
enum JoinSource {
24
    Table(String),
25
    Operator(ConnectionInfo),
26
    Join(ConnectionInfo),
27
}
28

29
pub(crate) fn insert_join_to_pipeline(
163✔
30
    from: &TableWithJoins,
163✔
31
    pipeline: &mut AppPipeline<SchemaSQLContext>,
163✔
32
    pipeline_idx: usize,
163✔
33
    query_context: &mut QueryContext,
163✔
34
) -> Result<ConnectionInfo, PipelineError> {
163✔
35
    let mut input_nodes = vec![];
163✔
36

163✔
37
    let left_table = &from.relation;
163✔
38
    let mut left_name_or_alias = Some(get_name_or_alias(left_table)?);
163✔
39
    let mut left_join_source =
163✔
40
        insert_join_source_to_pipeline(left_table.clone(), pipeline, pipeline_idx, query_context)?;
163✔
41

42
    for join in &from.joins {
385✔
43
        let right_table = &join.relation;
222✔
44
        let right_name_or_alias = Some(get_name_or_alias(right_table)?);
222✔
45
        let right_join_source = insert_join_source_to_pipeline(
222✔
46
            right_table.clone(),
222✔
47
            pipeline,
222✔
48
            pipeline_idx,
222✔
49
            query_context,
222✔
50
        )?;
222✔
51

52
        let join_processor_factory = JoinProcessorFactory::new(
222✔
53
            left_name_or_alias.clone(),
222✔
54
            right_name_or_alias,
222✔
55
            join.join_operator.clone(),
222✔
56
        );
222✔
57
        let join_processor_name = format!("join_{}", uuid::Uuid::new_v4());
222✔
58

222✔
59
        let mut pipeline_entry_points = vec![];
222✔
60
        if let JoinSource::Table(ref source_table) = left_join_source {
222✔
61
            if is_an_entry_point(source_table, &mut query_context.pipeline_map, pipeline_idx) {
163✔
62
                let entry_point = PipelineEntryPoint::new(
158✔
63
                    AppSourceId::new(source_table.clone(), None),
158✔
64
                    LEFT_JOIN_PORT,
158✔
65
                );
158✔
66

158✔
67
                pipeline_entry_points.push(entry_point);
158✔
68
                query_context.used_sources.push(source_table.to_string());
158✔
69
            } else {
158✔
70
                input_nodes.push((
5✔
71
                    source_table.to_string(),
5✔
72
                    join_processor_name.clone(),
5✔
73
                    LEFT_JOIN_PORT,
5✔
74
                ));
5✔
75
            }
5✔
76
        }
59✔
77

78
        if let JoinSource::Table(ref source_table) = right_join_source.clone() {
222✔
79
            if is_an_entry_point(source_table, &mut query_context.pipeline_map, pipeline_idx) {
222✔
80
                let entry_point = PipelineEntryPoint::new(
211✔
81
                    AppSourceId::new(source_table.clone(), None),
211✔
82
                    RIGHT_JOIN_PORT,
211✔
83
                );
211✔
84

211✔
85
                pipeline_entry_points.push(entry_point);
211✔
86
                query_context.used_sources.push(source_table.to_string());
211✔
87
            } else {
211✔
88
                input_nodes.push((
11✔
89
                    source_table.to_string(),
11✔
90
                    join_processor_name.clone(),
11✔
91
                    RIGHT_JOIN_PORT,
11✔
92
                ));
11✔
93
            }
11✔
94
        }
×
95

96
        pipeline.add_processor(
222✔
97
            Arc::new(join_processor_factory),
222✔
98
            &join_processor_name,
222✔
99
            pipeline_entry_points,
222✔
100
        );
222✔
101

222✔
102
        match left_join_source {
222✔
103
            JoinSource::Table(_) => {}
163✔
104
            JoinSource::Operator(ref connection_info) => pipeline
×
105
                .connect_nodes(
×
106
                    &connection_info.output_node.0,
×
107
                    Some(connection_info.output_node.1),
×
108
                    &join_processor_name,
×
109
                    Some(LEFT_JOIN_PORT),
×
110
                    true,
×
111
                )
×
112
                .map_err(PipelineError::InternalExecutionError)?,
×
113
            JoinSource::Join(ref connection_info) => pipeline
59✔
114
                .connect_nodes(
59✔
115
                    &connection_info.output_node.0,
59✔
116
                    Some(connection_info.output_node.1),
59✔
117
                    &join_processor_name,
59✔
118
                    Some(LEFT_JOIN_PORT),
59✔
119
                    true,
59✔
120
                )
59✔
121
                .map_err(PipelineError::InternalExecutionError)?,
59✔
122
        }
123

124
        match right_join_source {
222✔
125
            JoinSource::Table(_) => {}
222✔
126
            JoinSource::Operator(connection_info) => pipeline
×
127
                .connect_nodes(
×
128
                    &connection_info.output_node.0,
×
129
                    Some(connection_info.output_node.1),
×
130
                    &join_processor_name,
×
131
                    Some(RIGHT_JOIN_PORT),
×
132
                    true,
×
133
                )
×
134
                .map_err(PipelineError::InternalExecutionError)?,
×
135
            JoinSource::Join(connection_info) => pipeline
×
136
                .connect_nodes(
×
137
                    &connection_info.output_node.0,
×
138
                    Some(connection_info.output_node.1),
×
139
                    &join_processor_name,
×
140
                    Some(RIGHT_JOIN_PORT),
×
141
                    true,
×
142
                )
×
143
                .map_err(PipelineError::InternalExecutionError)?,
×
144
        }
145

146
        // TODO: refactor join source name and aliasing logic
147
        left_name_or_alias = None;
222✔
148
        left_join_source = JoinSource::Join(ConnectionInfo {
222✔
149
            input_nodes: input_nodes.clone(),
222✔
150
            output_node: (join_processor_name, DEFAULT_PORT_HANDLE),
222✔
151
        });
222✔
152
    }
153

154
    match left_join_source {
163✔
155
        JoinSource::Table(_) => Err(PipelineError::InvalidJoin(
×
156
            "No JOIN operator found".to_string(),
×
157
        )),
×
158
        JoinSource::Operator(_) => Err(PipelineError::InvalidJoin(
×
159
            "No JOIN operator found".to_string(),
×
160
        )),
×
161
        JoinSource::Join(connection_info) => Ok(connection_info),
163✔
162
    }
163
}
163✔
164

165
// TODO: refactor this
166
fn insert_join_source_to_pipeline(
385✔
167
    source: sqlparser::ast::TableFactor,
385✔
168
    pipeline: &mut AppPipeline<SchemaSQLContext>,
385✔
169
    pipeline_idx: usize,
385✔
170
    query_context: &mut QueryContext,
385✔
171
) -> Result<JoinSource, PipelineError> {
385✔
172
    let join_source = if let Some(table_operator) = is_table_operator(&source)? {
385✔
173
        let connection_info = insert_table_operator_to_pipeline(
×
174
            &table_operator,
×
175
            pipeline,
×
176
            pipeline_idx,
×
177
            query_context,
×
178
        )?;
×
179
        JoinSource::Operator(connection_info)
×
180
    } else if is_nested_join(&source) {
385✔
181
        return Err(PipelineError::InvalidJoin(
×
182
            "Nested JOINs are not supported".to_string(),
×
183
        ));
×
184
    } else {
185
        let name_or_alias = get_from_source(&source, pipeline, query_context, pipeline_idx)?;
385✔
186
        JoinSource::Table(name_or_alias.0)
385✔
187
    };
188
    Ok(join_source)
385✔
189
}
385✔
190

191
fn insert_table_operator_to_pipeline(
×
192
    table_operator: &TableOperator,
×
193
    pipeline: &mut AppPipeline<SchemaSQLContext>,
×
194
    pipeline_idx: usize,
×
195
    query_context: &mut QueryContext,
×
196
) -> Result<ConnectionInfo, PipelineError> {
×
197
    let mut input_nodes = vec![];
×
198

×
199
    // for now, we only support window operators
×
200
    let window_processor_factory = WindowProcessorFactory::new(table_operator.clone());
×
201
    let window_processor_name = format!("window_{}", uuid::Uuid::new_v4());
×
202
    let window_source_name = window_processor_factory.get_source_name()?;
×
203
    let mut window_entry_points = vec![];
×
204

×
205
    if is_an_entry_point(
×
206
        &window_source_name,
×
207
        &mut query_context.pipeline_map,
×
208
        pipeline_idx,
×
209
    ) {
×
210
        let entry_point = PipelineEntryPoint::new(
×
211
            AppSourceId::new(window_source_name.clone(), None),
×
212
            DEFAULT_PORT_HANDLE,
×
213
        );
×
214

×
215
        window_entry_points.push(entry_point);
×
216
        query_context.used_sources.push(window_source_name);
×
217
    } else {
×
218
        input_nodes.push((
×
219
            window_source_name,
×
220
            window_processor_name.clone(),
×
221
            DEFAULT_PORT_HANDLE,
×
222
        ));
×
223
    }
×
224

225
    pipeline.add_processor(
×
226
        Arc::new(window_processor_factory),
×
227
        &window_processor_name,
×
228
        window_entry_points,
×
229
    );
×
230

×
231
    Ok(ConnectionInfo {
×
232
        input_nodes,
×
233
        output_node: (window_processor_name, DEFAULT_PORT_HANDLE),
×
234
    })
×
235
}
×
236

237
/// Returns `true` when `left_table` is the `NestedJoin` variant of `TableFactor`.
fn is_nested_join(left_table: &sqlparser::ast::TableFactor) -> bool {
    use sqlparser::ast::TableFactor::NestedJoin;

    matches!(left_table, NestedJoin { .. })
}
385✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc