• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6008856021

29 Aug 2023 06:44AM UTC coverage: 76.756% (-1.0%) from 77.736%
6008856021

push

github

web-flow
chore: Remove unused generic type parameter in `dozer-core` (#1929)

330 of 330 new or added lines in 38 files covered. (100.0%)

48977 of 63809 relevant lines covered (76.76%)

48470.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-sql/src/pipeline/table_operator/factory.rs
1
use std::{collections::HashMap, time::Duration};
2

3
use dozer_core::{
4
    node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory},
5
    processor_record::ProcessorRecordStore,
6
    DEFAULT_PORT_HANDLE,
7
};
8
use dozer_types::{errors::internal::BoxedError, types::Schema};
9
use sqlparser::ast::{Expr, FunctionArg, FunctionArgExpr, Value};
10

11
use crate::pipeline::{
12
    errors::{PipelineError, TableOperatorError},
13
    expression::{builder::ExpressionBuilder, execution::Expression},
14
    pipeline_builder::from_builder::TableOperatorDescriptor,
15
};
16

17
use super::{
18
    lifetime::LifetimeTableOperator,
19
    operator::{TableOperator, TableOperatorType},
20
    processor::TableOperatorProcessor,
21
};
22

23
/// Position, within a table operator's argument list, of the argument that names the source table.
const SOURCE_TABLE_ARGUMENT: usize = 0;
24

25
#[derive(Debug)]
×
26
pub struct TableOperatorProcessorFactory {
×
27
    id: String,
28
    table: TableOperatorDescriptor,
29
    name: String,
30
}
31

32
impl TableOperatorProcessorFactory {
33
    pub fn new(id: String, table: TableOperatorDescriptor) -> Self {
×
34
        Self {
×
35
            id: id.clone(),
×
36
            table,
×
37
            name: id,
×
38
        }
×
39
    }
×
40

×
41
    pub(crate) fn get_source_name(&self) -> Result<String, TableOperatorError> {
×
42
        let source_arg = self.table.args.get(SOURCE_TABLE_ARGUMENT).ok_or(
×
43
            TableOperatorError::MissingSourceArgument(self.table.name.to_owned()),
×
44
        )?;
×
45

×
46
        let source_name = get_source_name(self.table.name.to_owned(), source_arg)?;
×
47

×
48
        Ok(source_name)
×
49
    }
×
50
}
×
51

52
impl ProcessorFactory for TableOperatorProcessorFactory {
53
    fn id(&self) -> String {
×
54
        self.id.clone()
×
55
    }
×
56

×
57
    fn type_name(&self) -> String {
×
58
        self.name.clone()
×
59
    }
×
60
    fn get_input_ports(&self) -> Vec<PortHandle> {
×
61
        vec![DEFAULT_PORT_HANDLE]
×
62
    }
×
63

×
64
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
×
65
        vec![OutputPortDef::new(
×
66
            DEFAULT_PORT_HANDLE,
×
67
            OutputPortType::Stateless,
×
68
        )]
×
69
    }
×
70

×
71
    fn get_output_schema(
×
72
        &self,
×
73
        _output_port: &PortHandle,
×
74
        input_schemas: &HashMap<PortHandle, Schema>,
×
75
    ) -> Result<Schema, BoxedError> {
×
76
        let input_schema = input_schemas
×
77
            .get(&DEFAULT_PORT_HANDLE)
×
78
            .ok_or(PipelineError::InvalidPortHandle(DEFAULT_PORT_HANDLE))?;
×
79

×
80
        let output_schema =
×
81
            match operator_from_descriptor(&self.table, input_schema)? {
×
82
                Some(operator) => operator
×
83
                    .get_output_schema(input_schema)
×
84
                    .map_err(PipelineError::TableOperatorError)?,
×
85
                None => {
×
86
                    return Err(PipelineError::TableOperatorError(
×
87
                        TableOperatorError::InternalError("Invalid Table Operator".into()),
×
88
                    )
×
89
                    .into())
×
90
                }
×
91
            };
×
92

93
        Ok(output_schema)
×
94
    }
×
95

×
96
    fn build(
×
97
        &self,
×
98
        input_schemas: HashMap<PortHandle, dozer_types::types::Schema>,
×
99
        _output_schemas: HashMap<PortHandle, dozer_types::types::Schema>,
×
100
        _record_store: &ProcessorRecordStore,
×
101
    ) -> Result<Box<dyn Processor>, BoxedError> {
×
102
        let input_schema = input_schemas
×
103
            .get(&DEFAULT_PORT_HANDLE)
×
104
            .ok_or(PipelineError::InternalError(
×
105
                "Invalid Window".to_string().into(),
×
106
            ))?
×
107
            .clone();
×
108

×
109
        match operator_from_descriptor(&self.table, &input_schema)? {
×
110
            Some(operator) => Ok(Box::new(TableOperatorProcessor::new(
×
111
                self.id.clone(),
×
112
                operator,
×
113
                input_schema,
×
114
            ))),
×
115
            None => Err(
×
116
                PipelineError::TableOperatorError(TableOperatorError::InternalError(
×
117
                    "Invalid Table Operator".into(),
×
118
                ))
×
119
                .into(),
×
120
            ),
×
121
        }
×
122
    }
×
123
}
124

×
125
pub(crate) fn operator_from_descriptor(
×
126
    descriptor: &TableOperatorDescriptor,
×
127
    schema: &Schema,
×
128
) -> Result<Option<TableOperatorType>, PipelineError> {
×
129
    if &descriptor.name.to_uppercase() == "TTL" {
×
130
        let operator = lifetime_from_descriptor(descriptor, schema)?;
×
131

×
132
        Ok(Some(operator.into()))
×
133
    } else {
134
        Err(PipelineError::InternalError(descriptor.name.clone().into()))
×
135
    }
136
}
×
137

138
fn lifetime_from_descriptor(
×
139
    descriptor: &TableOperatorDescriptor,
×
140
    schema: &Schema,
×
141
) -> Result<LifetimeTableOperator, TableOperatorError> {
×
142
    let expression_arg = descriptor
×
143
        .args
×
144
        .get(1)
×
145
        .ok_or(TableOperatorError::MissingArgument(
×
146
            descriptor.name.to_owned(),
×
147
        ))?;
×
148
    let duration_arg = descriptor
×
149
        .args
×
150
        .get(2)
×
151
        .ok_or(TableOperatorError::MissingArgument(
×
152
            descriptor.name.to_owned(),
×
153
        ))?;
×
154

×
155
    let expression = get_expression(descriptor.name.to_owned(), expression_arg, schema)?;
×
156
    let duration = get_interval(descriptor.name.to_owned(), duration_arg)?;
×
157

×
158
    let operator = LifetimeTableOperator::new(None, expression, duration);
×
159

×
160
    Ok(operator)
×
161
}
×
162

×
163
fn get_interval(
×
164
    function_name: String,
×
165
    interval_arg: &FunctionArg,
×
166
) -> Result<Duration, TableOperatorError> {
×
167
    match interval_arg {
×
168
        FunctionArg::Named { name, arg: _ } => {
×
169
            let column_name = ExpressionBuilder::normalize_ident(name);
×
170
            Err(TableOperatorError::InvalidInterval(
×
171
                column_name,
×
172
                function_name,
×
173
            ))
×
174
        }
×
175
        FunctionArg::Unnamed(arg_expr) => match arg_expr {
×
176
            FunctionArgExpr::Expr(expr) => match expr {
×
177
                Expr::Value(Value::SingleQuotedString(s) | Value::DoubleQuotedString(s)) => {
×
178
                    let interval =
×
179
                        parse_duration_string(function_name.to_owned(), s).map_err(|_| {
×
180
                            TableOperatorError::InvalidInterval(s.to_owned(), function_name)
×
181
                        })?;
×
182
                    Ok(interval)
×
183
                }
×
184
                _ => Err(TableOperatorError::InvalidInterval(
×
185
                    expr.to_string(),
×
186
                    function_name,
×
187
                )),
×
188
            },
×
189
            FunctionArgExpr::QualifiedWildcard(_) => Err(TableOperatorError::InvalidInterval(
×
190
                "*".to_string(),
×
191
                function_name,
×
192
            )),
×
193
            FunctionArgExpr::Wildcard => Err(TableOperatorError::InvalidInterval(
×
194
                "*".to_string(),
×
195
                function_name,
×
196
            )),
×
197
        },
×
198
    }
×
199
}
×
200

201
fn get_expression(
×
202
    function_name: String,
×
203
    interval_arg: &FunctionArg,
×
204
    schema: &Schema,
×
205
) -> Result<Expression, TableOperatorError> {
×
206
    match interval_arg {
×
207
        FunctionArg::Named { name, arg: _ } => {
×
208
            let column_name = ExpressionBuilder::normalize_ident(name);
×
209
            Err(TableOperatorError::InvalidReference(
×
210
                column_name,
×
211
                function_name,
×
212
            ))
×
213
        }
×
214
        FunctionArg::Unnamed(arg_expr) => match arg_expr {
×
215
            FunctionArgExpr::Expr(expr) => {
×
216
                let mut builder = ExpressionBuilder::new(schema.fields.len());
×
217
                let expression = builder.build(false, expr, schema).map_err(|_| {
×
218
                    TableOperatorError::InvalidReference(expr.to_string(), function_name)
×
219
                })?;
×
220

×
221
                Ok(expression)
×
222
            }
223
            FunctionArgExpr::QualifiedWildcard(_) => Err(TableOperatorError::InvalidReference(
×
224
                "*".to_string(),
×
225
                function_name,
×
226
            )),
×
227
            FunctionArgExpr::Wildcard => Err(TableOperatorError::InvalidReference(
×
228
                "*".to_string(),
×
229
                function_name,
×
230
            )),
×
231
        },
×
232
    }
×
233
}
×
234

235
fn parse_duration_string(
×
236
    function_name: String,
×
237
    duration_string: &str,
×
238
) -> Result<Duration, TableOperatorError> {
×
239
    let duration_string = duration_string
×
240
        .split_whitespace()
×
241
        .collect::<Vec<_>>()
×
242
        .join(" ");
×
243

×
244
    let duration_tokens = duration_string.split(' ').collect::<Vec<_>>();
×
245
    if duration_tokens.len() != 2 {
×
246
        return Err(TableOperatorError::InvalidInterval(
×
247
            duration_string,
×
248
            function_name,
×
249
        ));
×
250
    }
×
251

×
252
    let duration_value = duration_tokens[0].parse::<u64>().map_err(|_| {
×
253
        TableOperatorError::InvalidInterval(duration_string.to_owned(), function_name.clone())
×
254
    })?;
×
255

×
256
    let duration_unit = duration_tokens[1].to_uppercase();
×
257

×
258
    match duration_unit.as_str() {
×
259
        "MILLISECOND" | "MILLISECONDS" => Ok(Duration::from_millis(duration_value)),
×
260
        "SECOND" | "SECONDS" => Ok(Duration::from_secs(duration_value)),
×
261
        "MINUTE" | "MINUTES" => Ok(Duration::from_secs(duration_value * 60)),
×
262
        "HOUR" | "HOURS" => Ok(Duration::from_secs(duration_value * 60 * 60)),
×
263
        "DAY" | "DAYS" => Ok(Duration::from_secs(duration_value * 60 * 60 * 24)),
×
264
        _ => Err(TableOperatorError::InvalidInterval(
×
265
            duration_string,
×
266
            function_name,
×
267
        )),
×
268
    }
×
269
}
×
270

271
fn get_source_name(function_name: String, arg: &FunctionArg) -> Result<String, TableOperatorError> {
×
272
    match arg {
×
273
        FunctionArg::Named { name, arg: _ } => {
×
274
            let source_name = ExpressionBuilder::normalize_ident(name);
×
275
            Err(TableOperatorError::InvalidSourceArgument(
×
276
                source_name,
×
277
                function_name,
×
278
            ))
×
279
        }
×
280
        FunctionArg::Unnamed(arg_expr) => match arg_expr {
×
281
            FunctionArgExpr::Expr(expr) => match expr {
×
282
                Expr::Identifier(ident) => {
×
283
                    let source_name = ExpressionBuilder::normalize_ident(ident);
×
284
                    Ok(source_name)
×
285
                }
×
286
                Expr::CompoundIdentifier(ident) => {
×
287
                    let source_name = ExpressionBuilder::fullname_from_ident(ident);
×
288
                    Ok(source_name)
×
289
                }
×
290
                _ => Err(TableOperatorError::InvalidSourceArgument(
×
291
                    expr.to_string(),
×
292
                    function_name,
×
293
                )),
×
294
            },
×
295
            FunctionArgExpr::QualifiedWildcard(_) => Err(
×
296
                TableOperatorError::InvalidSourceArgument("*".to_string(), function_name),
×
297
            ),
×
298
            FunctionArgExpr::Wildcard => Err(TableOperatorError::InvalidSourceArgument(
×
299
                "*".to_string(),
×
300
                function_name,
×
301
            )),
×
302
        },
×
303
    }
×
304
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc