• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6105410942

07 Sep 2023 04:28AM UTC coverage: 77.562% (-0.1%) from 77.686%
6105410942

push

github

chloeminkyung
feat: onnx image

1141 of 1141 new or added lines in 66 files covered. (100.0%)

49957 of 64409 relevant lines covered (77.56%)

50900.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-sql/src/pipeline/table_operator/factory.rs
1
use std::{collections::HashMap, time::Duration};
2

3
use dozer_core::{
4
    node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory},
5
    processor_record::ProcessorRecordStore,
6
    DEFAULT_PORT_HANDLE,
7
};
8
use dozer_types::models::udf_config::UdfConfig;
9
use dozer_types::{errors::internal::BoxedError, types::Schema};
10
use sqlparser::ast::{Expr, FunctionArg, FunctionArgExpr, Value};
11

12
use crate::pipeline::{
13
    errors::{PipelineError, TableOperatorError},
14
    expression::{builder::ExpressionBuilder, execution::Expression},
15
    pipeline_builder::from_builder::TableOperatorDescriptor,
16
};
17

18
use super::{
19
    lifetime::LifetimeTableOperator,
20
    operator::{TableOperator, TableOperatorType},
21
    processor::TableOperatorProcessor,
22
};
23

24
/// Position of the source-table argument within a table operator's argument list
/// (`TableOperatorDescriptor::args`); the remaining operator arguments follow it.
const SOURCE_TABLE_ARGUMENT: usize = 0;
25

26
#[derive(Debug)]
×
27
pub struct TableOperatorProcessorFactory {
28
    id: String,
29
    table: TableOperatorDescriptor,
30
    name: String,
31
    udfs: Vec<UdfConfig>,
32
}
33

34
impl TableOperatorProcessorFactory {
35
    pub fn new(id: String, table: TableOperatorDescriptor, udfs: Vec<UdfConfig>) -> Self {
×
36
        Self {
×
37
            id: id.clone(),
×
38
            table,
×
39
            name: id,
×
40
            udfs,
×
41
        }
×
42
    }
×
43

44
    pub(crate) fn get_source_name(&self) -> Result<String, TableOperatorError> {
×
45
        let source_arg = self.table.args.get(SOURCE_TABLE_ARGUMENT).ok_or(
×
46
            TableOperatorError::MissingSourceArgument(self.table.name.to_owned()),
×
47
        )?;
×
48

49
        let source_name = get_source_name(self.table.name.to_owned(), source_arg)?;
×
50

51
        Ok(source_name)
×
52
    }
×
53
}
54

55
impl ProcessorFactory for TableOperatorProcessorFactory {
56
    fn id(&self) -> String {
×
57
        self.id.clone()
×
58
    }
×
59

60
    fn type_name(&self) -> String {
×
61
        self.name.clone()
×
62
    }
×
63
    fn get_input_ports(&self) -> Vec<PortHandle> {
×
64
        vec![DEFAULT_PORT_HANDLE]
×
65
    }
×
66

67
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
×
68
        vec![OutputPortDef::new(
×
69
            DEFAULT_PORT_HANDLE,
×
70
            OutputPortType::Stateless,
×
71
        )]
×
72
    }
×
73

74
    fn get_output_schema(
×
75
        &self,
×
76
        _output_port: &PortHandle,
×
77
        input_schemas: &HashMap<PortHandle, Schema>,
×
78
    ) -> Result<Schema, BoxedError> {
×
79
        let input_schema = input_schemas
×
80
            .get(&DEFAULT_PORT_HANDLE)
×
81
            .ok_or(PipelineError::InvalidPortHandle(DEFAULT_PORT_HANDLE))?;
×
82

83
        let output_schema =
×
84
            match operator_from_descriptor(&self.table, input_schema, &self.udfs)? {
×
85
                Some(operator) => operator
×
86
                    .get_output_schema(input_schema)
×
87
                    .map_err(PipelineError::TableOperatorError)?,
×
88
                None => {
89
                    return Err(PipelineError::TableOperatorError(
×
90
                        TableOperatorError::InternalError("Invalid Table Operator".into()),
×
91
                    )
×
92
                    .into())
×
93
                }
94
            };
95

96
        Ok(output_schema)
×
97
    }
×
98

99
    fn build(
×
100
        &self,
×
101
        input_schemas: HashMap<PortHandle, dozer_types::types::Schema>,
×
102
        _output_schemas: HashMap<PortHandle, dozer_types::types::Schema>,
×
103
        _record_store: &ProcessorRecordStore,
×
104
        checkpoint_data: Option<Vec<u8>>,
×
105
    ) -> Result<Box<dyn Processor>, BoxedError> {
×
106
        let input_schema = input_schemas
×
107
            .get(&DEFAULT_PORT_HANDLE)
×
108
            .ok_or(PipelineError::InternalError(
×
109
                "Invalid Window".to_string().into(),
×
110
            ))?
×
111
            .clone();
×
112

×
113
        match operator_from_descriptor(&self.table, &input_schema, &self.udfs)? {
×
114
            Some(operator) => Ok(Box::new(TableOperatorProcessor::new(
×
115
                self.id.clone(),
×
116
                operator,
×
117
                input_schema,
×
118
                checkpoint_data,
×
119
            ))),
×
120
            None => Err(
×
121
                PipelineError::TableOperatorError(TableOperatorError::InternalError(
×
122
                    "Invalid Table Operator".into(),
×
123
                ))
×
124
                .into(),
×
125
            ),
×
126
        }
127
    }
×
128
}
129

130
pub(crate) fn operator_from_descriptor(
×
131
    descriptor: &TableOperatorDescriptor,
×
132
    schema: &Schema,
×
133
    udfs: &[UdfConfig],
×
134
) -> Result<Option<TableOperatorType>, PipelineError> {
×
135
    if &descriptor.name.to_uppercase() == "TTL" {
×
136
        let operator = lifetime_from_descriptor(descriptor, schema, udfs)?;
×
137

138
        Ok(Some(operator.into()))
×
139
    } else {
140
        Err(PipelineError::InternalError(descriptor.name.clone().into()))
×
141
    }
142
}
×
143

144
fn lifetime_from_descriptor(
×
145
    descriptor: &TableOperatorDescriptor,
×
146
    schema: &Schema,
×
147
    udfs: &[UdfConfig],
×
148
) -> Result<LifetimeTableOperator, TableOperatorError> {
×
149
    let expression_arg = descriptor
×
150
        .args
×
151
        .get(1)
×
152
        .ok_or(TableOperatorError::MissingArgument(
×
153
            descriptor.name.to_owned(),
×
154
        ))?;
×
155
    let duration_arg = descriptor
×
156
        .args
×
157
        .get(2)
×
158
        .ok_or(TableOperatorError::MissingArgument(
×
159
            descriptor.name.to_owned(),
×
160
        ))?;
×
161

162
    let expression = get_expression(descriptor.name.to_owned(), expression_arg, schema, udfs)?;
×
163
    let duration = get_interval(descriptor.name.to_owned(), duration_arg)?;
×
164

165
    let operator = LifetimeTableOperator::new(None, expression, duration);
×
166

×
167
    Ok(operator)
×
168
}
×
169

170
fn get_interval(
×
171
    function_name: String,
×
172
    interval_arg: &FunctionArg,
×
173
) -> Result<Duration, TableOperatorError> {
×
174
    match interval_arg {
×
175
        FunctionArg::Named { name, arg: _ } => {
×
176
            let column_name = ExpressionBuilder::normalize_ident(name);
×
177
            Err(TableOperatorError::InvalidInterval(
×
178
                column_name,
×
179
                function_name,
×
180
            ))
×
181
        }
182
        FunctionArg::Unnamed(arg_expr) => match arg_expr {
×
183
            FunctionArgExpr::Expr(expr) => match expr {
×
184
                Expr::Value(Value::SingleQuotedString(s) | Value::DoubleQuotedString(s)) => {
×
185
                    let interval =
×
186
                        parse_duration_string(function_name.to_owned(), s).map_err(|_| {
×
187
                            TableOperatorError::InvalidInterval(s.to_owned(), function_name)
×
188
                        })?;
×
189
                    Ok(interval)
×
190
                }
191
                _ => Err(TableOperatorError::InvalidInterval(
×
192
                    expr.to_string(),
×
193
                    function_name,
×
194
                )),
×
195
            },
196
            FunctionArgExpr::QualifiedWildcard(_) => Err(TableOperatorError::InvalidInterval(
×
197
                "*".to_string(),
×
198
                function_name,
×
199
            )),
×
200
            FunctionArgExpr::Wildcard => Err(TableOperatorError::InvalidInterval(
×
201
                "*".to_string(),
×
202
                function_name,
×
203
            )),
×
204
        },
205
    }
206
}
×
207

208
fn get_expression(
×
209
    function_name: String,
×
210
    interval_arg: &FunctionArg,
×
211
    schema: &Schema,
×
212
    udfs: &[UdfConfig],
×
213
) -> Result<Expression, TableOperatorError> {
×
214
    match interval_arg {
×
215
        FunctionArg::Named { name, arg: _ } => {
×
216
            let column_name = ExpressionBuilder::normalize_ident(name);
×
217
            Err(TableOperatorError::InvalidReference(
×
218
                column_name,
×
219
                function_name,
×
220
            ))
×
221
        }
222
        FunctionArg::Unnamed(arg_expr) => match arg_expr {
×
223
            FunctionArgExpr::Expr(expr) => {
×
224
                let mut builder = ExpressionBuilder::new(schema.fields.len());
×
225
                let expression = builder.build(false, expr, schema, udfs).map_err(|_| {
×
226
                    TableOperatorError::InvalidReference(expr.to_string(), function_name)
×
227
                })?;
×
228

229
                Ok(expression)
×
230
            }
231
            FunctionArgExpr::QualifiedWildcard(_) => Err(TableOperatorError::InvalidReference(
×
232
                "*".to_string(),
×
233
                function_name,
×
234
            )),
×
235
            FunctionArgExpr::Wildcard => Err(TableOperatorError::InvalidReference(
×
236
                "*".to_string(),
×
237
                function_name,
×
238
            )),
×
239
        },
240
    }
241
}
×
242

243
fn parse_duration_string(
×
244
    function_name: String,
×
245
    duration_string: &str,
×
246
) -> Result<Duration, TableOperatorError> {
×
247
    let duration_string = duration_string
×
248
        .split_whitespace()
×
249
        .collect::<Vec<_>>()
×
250
        .join(" ");
×
251

×
252
    let duration_tokens = duration_string.split(' ').collect::<Vec<_>>();
×
253
    if duration_tokens.len() != 2 {
×
254
        return Err(TableOperatorError::InvalidInterval(
×
255
            duration_string,
×
256
            function_name,
×
257
        ));
×
258
    }
×
259

260
    let duration_value = duration_tokens[0].parse::<u64>().map_err(|_| {
×
261
        TableOperatorError::InvalidInterval(duration_string.to_owned(), function_name.clone())
×
262
    })?;
×
263

264
    let duration_unit = duration_tokens[1].to_uppercase();
×
265

×
266
    match duration_unit.as_str() {
×
267
        "MILLISECOND" | "MILLISECONDS" => Ok(Duration::from_millis(duration_value)),
×
268
        "SECOND" | "SECONDS" => Ok(Duration::from_secs(duration_value)),
×
269
        "MINUTE" | "MINUTES" => Ok(Duration::from_secs(duration_value * 60)),
×
270
        "HOUR" | "HOURS" => Ok(Duration::from_secs(duration_value * 60 * 60)),
×
271
        "DAY" | "DAYS" => Ok(Duration::from_secs(duration_value * 60 * 60 * 24)),
×
272
        _ => Err(TableOperatorError::InvalidInterval(
×
273
            duration_string,
×
274
            function_name,
×
275
        )),
×
276
    }
277
}
×
278

279
fn get_source_name(function_name: String, arg: &FunctionArg) -> Result<String, TableOperatorError> {
×
280
    match arg {
×
281
        FunctionArg::Named { name, arg: _ } => {
×
282
            let source_name = ExpressionBuilder::normalize_ident(name);
×
283
            Err(TableOperatorError::InvalidSourceArgument(
×
284
                source_name,
×
285
                function_name,
×
286
            ))
×
287
        }
288
        FunctionArg::Unnamed(arg_expr) => match arg_expr {
×
289
            FunctionArgExpr::Expr(expr) => match expr {
×
290
                Expr::Identifier(ident) => {
×
291
                    let source_name = ExpressionBuilder::normalize_ident(ident);
×
292
                    Ok(source_name)
×
293
                }
294
                Expr::CompoundIdentifier(ident) => {
×
295
                    let source_name = ExpressionBuilder::fullname_from_ident(ident);
×
296
                    Ok(source_name)
×
297
                }
298
                _ => Err(TableOperatorError::InvalidSourceArgument(
×
299
                    expr.to_string(),
×
300
                    function_name,
×
301
                )),
×
302
            },
303
            FunctionArgExpr::QualifiedWildcard(_) => Err(
×
304
                TableOperatorError::InvalidSourceArgument("*".to_string(), function_name),
×
305
            ),
×
306
            FunctionArgExpr::Wildcard => Err(TableOperatorError::InvalidSourceArgument(
×
307
                "*".to_string(),
×
308
                function_name,
×
309
            )),
×
310
        },
311
    }
312
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc