• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6009657516

29 Aug 2023 08:13AM UTC coverage: 76.652% (-1.4%) from 78.07%
6009657516

push

github

web-flow
chore: Create unit tests workflow (#1910)

* chore: Update for Rust 1.72.0

Rust 1.72.0 has introduced a bunch of new lints. Here we fix them all.

`let ... else` finally gets formatted.

* chore: Create unit tests workflow

* Rename and remove useless steps

* remove env vars

* Add concurrency group

* Test unit workflow on 4 cores

* Add mysql service to unit tests

---------

Co-authored-by: chubei <914745487@qq.com>

48982 of 63902 relevant lines covered (76.65%)

48394.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.07
/dozer-sql/src/pipeline/projection/factory.rs
1
use std::collections::HashMap;
2

3
use dozer_core::{
4
    node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory},
5
    processor_record::ProcessorRecordStore,
6
    DEFAULT_PORT_HANDLE,
7
};
8
use dozer_types::{
9
    errors::internal::BoxedError,
10
    types::{FieldDefinition, Schema},
11
};
12
use sqlparser::ast::{Expr, Ident, SelectItem};
13

14
use crate::pipeline::{
15
    errors::PipelineError,
16
    expression::{builder::ExpressionBuilder, execution::Expression},
17
};
18

19
use super::processor::ProjectionProcessor;
20

21
#[derive(Debug)]
×
22
pub struct ProjectionProcessorFactory {
×
23
    select: Vec<SelectItem>,
24
    id: String,
25
}
26

27
impl ProjectionProcessorFactory {
28
    /// Creates a new [`ProjectionProcessorFactory`].
29
    pub fn _new(id: String, select: Vec<SelectItem>) -> Self {
1,161✔
30
        Self { select, id }
1,161✔
31
    }
1,161✔
32
}
×
33

34
impl ProcessorFactory for ProjectionProcessorFactory {
35
    fn id(&self) -> String {
×
36
        self.id.clone()
×
37
    }
×
38
    fn type_name(&self) -> String {
×
39
        "Projection".to_string()
×
40
    }
×
41

×
42
    fn get_input_ports(&self) -> Vec<PortHandle> {
×
43
        vec![DEFAULT_PORT_HANDLE]
×
44
    }
×
45

×
46
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
×
47
        vec![OutputPortDef::new(
×
48
            DEFAULT_PORT_HANDLE,
×
49
            OutputPortType::Stateless,
×
50
        )]
×
51
    }
×
52

×
53
    fn get_output_schema(
1,161✔
54
        &self,
1,161✔
55
        _output_port: &PortHandle,
1,161✔
56
        input_schemas: &HashMap<PortHandle, Schema>,
1,161✔
57
    ) -> Result<Schema, BoxedError> {
1,161✔
58
        let input_schema = input_schemas.get(&DEFAULT_PORT_HANDLE).unwrap();
1,161✔
59

1,161✔
60
        let mut select_expr: Vec<(String, Expression)> = vec![];
1,161✔
61
        for s in self.select.iter() {
1,162✔
62
            match s {
1,162✔
63
                SelectItem::Wildcard(_) => {
×
64
                    let fields: Vec<SelectItem> = input_schema
1✔
65
                        .fields
1✔
66
                        .iter()
1✔
67
                        .map(|col| {
2✔
68
                            SelectItem::UnnamedExpr(Expr::Identifier(Ident::new(
2✔
69
                                col.to_owned().name,
2✔
70
                            )))
2✔
71
                        })
2✔
72
                        .collect();
1✔
73
                    for f in fields {
3✔
74
                        if let Ok(res) = parse_sql_select_item(&f, input_schema) {
2✔
75
                            select_expr.push(res)
2✔
76
                        }
×
77
                    }
×
78
                }
79
                _ => {
80
                    if let Ok(res) = parse_sql_select_item(s, input_schema) {
1,161✔
81
                        select_expr.push(res)
1,161✔
82
                    }
×
83
                }
×
84
            }
85
        }
86

87
        let mut output_schema = input_schema.clone();
1,161✔
88
        let mut fields = vec![];
1,161✔
89
        for e in select_expr.iter() {
1,163✔
90
            let field_name = e.0.clone();
1,163✔
91
            let field_type = e.1.get_type(input_schema)?;
1,163✔
92
            fields.push(FieldDefinition::new(
1,162✔
93
                field_name,
1,162✔
94
                field_type.return_type,
1,162✔
95
                field_type.nullable,
1,162✔
96
                field_type.source,
1,162✔
97
            ));
1,162✔
98
        }
×
99
        output_schema.fields = fields;
1,160✔
100

1,160✔
101
        Ok(output_schema)
1,160✔
102
    }
1,161✔
103

×
104
    fn build(
1,158✔
105
        &self,
1,158✔
106
        input_schemas: HashMap<PortHandle, Schema>,
1,158✔
107
        _output_schemas: HashMap<PortHandle, Schema>,
1,158✔
108
        _record_store: &ProcessorRecordStore,
1,158✔
109
    ) -> Result<Box<dyn Processor>, BoxedError> {
1,158✔
110
        let schema = match input_schemas.get(&DEFAULT_PORT_HANDLE) {
1,158✔
111
            Some(schema) => Ok(schema),
1,158✔
112
            None => Err(PipelineError::InvalidPortHandle(DEFAULT_PORT_HANDLE)),
×
113
        }?;
×
114

×
115
        match self
1,158✔
116
            .select
1,158✔
117
            .iter()
1,158✔
118
            .map(|item| parse_sql_select_item(item, schema))
1,158✔
119
            .collect::<Result<Vec<(String, Expression)>, PipelineError>>()
1,158✔
120
        {
×
121
            Ok(expressions) => Ok(Box::new(ProjectionProcessor::new(
1,158✔
122
                schema.clone(),
1,158✔
123
                expressions.into_iter().map(|e| e.1).collect(),
1,158✔
124
            ))),
1,158✔
125
            Err(error) => Err(error.into()),
×
126
        }
×
127
    }
1,158✔
128
}
×
129

130
pub(crate) fn parse_sql_select_item(
2,321✔
131
    sql: &SelectItem,
2,321✔
132
    schema: &Schema,
2,321✔
133
) -> Result<(String, Expression), PipelineError> {
2,321✔
134
    match sql {
2,321✔
135
        SelectItem::UnnamedExpr(sql_expr) => {
2,313✔
136
            match ExpressionBuilder::new(0).parse_sql_expression(true, sql_expr, schema) {
2,313✔
137
                Ok(expr) => Ok((sql_expr.to_string(), expr)),
2,313✔
138
                Err(error) => Err(error),
×
139
            }
×
140
        }
141
        SelectItem::ExprWithAlias { expr, alias } => {
8✔
142
            match ExpressionBuilder::new(0).parse_sql_expression(true, expr, schema) {
8✔
143
                Ok(expr) => Ok((alias.value.clone(), expr)),
8✔
144
                Err(error) => Err(error),
×
145
            }
×
146
        }
147
        SelectItem::Wildcard(_) => Err(PipelineError::InvalidOperator("*".to_string())),
×
148
        SelectItem::QualifiedWildcard(ref object_name, ..) => {
×
149
            Err(PipelineError::InvalidOperator(object_name.to_string()))
×
150
        }
×
151
    }
152
}
2,321✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc