• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4377467257

pending completion
4377467257

push

github

GitHub
implement `HAVING` (#1198)

395 of 395 new or added lines in 6 files covered. (100.0%)

27638 of 38584 relevant lines covered (71.63%)

27777.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.77
/dozer-sql/src/pipeline/aggregation/factory.rs
1
use crate::pipeline::aggregation::processor::AggregationProcessor;
2
use crate::pipeline::builder::SchemaSQLContext;
3
use crate::pipeline::planner::projection::CommonPlanner;
4
use crate::pipeline::projection::processor::ProjectionProcessor;
5
use dozer_core::{
6
    errors::ExecutionError,
7
    node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory},
8
    storage::lmdb_storage::LmdbExclusiveTransaction,
9
    DEFAULT_PORT_HANDLE,
10
};
11
use dozer_types::types::Schema;
12
use sqlparser::ast::Select;
13
use std::collections::HashMap;
14

15
/// Processor factory for the SELECT stage of a SQL pipeline.
///
/// Holds the parsed `SELECT` statement and, when asked to build, plans it
/// against the input schema to decide which processor to construct.
#[derive(Debug)]
×
16
pub struct AggregationProcessorFactory {
17
    /// Parsed `SELECT` statement; cloned and handed to the planner on each planning call.
    projection: Select,
18
    /// Value of the `stateful` flag passed to `new`; not read by any code visible in this file.
    _stateful: bool,
19
}
20

21
impl AggregationProcessorFactory {
22
    pub fn new(projection: Select, stateful: bool) -> Self {
356✔
23
        Self {
356✔
24
            projection,
356✔
25
            _stateful: stateful,
356✔
26
        }
356✔
27
    }
356✔
28

29
    fn get_planner(&self, input_schema: Schema) -> Result<CommonPlanner, ExecutionError> {
1,014✔
30
        let mut projection_planner = CommonPlanner::new(input_schema);
1,014✔
31
        projection_planner
1,014✔
32
            .plan(self.projection.clone())
1,014✔
33
            .map_err(|e| ExecutionError::InternalError(Box::new(e)))?;
1,014✔
34
        Ok(projection_planner)
1,014✔
35
    }
1,014✔
36
}
37

38
impl ProcessorFactory<SchemaSQLContext> for AggregationProcessorFactory {
39
    fn get_input_ports(&self) -> Vec<PortHandle> {
2,038✔
40
        vec![DEFAULT_PORT_HANDLE]
2,038✔
41
    }
2,038✔
42

43
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
1,356✔
44
        vec![OutputPortDef::new(
1,356✔
45
            DEFAULT_PORT_HANDLE,
1,356✔
46
            OutputPortType::Stateless,
1,356✔
47
        )]
1,356✔
48
    }
1,356✔
49

50
    fn get_output_schema(
677✔
51
        &self,
677✔
52
        _output_port: &PortHandle,
677✔
53
        input_schemas: &HashMap<PortHandle, (Schema, SchemaSQLContext)>,
677✔
54
    ) -> Result<(Schema, SchemaSQLContext), ExecutionError> {
677✔
55
        let (input_schema, ctx) = input_schemas
677✔
56
            .get(&DEFAULT_PORT_HANDLE)
677✔
57
            .ok_or(ExecutionError::InvalidPortHandle(DEFAULT_PORT_HANDLE))?;
677✔
58

59
        let planner = self.get_planner(input_schema.clone())?;
677✔
60
        Ok((planner.post_projection_schema, ctx.clone()))
677✔
61
    }
677✔
62

63
    fn build(
337✔
64
        &self,
337✔
65
        input_schemas: HashMap<PortHandle, Schema>,
337✔
66
        _output_schemas: HashMap<PortHandle, Schema>,
337✔
67
        _txn: &mut LmdbExclusiveTransaction,
337✔
68
    ) -> Result<Box<dyn Processor>, ExecutionError> {
337✔
69
        let input_schema = input_schemas
337✔
70
            .get(&DEFAULT_PORT_HANDLE)
337✔
71
            .ok_or(ExecutionError::InvalidPortHandle(DEFAULT_PORT_HANDLE))?;
337✔
72

73
        let planner = self.get_planner(input_schema.clone())?;
337✔
74

75
        let is_projection = planner.aggregation_output.is_empty() && planner.groupby.is_empty();
337✔
76
        let processor: Box<dyn Processor> = if is_projection {
337✔
77
            Box::new(ProjectionProcessor::new(
271✔
78
                input_schema.clone(),
271✔
79
                planner.projection_output,
271✔
80
            ))
271✔
81
        } else {
82
            Box::new(
83
                AggregationProcessor::new(
66✔
84
                    planner.groupby,
66✔
85
                    planner.aggregation_output,
66✔
86
                    planner.projection_output,
66✔
87
                    planner.having,
66✔
88
                    input_schema.clone(),
66✔
89
                    planner.post_aggregation_schema,
66✔
90
                )
66✔
91
                .map_err(|e| ExecutionError::InternalError(Box::new(e)))?,
66✔
92
            )
93
        };
×
94
        Ok(processor)
337✔
95
    }
337✔
96
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc