• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6011532274

29 Aug 2023 11:17AM UTC coverage: 76.491% (-0.1%) from 76.616%
6011532274

push

github

web-flow
fix: Include connection type in `GenerateDot`. Fix `AggregationProcessorFactory::type_name` (#1934)

170 of 170 new or added lines in 5 files covered. (100.0%)

49016 of 64081 relevant lines covered (76.49%)

48200.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.15
/dozer-sql/src/pipeline/aggregation/factory.rs
1
use crate::pipeline::planner::projection::CommonPlanner;
2
use crate::pipeline::projection::processor::ProjectionProcessor;
3
use crate::pipeline::{aggregation::processor::AggregationProcessor, errors::PipelineError};
4
use dozer_core::processor_record::ProcessorRecordStore;
5
use dozer_core::{
6
    node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory},
7
    DEFAULT_PORT_HANDLE,
8
};
9
use dozer_types::errors::internal::BoxedError;
10
use dozer_types::parking_lot::Mutex;
11
use dozer_types::types::Schema;
12
use sqlparser::ast::Select;
13
use std::collections::HashMap;
14

15
#[derive(Debug)]
×
16
pub struct AggregationProcessorFactory {
17
    id: String,
18
    projection: Select,
19
    _stateful: bool,
20
    enable_probabilistic_optimizations: bool,
21

22
    /// Type name can only be determined after schema propagation.
23
    type_name: Mutex<Option<String>>,
24
}
×
25

×
26
impl AggregationProcessorFactory {
×
27
    pub fn new(
691✔
28
        id: String,
691✔
29
        projection: Select,
691✔
30
        stateful: bool,
691✔
31
        enable_probabilistic_optimizations: bool,
691✔
32
    ) -> Self {
691✔
33
        Self {
691✔
34
            id,
691✔
35
            projection,
691✔
36
            _stateful: stateful,
691✔
37
            enable_probabilistic_optimizations,
691✔
38
            type_name: Mutex::new(None),
691✔
39
        }
691✔
40
    }
691✔
41

×
42
    fn get_planner(&self, input_schema: Schema) -> Result<CommonPlanner, PipelineError> {
1,262✔
43
        let mut projection_planner = CommonPlanner::new(input_schema);
1,262✔
44
        projection_planner.plan(self.projection.clone())?;
1,262✔
45
        Ok(projection_planner)
1,262✔
46
    }
1,262✔
47
}
×
48

×
49
impl ProcessorFactory for AggregationProcessorFactory {
×
50
    fn type_name(&self) -> String {
42✔
51
        self.type_name
42✔
52
            .lock()
42✔
53
            .as_deref()
42✔
54
            .unwrap_or("Aggregation")
42✔
55
            .to_string()
42✔
56
    }
42✔
57
    fn get_input_ports(&self) -> Vec<PortHandle> {
2,552✔
58
        vec![DEFAULT_PORT_HANDLE]
2,552✔
59
    }
2,552✔
60

×
61
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
1,914✔
62
        vec![OutputPortDef::new(
1,914✔
63
            DEFAULT_PORT_HANDLE,
1,914✔
64
            OutputPortType::Stateless,
1,914✔
65
        )]
1,914✔
66
    }
1,914✔
67

×
68
    fn get_output_schema(
638✔
69
        &self,
638✔
70
        _output_port: &PortHandle,
638✔
71
        input_schemas: &HashMap<PortHandle, Schema>,
638✔
72
    ) -> Result<Schema, BoxedError> {
638✔
73
        let input_schema = input_schemas
638✔
74
            .get(&DEFAULT_PORT_HANDLE)
638✔
75
            .ok_or(PipelineError::InvalidPortHandle(DEFAULT_PORT_HANDLE))?;
638✔
76

×
77
        let planner = self.get_planner(input_schema.clone())?;
638✔
78

×
79
        *self.type_name.lock() = Some(
638✔
80
            if is_projection(&planner) {
638✔
81
                "Projection"
427✔
82
            } else {
83
                "Aggregation"
211✔
84
            }
85
            .to_string(),
638✔
86
        );
638✔
87

638✔
88
        Ok(planner.post_projection_schema)
638✔
89
    }
638✔
90

×
91
    fn build(
624✔
92
        &self,
624✔
93
        input_schemas: HashMap<PortHandle, Schema>,
624✔
94
        _output_schemas: HashMap<PortHandle, Schema>,
624✔
95
        _record_store: &ProcessorRecordStore,
624✔
96
    ) -> Result<Box<dyn Processor>, BoxedError> {
624✔
97
        let input_schema = input_schemas
624✔
98
            .get(&DEFAULT_PORT_HANDLE)
624✔
99
            .ok_or(PipelineError::InvalidPortHandle(DEFAULT_PORT_HANDLE))?;
624✔
100

×
101
        let planner = self.get_planner(input_schema.clone())?;
624✔
102

103
        let processor: Box<dyn Processor> = if is_projection(&planner) {
624✔
104
            Box::new(ProjectionProcessor::new(
413✔
105
                input_schema.clone(),
413✔
106
                planner.projection_output,
413✔
107
            ))
413✔
108
        } else {
×
109
            Box::new(AggregationProcessor::new(
211✔
110
                self.id.clone(),
211✔
111
                planner.groupby,
211✔
112
                planner.aggregation_output,
211✔
113
                planner.projection_output,
211✔
114
                planner.having,
211✔
115
                input_schema.clone(),
211✔
116
                planner.post_aggregation_schema,
211✔
117
                self.enable_probabilistic_optimizations,
211✔
118
            )?)
211✔
119
        };
120
        Ok(processor)
624✔
121
    }
624✔
122

123
    fn id(&self) -> String {
28✔
124
        self.id.clone()
28✔
125
    }
28✔
126
}
127

128
fn is_projection(planner: &CommonPlanner) -> bool {
1,262✔
129
    planner.aggregation_output.is_empty() && planner.groupby.is_empty()
1,262✔
130
}
1,262✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc