• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6009657516

29 Aug 2023 08:13AM UTC coverage: 76.652% (-1.4%) from 78.07%
6009657516

push

github

web-flow
chore: Create unit tests workflow (#1910)

* chore: Update for Rust 1.72.0

Rust 1.72.0 has introduced a bunch of new lints. Here we fix them all.

`let ... else` finally gets formatted.

* chire: Create unit tests workflow

* Rename and remove useless steps

* remove env vars

* Add concurrency group

* Test unit workflow on 4 cores

* Add mysql service to unit tests

---------

Co-authored-by: chubei <914745487@qq.com>

48982 of 63902 relevant lines covered (76.65%)

48394.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.72
/dozer-sql/src/pipeline/aggregation/factory.rs
1
use crate::pipeline::planner::projection::CommonPlanner;
2
use crate::pipeline::projection::processor::ProjectionProcessor;
3
use crate::pipeline::{aggregation::processor::AggregationProcessor, errors::PipelineError};
4
use dozer_core::processor_record::ProcessorRecordStore;
5
use dozer_core::{
6
    node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory},
7
    DEFAULT_PORT_HANDLE,
8
};
9
use dozer_types::errors::internal::BoxedError;
10
use dozer_types::types::Schema;
11
use sqlparser::ast::Select;
12
use std::collections::HashMap;
13

14
#[derive(Debug)]
×
15
pub struct AggregationProcessorFactory {
×
16
    id: String,
17
    projection: Select,
18
    _stateful: bool,
19
    enable_probabilistic_optimizations: bool,
20
}
21

22
impl AggregationProcessorFactory {
23
    pub fn new(
691✔
24
        id: String,
691✔
25
        projection: Select,
691✔
26
        stateful: bool,
691✔
27
        enable_probabilistic_optimizations: bool,
691✔
28
    ) -> Self {
691✔
29
        Self {
691✔
30
            id,
691✔
31
            projection,
691✔
32
            _stateful: stateful,
691✔
33
            enable_probabilistic_optimizations,
691✔
34
        }
691✔
35
    }
691✔
36

×
37
    fn get_planner(&self, input_schema: Schema) -> Result<CommonPlanner, PipelineError> {
1,262✔
38
        let mut projection_planner = CommonPlanner::new(input_schema);
1,262✔
39
        projection_planner.plan(self.projection.clone())?;
1,262✔
40
        Ok(projection_planner)
1,262✔
41
    }
1,262✔
42
}
×
43

44
impl ProcessorFactory for AggregationProcessorFactory {
45
    fn type_name(&self) -> String {
28✔
46
        "Aggregation".to_string()
28✔
47
    }
28✔
48
    fn get_input_ports(&self) -> Vec<PortHandle> {
2,552✔
49
        vec![DEFAULT_PORT_HANDLE]
2,552✔
50
    }
2,552✔
51

×
52
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
1,914✔
53
        vec![OutputPortDef::new(
1,914✔
54
            DEFAULT_PORT_HANDLE,
1,914✔
55
            OutputPortType::Stateless,
1,914✔
56
        )]
1,914✔
57
    }
1,914✔
58

×
59
    fn get_output_schema(
638✔
60
        &self,
638✔
61
        _output_port: &PortHandle,
638✔
62
        input_schemas: &HashMap<PortHandle, Schema>,
638✔
63
    ) -> Result<Schema, BoxedError> {
638✔
64
        let input_schema = input_schemas
638✔
65
            .get(&DEFAULT_PORT_HANDLE)
638✔
66
            .ok_or(PipelineError::InvalidPortHandle(DEFAULT_PORT_HANDLE))?;
638✔
67

×
68
        let planner = self.get_planner(input_schema.clone())?;
638✔
69
        Ok(planner.post_projection_schema)
638✔
70
    }
638✔
71

×
72
    fn build(
624✔
73
        &self,
624✔
74
        input_schemas: HashMap<PortHandle, Schema>,
624✔
75
        _output_schemas: HashMap<PortHandle, Schema>,
624✔
76
        _record_store: &ProcessorRecordStore,
624✔
77
    ) -> Result<Box<dyn Processor>, BoxedError> {
624✔
78
        let input_schema = input_schemas
624✔
79
            .get(&DEFAULT_PORT_HANDLE)
624✔
80
            .ok_or(PipelineError::InvalidPortHandle(DEFAULT_PORT_HANDLE))?;
624✔
81

×
82
        let planner = self.get_planner(input_schema.clone())?;
624✔
83

×
84
        let is_projection = planner.aggregation_output.is_empty() && planner.groupby.is_empty();
624✔
85
        let processor: Box<dyn Processor> = if is_projection {
624✔
86
            Box::new(ProjectionProcessor::new(
413✔
87
                input_schema.clone(),
413✔
88
                planner.projection_output,
413✔
89
            ))
413✔
90
        } else {
×
91
            Box::new(AggregationProcessor::new(
211✔
92
                self.id.clone(),
211✔
93
                planner.groupby,
211✔
94
                planner.aggregation_output,
211✔
95
                planner.projection_output,
211✔
96
                planner.having,
211✔
97
                input_schema.clone(),
211✔
98
                planner.post_aggregation_schema,
211✔
99
                self.enable_probabilistic_optimizations,
211✔
100
            )?)
211✔
101
        };
×
102
        Ok(processor)
624✔
103
    }
624✔
104

×
105
    fn id(&self) -> String {
28✔
106
        self.id.clone()
28✔
107
    }
28✔
108
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc