• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4384029966

pending completion
4384029966

push

github

GitHub
Prepare v0.1.11 (#1203)

28488 of 40876 relevant lines covered (69.69%)

39271.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.55
/dozer-sql/src/pipeline/projection/factory.rs
1
use std::collections::HashMap;
2

3
use dozer_core::{
4
    errors::ExecutionError,
5
    node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory},
6
    storage::lmdb_storage::LmdbExclusiveTransaction,
7
    DEFAULT_PORT_HANDLE,
8
};
9
use dozer_types::types::{FieldDefinition, Schema};
10
use sqlparser::ast::{Expr, Ident, SelectItem};
11

12
use crate::pipeline::builder::SchemaSQLContext;
13
use crate::pipeline::{
14
    errors::PipelineError,
15
    expression::{
16
        builder::ExpressionBuilder, execution::Expression, execution::ExpressionExecutor,
17
    },
18
};
19

20
use super::processor::ProjectionProcessor;
21

22
#[derive(Debug)]
×
23
pub struct ProjectionProcessorFactory {
24
    select: Vec<SelectItem>,
25
}
26

27
impl ProjectionProcessorFactory {
28
    /// Creates a new [`ProjectionProcessorFactory`].
29
    pub fn _new(select: Vec<SelectItem>) -> Self {
91✔
30
        Self { select }
91✔
31
    }
91✔
32
}
33

34
impl ProcessorFactory<SchemaSQLContext> for ProjectionProcessorFactory {
35
    fn get_input_ports(&self) -> Vec<PortHandle> {
×
36
        vec![DEFAULT_PORT_HANDLE]
×
37
    }
×
38

39
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
×
40
        vec![OutputPortDef::new(
×
41
            DEFAULT_PORT_HANDLE,
×
42
            OutputPortType::Stateless,
×
43
        )]
×
44
    }
×
45

46
    fn get_output_schema(
91✔
47
        &self,
91✔
48
        _output_port: &PortHandle,
91✔
49
        input_schemas: &HashMap<PortHandle, (Schema, SchemaSQLContext)>,
91✔
50
    ) -> Result<(Schema, SchemaSQLContext), ExecutionError> {
91✔
51
        let (input_schema, context) = input_schemas.get(&DEFAULT_PORT_HANDLE).unwrap();
91✔
52

91✔
53
        let mut select_expr: Vec<(String, Expression)> = vec![];
91✔
54
        for s in self.select.iter() {
93✔
55
            match s {
93✔
56
                SelectItem::Wildcard(_) => {
57
                    let fields: Vec<SelectItem> = input_schema
1✔
58
                        .fields
1✔
59
                        .iter()
1✔
60
                        .map(|col| {
2✔
61
                            SelectItem::UnnamedExpr(Expr::Identifier(Ident::new(
2✔
62
                                col.to_owned().name,
2✔
63
                            )))
2✔
64
                        })
2✔
65
                        .collect();
1✔
66
                    for f in fields {
3✔
67
                        let res = parse_sql_select_item(&f, input_schema);
2✔
68
                        if let Ok(..) = res {
2✔
69
                            select_expr.push(res.unwrap())
2✔
70
                        }
×
71
                    }
72
                }
73
                _ => {
74
                    let res = parse_sql_select_item(s, input_schema);
92✔
75
                    if let Ok(..) = res {
92✔
76
                        select_expr.push(res.unwrap())
92✔
77
                    }
×
78
                }
79
            }
80
        }
81

82
        let mut output_schema = input_schema.clone();
92✔
83
        let mut fields = vec![];
92✔
84
        for e in select_expr.iter() {
94✔
85
            let field_name = e.0.clone();
94✔
86
            let field_type =
93✔
87
                e.1.get_type(input_schema)
94✔
88
                    .map_err(|e| ExecutionError::InternalError(Box::new(e)))?;
94✔
89
            fields.push(FieldDefinition::new(
93✔
90
                field_name,
93✔
91
                field_type.return_type,
93✔
92
                field_type.nullable,
93✔
93
                field_type.source,
93✔
94
            ));
93✔
95
        }
96
        output_schema.fields = fields;
91✔
97

91✔
98
        Ok((output_schema, context.clone()))
91✔
99
    }
92✔
100

101
    fn build(
89✔
102
        &self,
89✔
103
        input_schemas: HashMap<PortHandle, Schema>,
89✔
104
        _output_schemas: HashMap<PortHandle, Schema>,
89✔
105
        _txn: &mut LmdbExclusiveTransaction,
89✔
106
    ) -> Result<Box<dyn Processor>, ExecutionError> {
89✔
107
        let schema = match input_schemas.get(&DEFAULT_PORT_HANDLE) {
89✔
108
            Some(schema) => Ok(schema),
89✔
109
            None => Err(ExecutionError::InternalStringError(
×
110
                "Invalid Projection input port".to_string(),
×
111
            )),
×
112
        }?;
×
113

114
        match self
89✔
115
            .select
89✔
116
            .iter()
89✔
117
            .map(|item| parse_sql_select_item(item, schema))
89✔
118
            .collect::<Result<Vec<(String, Expression)>, PipelineError>>()
89✔
119
        {
120
            Ok(expressions) => Ok(Box::new(ProjectionProcessor::new(
89✔
121
                schema.clone(),
89✔
122
                expressions.into_iter().map(|e| e.1).collect(),
89✔
123
            ))),
89✔
124
            Err(error) => Err(ExecutionError::InternalStringError(error.to_string())),
×
125
        }
126
    }
89✔
127
}
128

129
pub(crate) fn parse_sql_select_item(
182✔
130
    sql: &SelectItem,
182✔
131
    schema: &Schema,
182✔
132
) -> Result<(String, Expression), PipelineError> {
182✔
133
    match sql {
182✔
134
        SelectItem::UnnamedExpr(sql_expr) => {
180✔
135
            match ExpressionBuilder::new(0).parse_sql_expression(true, sql_expr, schema) {
180✔
136
                Ok(expr) => Ok((sql_expr.to_string(), expr)),
180✔
137
                Err(error) => Err(error),
×
138
            }
139
        }
140
        SelectItem::ExprWithAlias { expr, alias } => {
2✔
141
            match ExpressionBuilder::new(0).parse_sql_expression(true, expr, schema) {
2✔
142
                Ok(expr) => Ok((alias.value.clone(), expr)),
2✔
143
                Err(error) => Err(error),
×
144
            }
145
        }
146
        SelectItem::Wildcard(_) => Err(PipelineError::InvalidOperator("*".to_string())),
×
147
        SelectItem::QualifiedWildcard(ref object_name, ..) => {
×
148
            Err(PipelineError::InvalidOperator(object_name.to_string()))
×
149
        }
150
    }
151
}
182✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc