• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4370408272

pending completion
4370408272

push

github

GitHub
fix: Fix compilation error introduced in #1158 (#1183)

3 of 3 new or added lines in 1 file covered. (100.0%)

28163 of 39541 relevant lines covered (71.22%)

73625.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

6.12
/dozer-sql/src/pipeline/window/builder.rs
1
use dozer_types::{
2
    chrono::Duration,
3
    types::{FieldDefinition, Schema},
4
};
5
use sqlparser::ast::{Expr, FunctionArg, FunctionArgExpr, Ident, ObjectName, TableFactor, Value};
6

7
use crate::pipeline::{
8
    errors::{JoinError, PipelineError, WindowError},
9
    expression::builder::ExpressionBuilder,
10
};
11

12
use super::operator::WindowType;
13

14
pub(crate) fn window_from_relation(
×
15
    relation: &TableFactor,
×
16
    schema: &Schema,
×
17
) -> Result<Option<WindowType>, WindowError> {
×
18
    match relation {
×
19
        TableFactor::Table { name, args, .. } => {
×
20
            let function_name = string_from_sql_object_name(name);
×
21

×
22
            if let Some(args) = args {
×
23
                if function_name.to_uppercase() == "TUMBLE" {
×
24
                    let column_index = get_window_column_index(args, schema)?;
×
25
                    let interval_arg = args
×
26
                        .get(2)
×
27
                        .ok_or(WindowError::WindowMissingIntervalArgument)?;
×
28
                    let interval = get_window_interval(interval_arg)?;
×
29

×
30
                    Ok(Some(WindowType::Tumble {
×
31
                        column_index,
×
32
                        interval,
×
33
                    }))
×
34
                } else if function_name.to_uppercase() == "HOP" {
×
35
                    let column_index = get_window_column_index(args, schema)?;
×
36
                    let hop_arg = args
×
37
                        .get(2)
×
38
                        .ok_or(WindowError::WindowMissingHopSizeArgument)?;
×
39
                    let hop_size = get_window_hop(hop_arg)?;
×
40
                    let interval_arg = args
×
41
                        .get(3)
×
42
                        .ok_or(WindowError::WindowMissingIntervalArgument)?;
×
43
                    let interval = get_window_interval(interval_arg)?;
×
44

×
45
                    return Ok(Some(WindowType::Hop {
×
46
                        column_index,
×
47
                        hop_size,
×
48
                        interval,
×
49
                    }));
×
50
                } else {
51
                    return Err(WindowError::UnsupportedRelationFunction(function_name));
×
52
                }
×
53
            } else {
×
54
                // not a function, most probably just a relation name
×
55
                Ok(None)
×
56
            }
57
        }
×
58
        TableFactor::Derived {
59
            lateral: _,
60
            subquery: _,
61
            alias: _,
×
62
        } => Ok(None),
×
63
        TableFactor::TableFunction { expr: _, alias: _ } => {
64
            Err(WindowError::UnsupportedTableFunction)
×
65
        }
66
        TableFactor::UNNEST {
67
            alias: _,
68
            array_expr: _,
×
69
            with_offset: _,
70
            with_offset_alias: _,
×
71
        } => Err(WindowError::UnsupportedUnnest),
×
72
        TableFactor::NestedJoin {
73
            table_with_joins: _,
74
            alias: _,
75
        } => Err(WindowError::UnsupportedNestedJoin),
×
76
    }
77
}
×
78

79
pub(crate) fn window_source_name(relation: &TableFactor) -> Result<String, WindowError> {
×
80
    match relation {
×
81
        TableFactor::Table { name, args, .. } => {
×
82
            let function_name = string_from_sql_object_name(name);
×
83

×
84
            if let Some(args) = args {
×
85
                if function_name.to_uppercase() == "TUMBLE" || function_name.to_uppercase() == "HOP"
×
86
                {
×
87
                    let source_arg = args
×
88
                        .get(0)
×
89
                        .ok_or(WindowError::WindowMissingSourceArgument)?;
×
90
                    let source_name = get_window_source_name(source_arg)?;
×
91

×
92
                    Ok(source_name)
×
93
                } else {
×
94
                    Err(WindowError::UnsupportedRelationFunction(function_name))
×
95
                }
96
            } else {
×
97
                // not a function, most probably just a relation name
×
98
                Err(WindowError::UnsupportedRelationFunction(function_name))
×
99
            }
×
100
        }
×
101
        TableFactor::Derived {
×
102
            lateral: _,
×
103
            subquery: _,
104
            alias: _,
×
105
        } => Err(WindowError::UnsupportedDerived),
×
106
        TableFactor::TableFunction { expr: _, alias: _ } => {
×
107
            Err(WindowError::UnsupportedTableFunction)
×
108
        }
109
        TableFactor::UNNEST {
110
            alias: _,
×
111
            array_expr: _,
112
            with_offset: _,
113
            with_offset_alias: _,
114
        } => Err(WindowError::UnsupportedUnnest),
×
115
        TableFactor::NestedJoin {
116
            table_with_joins: _,
117
            alias: _,
×
118
        } => Err(WindowError::UnsupportedNestedJoin),
×
119
    }
×
120
}
×
121

122
fn get_window_interval(interval_arg: &FunctionArg) -> Result<Duration, WindowError> {
×
123
    match interval_arg {
×
124
        FunctionArg::Named { name, arg: _ } => {
×
125
            let column_name = ExpressionBuilder::normalize_ident(name);
×
126
            Err(WindowError::WindowInvalidInterval(column_name))
×
127
        }
128
        FunctionArg::Unnamed(arg_expr) => match arg_expr {
×
129
            FunctionArgExpr::Expr(expr) => match expr {
×
130
                Expr::Value(Value::SingleQuotedString(s) | Value::DoubleQuotedString(s)) => {
×
131
                    let interval: Duration = parse_duration_string(s)
×
132
                        .map_err(|_| WindowError::WindowInvalidInterval(s.to_owned()))?;
×
133
                    Ok(interval)
×
134
                }
×
135
                _ => Err(WindowError::WindowInvalidInterval(expr.to_string())),
×
136
            },
×
137
            FunctionArgExpr::QualifiedWildcard(_) => {
×
138
                Err(WindowError::WindowInvalidInterval("*".to_string()))
×
139
            }
140
            FunctionArgExpr::Wildcard => Err(WindowError::WindowInvalidInterval("*".to_string())),
×
141
        },
×
142
    }
×
143
}
×
144

×
145
fn get_window_hop(hop_arg: &FunctionArg) -> Result<Duration, WindowError> {
×
146
    match hop_arg {
×
147
        FunctionArg::Named { name, arg: _ } => {
×
148
            let column_name = ExpressionBuilder::normalize_ident(name);
×
149
            Err(WindowError::WindowInvalidHop(column_name))
×
150
        }
×
151
        FunctionArg::Unnamed(arg_expr) => match arg_expr {
×
152
            FunctionArgExpr::Expr(expr) => match expr {
×
153
                Expr::Value(Value::SingleQuotedString(s) | Value::DoubleQuotedString(s)) => {
×
154
                    let interval: Duration = parse_duration_string(s)
×
155
                        .map_err(|_| WindowError::WindowInvalidHop(s.to_owned()))?;
×
156
                    Ok(interval)
×
157
                }
×
158
                _ => Err(WindowError::WindowInvalidHop(expr.to_string())),
×
159
            },
×
160
            FunctionArgExpr::QualifiedWildcard(_) => {
×
161
                Err(WindowError::WindowInvalidHop("*".to_string()))
×
162
            }
163
            FunctionArgExpr::Wildcard => Err(WindowError::WindowInvalidHop("*".to_string())),
×
164
        },
×
165
    }
×
166
}
×
167

×
168
fn get_window_source_name(arg: &FunctionArg) -> Result<String, WindowError> {
×
169
    match arg {
×
170
        FunctionArg::Named { name, arg: _ } => {
×
171
            let source_name = ExpressionBuilder::normalize_ident(name);
×
172
            Err(WindowError::WindowInvalidSource(source_name))
×
173
        }
×
174
        FunctionArg::Unnamed(arg_expr) => match arg_expr {
×
175
            FunctionArgExpr::Expr(expr) => match expr {
×
176
                Expr::Identifier(ident) => {
×
177
                    let source_name = ExpressionBuilder::normalize_ident(ident);
×
178
                    Ok(source_name)
×
179
                }
180
                Expr::CompoundIdentifier(ident) => {
×
181
                    let source_name = ExpressionBuilder::fullname_from_ident(ident);
×
182
                    Ok(source_name)
×
183
                }
×
184
                _ => Err(WindowError::WindowInvalidColumn(expr.to_string())),
×
185
            },
186
            FunctionArgExpr::QualifiedWildcard(_) => {
×
187
                Err(WindowError::WindowInvalidColumn("*".to_string()))
×
188
            }
×
189
            FunctionArgExpr::Wildcard => Err(WindowError::WindowInvalidColumn("*".to_string())),
×
190
        },
×
191
    }
192
}
×
193

×
194
fn get_window_column_index(args: &[FunctionArg], schema: &Schema) -> Result<usize, WindowError> {
×
195
    let column_arg = args
×
196
        .get(1)
×
197
        .ok_or(WindowError::WindowMissingColumnArgument)?;
×
198
    match column_arg {
×
199
        FunctionArg::Named { name, arg: _ } => {
×
200
            let column_name = ExpressionBuilder::normalize_ident(name);
×
201
            Err(WindowError::WindowInvalidColumn(column_name))
×
202
        }
203
        FunctionArg::Unnamed(arg_expr) => match arg_expr {
×
204
            FunctionArgExpr::Expr(expr) => match expr {
×
205
                Expr::Identifier(ident) => {
×
206
                    let column_name = ExpressionBuilder::normalize_ident(ident);
×
207
                    let index = get_field_index(&[ident.clone()], schema)
×
208
                        .map_err(|_| WindowError::WindowInvalidColumn(column_name.clone()))?;
×
209

×
210
                    Ok(index.ok_or(WindowError::WindowInvalidColumn(column_name))?)
×
211
                }
×
212
                Expr::CompoundIdentifier(ident) => {
×
213
                    let column_name = ExpressionBuilder::fullname_from_ident(ident);
×
214
                    let index = get_field_index(ident, schema)
×
215
                        .map_err(|_| WindowError::WindowInvalidColumn(column_name.clone()))?;
×
216

×
217
                    Ok(index.ok_or(WindowError::WindowInvalidColumn(column_name))?)
×
218
                }
×
219
                _ => Err(WindowError::WindowInvalidColumn(expr.to_string())),
×
220
            },
×
221
            FunctionArgExpr::QualifiedWildcard(_) => {
222
                Err(WindowError::WindowInvalidColumn("*".to_string()))
×
223
            }
224
            FunctionArgExpr::Wildcard => Err(WindowError::WindowInvalidColumn("*".to_string())),
×
225
        },
×
226
    }
×
227
}
×
228

229
fn parse_duration_string(duration_string: &str) -> Result<Duration, WindowError> {
×
230
    let duration_string = duration_string
×
231
        .split_whitespace()
×
232
        .collect::<Vec<_>>()
×
233
        .join(" ");
×
234

×
235
    let duration_tokens = duration_string.split(' ').collect::<Vec<_>>();
×
236
    if duration_tokens.len() != 2 {
×
237
        return Err(WindowError::WindowInvalidInterval(duration_string));
×
238
    }
×
239

×
240
    let duration_value = duration_tokens[0]
×
241
        .parse::<i64>()
×
242
        .map_err(|_| WindowError::WindowInvalidInterval(duration_string.to_owned()))?;
×
243

×
244
    let duration_unit = duration_tokens[1].to_uppercase();
×
245

×
246
    match duration_unit.as_str() {
×
247
        "MILLISECOND" | "MILLISECONDS" => Ok(Duration::milliseconds(duration_value)),
×
248
        "SECOND" | "SECONDS" => Ok(Duration::seconds(duration_value)),
×
249
        "MINUTE" | "MINUTES" => Ok(Duration::minutes(duration_value)),
×
250
        "HOUR" | "HOURS" => Ok(Duration::hours(duration_value)),
×
251
        "DAY" | "DAYS" => Ok(Duration::days(duration_value)),
×
252
        _ => Err(WindowError::WindowInvalidInterval(duration_string)),
×
253
    }
×
254
}
×
255

256
fn string_from_sql_object_name(name: &ObjectName) -> String {
391✔
257
    let function_name = name
391✔
258
        .0
391✔
259
        .iter()
391✔
260
        .map(ExpressionBuilder::normalize_ident)
391✔
261
        .collect::<Vec<String>>()
391✔
262
        .join(".");
391✔
263
    function_name
391✔
264
}
391✔
265

266
pub fn get_field_index(ident: &[Ident], schema: &Schema) -> Result<Option<usize>, PipelineError> {
×
267
    let tables_matches = |table_ident: &Ident, fd: &FieldDefinition| -> bool {
×
268
        match fd.source.clone() {
×
269
            dozer_types::types::SourceDefinition::Table {
×
270
                connection: _,
×
271
                name,
×
272
            } => name == table_ident.value,
×
273
            dozer_types::types::SourceDefinition::Alias { name } => name == table_ident.value,
×
274
            dozer_types::types::SourceDefinition::Dynamic => false,
×
275
        }
×
276
    };
×
277

278
    let field_index = match ident.len() {
×
279
        1 => {
×
280
            let field_index = schema
×
281
                .fields
×
282
                .iter()
×
283
                .enumerate()
×
284
                .find(|(_, f)| f.name == ident[0].value)
×
285
                .map(|(idx, fd)| (idx, fd.clone()));
×
286
            field_index
×
287
        }
288
        2 => {
×
289
            let table_name = ident.first().expect("table_name is expected");
×
290
            let field_name = ident.last().expect("field_name is expected");
×
291

×
292
            let index = schema
×
293
                .fields
×
294
                .iter()
×
295
                .enumerate()
×
296
                .find(|(_, f)| tables_matches(table_name, f) && f.name == field_name.value)
×
297
                .map(|(idx, fd)| (idx, fd.clone()));
×
298
            index
×
299
        }
300
        // 3 => {
301
        //     let connection_name = comp_ident.get(0).expect("connection_name is expected");
×
302
        //     let table_name = comp_ident.get(1).expect("table_name is expected");
×
303
        //     let field_name = comp_ident.get(2).expect("field_name is expected");
×
304
        // }
×
305
        _ => {
×
306
            return Err(PipelineError::JoinError(JoinError::NameSpaceTooLong(
×
307
                ident
×
308
                    .iter()
×
309
                    .map(|a| a.value.clone())
×
310
                    .collect::<Vec<String>>()
×
311
                    .join("."),
×
312
            )));
×
313
        }
314
    };
315
    field_index.map_or(Ok(None), |(i, _fd)| Ok(Some(i)))
×
316
}
×
317

318
pub(crate) fn relation_is_a_window(relation: &TableFactor) -> Result<bool, WindowError> {
413✔
319
    match relation {
413✔
320
        TableFactor::Table { name, args, .. } => {
391✔
321
            let function_name = string_from_sql_object_name(name);
391✔
322

391✔
323
            if args.is_some() {
391✔
324
                if function_name.to_uppercase() == "TUMBLE" || function_name.to_uppercase() == "HOP"
×
325
                {
326
                    Ok(true)
×
327
                } else {
×
328
                    Err(WindowError::UnsupportedRelationFunction(function_name))
×
329
                }
330
            } else {
×
331
                // not a function, most probably just a relation name
×
332
                Ok(false)
391✔
333
            }
×
334
        }
×
335
        TableFactor::Derived {
×
336
            lateral: _,
×
337
            subquery: _,
×
338
            alias: _,
×
339
        } => Ok(false),
22✔
340
        TableFactor::TableFunction { expr: _, alias: _ } => {
×
341
            Err(WindowError::UnsupportedTableFunction)
×
342
        }
×
343
        TableFactor::UNNEST {
344
            alias: _,
×
345
            array_expr: _,
346
            with_offset: _,
×
347
            with_offset_alias: _,
348
        } => Err(WindowError::UnsupportedUnnest),
×
349
        TableFactor::NestedJoin {
350
            table_with_joins: _,
×
351
            alias: _,
352
        } => Err(WindowError::UnsupportedNestedJoin),
×
353
    }
354
}
413✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc