• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4102355041

pending completion
4102355041

Pull #811

github

GitHub
Merge 37b55f3df into 7c772e92a
Pull Request #811: chore: integrating sql planner

427 of 427 new or added lines in 15 files covered. (100.0%)

24596 of 37831 relevant lines covered (65.02%)

37254.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

22.89
/dozer-sql/src/pipeline/aggregation/factory.rs
1
use crate::pipeline::aggregation::processor::AggregationProcessor;
2
use crate::pipeline::builder::SchemaSQLContext;
3
use crate::pipeline::planner::projection::CommonPlanner;
4
use crate::pipeline::projection::processor::ProjectionProcessor;
5
use dozer_core::dag::errors::ExecutionError;
6
use dozer_core::dag::node::{
7
    OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory,
8
};
9
use dozer_core::dag::DEFAULT_PORT_HANDLE;
10
use dozer_types::types::Schema;
11
use sqlparser::ast::Select;
12
use std::collections::HashMap;
13

14
#[derive(Debug)]
×
15
pub struct AggregationProcessorFactory {
16
    projection: Select,
17
    stateful: bool,
18
}
19

20
impl AggregationProcessorFactory {
21
    pub fn new(projection: Select, stateful: bool) -> Self {
117✔
22
        Self {
117✔
23
            projection,
117✔
24
            stateful,
117✔
25
        }
117✔
26
    }
117✔
27

×
28
    fn get_planner(&self, input_schema: Schema) -> Result<CommonPlanner, ExecutionError> {
308✔
29
        let mut projection_planner = CommonPlanner::new(input_schema);
308✔
30
        projection_planner
308✔
31
            .plan(self.projection.clone())
308✔
32
            .map_err(|e| ExecutionError::InternalError(Box::new(e)))?;
308✔
33
        Ok(projection_planner)
308✔
34
    }
308✔
35
}
36

×
37
impl ProcessorFactory<SchemaSQLContext> for AggregationProcessorFactory {
×
38
    fn get_input_ports(&self) -> Vec<PortHandle> {
308✔
39
        vec![DEFAULT_PORT_HANDLE]
308✔
40
    }
308✔
41

×
42
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
411✔
43
        if self.stateful {
411✔
44
            vec![OutputPortDef::new(
×
45
                DEFAULT_PORT_HANDLE,
×
46
                OutputPortType::StatefulWithPrimaryKeyLookup {
×
47
                    retr_old_records_for_deletes: true,
×
48
                    retr_old_records_for_updates: true,
×
49
                },
×
50
            )]
×
51
        } else {
×
52
            vec![OutputPortDef::new(
411✔
53
                DEFAULT_PORT_HANDLE,
411✔
54
                OutputPortType::Stateless,
411✔
55
            )]
411✔
56
        }
×
57
    }
411✔
58

×
59
    fn get_output_schema(
205✔
60
        &self,
205✔
61
        _output_port: &PortHandle,
205✔
62
        input_schemas: &HashMap<PortHandle, (Schema, SchemaSQLContext)>,
205✔
63
    ) -> Result<(Schema, SchemaSQLContext), ExecutionError> {
205✔
64
        let (input_schema, ctx) = input_schemas
205✔
65
            .get(&DEFAULT_PORT_HANDLE)
205✔
66
            .ok_or(ExecutionError::InvalidPortHandle(DEFAULT_PORT_HANDLE))?;
205✔
67

×
68
        let planner = self.get_planner(input_schema.clone())?;
205✔
69
        Ok((planner.post_projection_schema, ctx.clone()))
205✔
70
    }
205✔
71

×
72
    fn build(
103✔
73
        &self,
103✔
74
        input_schemas: HashMap<PortHandle, Schema>,
103✔
75
        _output_schemas: HashMap<PortHandle, Schema>,
103✔
76
    ) -> Result<Box<dyn Processor>, ExecutionError> {
103✔
77
        let input_schema = input_schemas
103✔
78
            .get(&DEFAULT_PORT_HANDLE)
103✔
79
            .ok_or(ExecutionError::InvalidPortHandle(DEFAULT_PORT_HANDLE))?;
103✔
80

×
81
        let planner = self.get_planner(input_schema.clone())?;
103✔
82

×
83
        let processor: Box<dyn Processor> = match planner.aggregation_output.len() {
103✔
84
            0 => Box::new(ProjectionProcessor::new(
75✔
85
                input_schema.clone(),
75✔
86
                planner.projection_output,
75✔
87
            )),
75✔
88
            _ => Box::new(
×
89
                AggregationProcessor::new(
28✔
90
                    planner.groupby,
28✔
91
                    planner.aggregation_output,
28✔
92
                    planner.projection_output,
28✔
93
                    input_schema.clone(),
28✔
94
                    planner.post_aggregation_schema.clone(),
28✔
95
                )
28✔
96
                .map_err(|e| ExecutionError::InternalError(Box::new(e)))?,
28✔
97
            ),
×
98
        };
×
99

×
100
        Ok(processor)
103✔
101
    }
103✔
102

×
103
    fn prepare(
102✔
104
        &self,
102✔
105
        _input_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
102✔
106
        _output_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
102✔
107
    ) -> Result<(), ExecutionError> {
102✔
108
        Ok(())
102✔
109
    }
102✔
110
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc