• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4103544570

pending completion
4103544570

Pull #811

github

GitHub
Merge 39dd780d2 into 4e98c5561
Pull Request #811: chore: integrating sql planner

700 of 700 new or added lines in 16 files covered. (100.0%)

23318 of 38144 relevant lines covered (61.13%)

33761.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

22.89
/dozer-sql/src/pipeline/aggregation/factory.rs
1
use crate::pipeline::aggregation::processor::AggregationProcessor;
2
use crate::pipeline::builder::SchemaSQLContext;
3
use crate::pipeline::planner::projection::CommonPlanner;
4
use crate::pipeline::projection::processor::ProjectionProcessor;
5
use dozer_core::dag::errors::ExecutionError;
6
use dozer_core::dag::node::{
7
    OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory,
8
};
9
use dozer_core::dag::DEFAULT_PORT_HANDLE;
10
use dozer_types::types::Schema;
11
use sqlparser::ast::Select;
12
use std::collections::HashMap;
13

14
/// Factory that builds the processor for a SQL `SELECT` projection: `build`
/// returns either a `ProjectionProcessor` or an `AggregationProcessor`,
/// depending on what the planner reports for the stored statement.
#[derive(Debug)]
×
15
pub struct AggregationProcessorFactory {
16
    /// Parsed `SELECT`; a clone of it is passed to `CommonPlanner::plan` each time a plan is needed.
    projection: Select,
17
    /// When `true`, `get_output_ports` declares a `StatefulWithPrimaryKeyLookup` port instead of a `Stateless` one.
    stateful: bool,
18
}
19

20
impl AggregationProcessorFactory {
21
    /// Creates a factory that stores the given `projection` and `stateful` flag unchanged.
    pub fn new(projection: Select, stateful: bool) -> Self {
83✔
22
        Self {
83✔
23
            projection,
83✔
24
            stateful,
83✔
25
        }
83✔
26
    }
83✔
27

×
28
    /// Creates a `CommonPlanner` for `input_schema` and runs it on a clone of the
    /// stored `Select`, returning the planner once planning has succeeded.
    ///
    /// # Errors
    ///
    /// Any error returned by `CommonPlanner::plan` is boxed into
    /// `ExecutionError::InternalError`.
    fn get_planner(&self, input_schema: Schema) -> Result<CommonPlanner, ExecutionError> {
206✔
29
        let mut projection_planner = CommonPlanner::new(input_schema);
206✔
30
        projection_planner
206✔
31
            .plan(self.projection.clone())
206✔
32
            .map_err(|e| ExecutionError::InternalError(Box::new(e)))?;
206✔
33
        Ok(projection_planner)
206✔
34
    }
206✔
35
}
36

×
37
impl ProcessorFactory<SchemaSQLContext> for AggregationProcessorFactory {
×
38
    /// Returns the single input port of this node, `DEFAULT_PORT_HANDLE`.
    fn get_input_ports(&self) -> Vec<PortHandle> {
206✔
39
        vec![DEFAULT_PORT_HANDLE]
206✔
40
    }
206✔
41

×
42
    /// Returns the single output port definition on `DEFAULT_PORT_HANDLE`.
    ///
    /// The port type is `StatefulWithPrimaryKeyLookup` (with old-record retrieval
    /// enabled for both deletes and updates) when `self.stateful` is set, and
    /// `Stateless` otherwise.
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
275✔
43
        if self.stateful {
275✔
44
            vec![OutputPortDef::new(
×
45
                DEFAULT_PORT_HANDLE,
×
46
                OutputPortType::StatefulWithPrimaryKeyLookup {
×
47
                    retr_old_records_for_deletes: true,
×
48
                    retr_old_records_for_updates: true,
×
49
                },
×
50
            )]
×
51
        } else {
×
52
            vec![OutputPortDef::new(
275✔
53
                DEFAULT_PORT_HANDLE,
275✔
54
                OutputPortType::Stateless,
275✔
55
            )]
275✔
56
        }
×
57
    }
275✔
58

×
59
    /// Plans the stored `Select` against the schema found on `DEFAULT_PORT_HANDLE`
    /// and returns the planner's `post_projection_schema` together with a clone of
    /// the input's `SchemaSQLContext`. `_output_port` is not consulted.
    ///
    /// # Errors
    ///
    /// Returns `ExecutionError::InvalidPortHandle` when `input_schemas` has no entry
    /// for `DEFAULT_PORT_HANDLE`, or the error produced by `get_planner`.
    fn get_output_schema(
137✔
60
        &self,
137✔
61
        _output_port: &PortHandle,
137✔
62
        input_schemas: &HashMap<PortHandle, (Schema, SchemaSQLContext)>,
137✔
63
    ) -> Result<(Schema, SchemaSQLContext), ExecutionError> {
137✔
64
        let (input_schema, ctx) = input_schemas
137✔
65
            .get(&DEFAULT_PORT_HANDLE)
137✔
66
            .ok_or(ExecutionError::InvalidPortHandle(DEFAULT_PORT_HANDLE))?;
137✔
67

×
68
        let planner = self.get_planner(input_schema.clone())?;
137✔
69
        Ok((planner.post_projection_schema, ctx.clone()))
137✔
70
    }
137✔
71

×
72
    /// Plans the stored `Select` against the schema found on `DEFAULT_PORT_HANDLE`
    /// and builds the matching processor: a `ProjectionProcessor` when the planner's
    /// `aggregation_output` is empty, an `AggregationProcessor` otherwise.
    /// `_output_schemas` is not consulted. Planning is run again here; the result of
    /// an earlier `get_output_schema` call is not reused.
    ///
    /// # Errors
    ///
    /// Returns `ExecutionError::InvalidPortHandle` when `input_schemas` has no entry
    /// for `DEFAULT_PORT_HANDLE`, the error produced by `get_planner`, or
    /// `ExecutionError::InternalError` wrapping a failure of `AggregationProcessor::new`.
    fn build(
69✔
73
        &self,
69✔
74
        input_schemas: HashMap<PortHandle, Schema>,
69✔
75
        _output_schemas: HashMap<PortHandle, Schema>,
69✔
76
    ) -> Result<Box<dyn Processor>, ExecutionError> {
69✔
77
        let input_schema = input_schemas
69✔
78
            .get(&DEFAULT_PORT_HANDLE)
69✔
79
            .ok_or(ExecutionError::InvalidPortHandle(DEFAULT_PORT_HANDLE))?;
69✔
80

×
81
        let planner = self.get_planner(input_schema.clone())?;
69✔
82

×
83
        let processor: Box<dyn Processor> = match planner.aggregation_output.len() {
69✔
84
            // `aggregation_output` is empty: only the projection output is needed.
            0 => Box::new(ProjectionProcessor::new(
50✔
85
                input_schema.clone(),
50✔
86
                planner.projection_output,
50✔
87
            )),
50✔
88
            // `aggregation_output` is non-empty: hand the full plan to the aggregation processor.
            _ => Box::new(
×
89
                AggregationProcessor::new(
19✔
90
                    planner.groupby,
19✔
91
                    planner.aggregation_output,
19✔
92
                    planner.projection_output,
19✔
93
                    input_schema.clone(),
19✔
94
                    planner.post_aggregation_schema,
19✔
95
                )
19✔
96
                .map_err(|e| ExecutionError::InternalError(Box::new(e)))?,
19✔
97
            ),
×
98
        };
×
99

×
100
        Ok(processor)
69✔
101
    }
69✔
102

×
103
    /// No-op: ignores both schema maps and always returns `Ok(())`.
    fn prepare(
68✔
104
        &self,
68✔
105
        _input_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
68✔
106
        _output_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
68✔
107
    ) -> Result<(), ExecutionError> {
68✔
108
        Ok(())
68✔
109
    }
68✔
110
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc