• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4012757265

pending completion
4012757265

Pull #737

github

GitHub
Merge 41e5235a4 into c7b362bed
Pull Request #737: feat: select * wildcard

55 of 55 new or added lines in 4 files covered. (100.0%)

23308 of 35040 relevant lines covered (66.52%)

37782.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.74
/dozer-sql/src/pipeline/projection/factory.rs
1
use std::collections::HashMap;
2

3
use dozer_core::dag::{
4
    dag::DEFAULT_PORT_HANDLE,
5
    errors::ExecutionError,
6
    node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory},
7
};
8
use dozer_types::types::{FieldDefinition, Schema};
9
use sqlparser::ast::{FunctionArg, FunctionArgExpr, SelectItem};
10

11
use crate::pipeline::builder::SchemaSQLContext;
12
use crate::pipeline::{
13
    errors::PipelineError,
14
    expression::{
15
        builder::{BuilderExpressionType, ExpressionBuilder},
16
        execution::Expression,
17
        execution::ExpressionExecutor,
18
    },
19
};
20

21
use super::processor::ProjectionProcessor;
22

23
/// Factory for the projection stage of a SQL pipeline.
///
/// It stores the items of a `SELECT` list and uses them both to derive the
/// output schema and to build a [`ProjectionProcessor`].
#[derive(Debug)]
×
24
pub struct ProjectionProcessorFactory {
25
    /// The `SELECT` list items this factory projects, in the order given at construction.
    select: Vec<SelectItem>,
26
}
27

28
impl ProjectionProcessorFactory {
29
    /// Creates a new [`ProjectionProcessorFactory`].
30
    pub fn _new(select: Vec<SelectItem>) -> Self {
51✔
31
        Self { select }
51✔
32
    }
51✔
33
}
34

35
impl ProcessorFactory<SchemaSQLContext> for ProjectionProcessorFactory {
36
    fn get_input_ports(&self) -> Vec<PortHandle> {
×
37
        vec![DEFAULT_PORT_HANDLE]
×
38
    }
×
39

40
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
×
41
        vec![OutputPortDef::new(
×
42
            DEFAULT_PORT_HANDLE,
×
43
            OutputPortType::Stateless,
×
44
        )]
×
45
    }
×
46

47
    fn get_output_schema(
51✔
48
        &self,
51✔
49
        _output_port: &PortHandle,
51✔
50
        input_schemas: &HashMap<PortHandle, (Schema, SchemaSQLContext)>,
51✔
51
    ) -> Result<(Schema, SchemaSQLContext), ExecutionError> {
51✔
52
        let (input_schema, context) = input_schemas.get(&DEFAULT_PORT_HANDLE).unwrap();
51✔
53
        match self
51✔
54
            .select
51✔
55
            .iter()
51✔
56
            .map(|item| parse_sql_select_item(item, input_schema))
52✔
57
            .collect::<Result<Vec<(String, Expression)>, PipelineError>>()
51✔
58
        {
59
            Ok(expressions) => {
51✔
60
                let mut output_schema = input_schema.clone();
51✔
61
                let mut fields = vec![];
51✔
62
                for e in expressions.iter() {
52✔
63
                    let field_name = e.0.clone();
52✔
64
                    if field_name.eq(&FunctionArgExpr::Wildcard.to_string()) {
52✔
65
                        for field in input_schema.fields.clone() {
×
66
                            output_schema.fields.push(FieldDefinition::new(
×
67
                                field.name,
×
68
                                field.typ,
×
69
                                field.nullable,
×
70
                                field.source,
×
71
                            ));
×
72
                        }
×
73
                        break;
×
74
                    }
52✔
75
                    let field_type =
51✔
76
                        e.1.get_type(input_schema)
52✔
77
                            .map_err(|e| ExecutionError::InternalError(Box::new(e)))?;
52✔
78
                    fields.push(FieldDefinition::new(
51✔
79
                        field_name,
51✔
80
                        field_type.return_type,
51✔
81
                        field_type.nullable,
51✔
82
                        field_type.source,
51✔
83
                    ));
51✔
84
                }
×
85
                output_schema.fields = fields;
50✔
86

50✔
87
                Ok((output_schema, context.clone()))
50✔
88
            }
×
89
            Err(error) => Err(ExecutionError::InternalStringError(error.to_string())),
×
90
        }
×
91
    }
51✔
92

×
93
    fn build(
49✔
94
        &self,
49✔
95
        input_schemas: HashMap<PortHandle, Schema>,
49✔
96
        _output_schemas: HashMap<PortHandle, Schema>,
49✔
97
    ) -> Result<Box<dyn Processor>, ExecutionError> {
49✔
98
        let schema = match input_schemas.get(&DEFAULT_PORT_HANDLE) {
49✔
99
            Some(schema) => Ok(schema),
49✔
100
            None => Err(ExecutionError::InternalStringError(
×
101
                "Invalid Projection input port".to_string(),
×
102
            )),
×
103
        }?;
×
104

×
105
        match self
49✔
106
            .select
49✔
107
            .iter()
49✔
108
            .map(|item| parse_sql_select_item(item, schema))
49✔
109
            .collect::<Result<Vec<(String, Expression)>, PipelineError>>()
49✔
110
        {
×
111
            Ok(expressions) => Ok(Box::new(ProjectionProcessor::new(
49✔
112
                schema.clone(),
49✔
113
                expressions,
49✔
114
            ))),
49✔
115
            Err(error) => Err(ExecutionError::InternalStringError(error.to_string())),
×
116
        }
×
117
    }
49✔
118

×
119
    fn prepare(
×
120
        &self,
×
121
        _input_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
122
        _output_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
123
    ) -> Result<(), ExecutionError> {
×
124
        Ok(())
×
125
    }
×
126
}
×
127

×
128
pub(crate) fn parse_sql_select_item(
1,028✔
129
    sql: &SelectItem,
1,028✔
130
    schema: &Schema,
1,028✔
131
) -> Result<(String, Expression), PipelineError> {
1,028✔
132
    let builder = ExpressionBuilder {};
1,028✔
133
    match sql {
1,028✔
134
        SelectItem::UnnamedExpr(sql_expr) => {
990✔
135
            match builder.parse_sql_expression(
990✔
136
                &BuilderExpressionType::FullExpression,
990✔
137
                sql_expr,
990✔
138
                schema,
990✔
139
            ) {
990✔
140
                Ok(expr) => Ok((sql_expr.to_string(), *expr.0)),
990✔
141
                Err(error) => Err(error),
×
142
            }
143
        }
×
144
        SelectItem::ExprWithAlias { expr, alias } => {
38✔
145
            match builder.parse_sql_expression(&BuilderExpressionType::FullExpression, expr, schema)
38✔
146
            {
147
                Ok(expr) => Ok((alias.value.clone(), *expr.0)),
38✔
148
                Err(error) => Err(error),
×
149
            }
150
        }
151
        // TODO: (chloe) implement wildcard
152
        SelectItem::Wildcard(_) => {
153
            match builder.parse_sql_function_arg(
×
154
                &BuilderExpressionType::FullExpression,
×
155
                &FunctionArg::Unnamed(FunctionArgExpr::Wildcard),
×
156
                schema
×
157
            ) {
×
158
                Ok(expr) => Ok(("*".to_string(), *expr.0)),
×
159
                Err(error) => Err(error),
×
160
            }
161
        }
162
        SelectItem::QualifiedWildcard(ref object_name, ..) => {
×
163
            Err(PipelineError::InvalidOperator(object_name.to_string()))
×
164
        }
165
    }
166
}
1,028✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc