• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4124646176

pending completion
4124646176

Pull #811

github

GitHub
Merge c6bc261de into f4fe30c14
Pull Request #811: chore: integrating sql planner

737 of 737 new or added lines in 23 files covered. (100.0%)

23321 of 35114 relevant lines covered (66.42%)

35990.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.57
/dozer-sql/src/pipeline/selection/factory.rs
1
use std::collections::HashMap;
2

3
use crate::pipeline::builder::SchemaSQLContext;
4
use dozer_core::{
5
    errors::ExecutionError,
6
    node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory},
7
    DEFAULT_PORT_HANDLE,
8
};
9
use dozer_types::types::Schema;
10
use sqlparser::ast::Expr as SqlExpr;
11

12
use crate::pipeline::expression::builder::{ExpressionBuilder, ExpressionContext};
13

14
use super::processor::SelectionProcessor;
15

16
#[derive(Debug)]
×
17
pub struct SelectionProcessorFactory {
18
    statement: SqlExpr,
19
}
20

21
impl SelectionProcessorFactory {
22
    /// Creates a new [`SelectionProcessorFactory`].
23
    pub fn new(statement: SqlExpr) -> Self {
30✔
24
        Self { statement }
30✔
25
    }
30✔
26
}
27

28
impl ProcessorFactory<SchemaSQLContext> for SelectionProcessorFactory {
29
    fn get_input_ports(&self) -> Vec<PortHandle> {
86✔
30
        vec![DEFAULT_PORT_HANDLE]
86✔
31
    }
86✔
32

33
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
115✔
34
        vec![OutputPortDef::new(
115✔
35
            DEFAULT_PORT_HANDLE,
115✔
36
            OutputPortType::Stateless,
115✔
37
        )]
115✔
38
    }
115✔
39

40
    fn get_output_schema(
57✔
41
        &self,
57✔
42
        _output_port: &PortHandle,
57✔
43
        input_schemas: &HashMap<PortHandle, (Schema, SchemaSQLContext)>,
57✔
44
    ) -> Result<(Schema, SchemaSQLContext), ExecutionError> {
57✔
45
        let schema = input_schemas
57✔
46
            .get(&DEFAULT_PORT_HANDLE)
57✔
47
            .ok_or(ExecutionError::InvalidPortHandle(DEFAULT_PORT_HANDLE))?;
57✔
48
        Ok(schema.clone())
57✔
49
    }
57✔
50

51
    fn build(
29✔
52
        &self,
29✔
53
        input_schemas: HashMap<PortHandle, Schema>,
29✔
54
        _output_schemas: HashMap<PortHandle, Schema>,
29✔
55
    ) -> Result<Box<dyn Processor>, ExecutionError> {
29✔
56
        let schema = input_schemas
29✔
57
            .get(&DEFAULT_PORT_HANDLE)
29✔
58
            .ok_or(ExecutionError::InvalidPortHandle(DEFAULT_PORT_HANDLE))?;
29✔
59

60
        match ExpressionBuilder::build(
29✔
61
            &mut ExpressionContext::new(schema.fields.len()),
29✔
62
            false,
29✔
63
            &self.statement,
29✔
64
            schema,
29✔
65
        ) {
29✔
66
            Ok(expression) => Ok(Box::new(SelectionProcessor::new(
29✔
67
                schema.clone(),
29✔
68
                expression,
29✔
69
            ))),
29✔
70
            Err(e) => Err(ExecutionError::InternalStringError(e.to_string())),
×
71
        }
×
72
    }
29✔
73

×
74
    fn prepare(
28✔
75
        &self,
28✔
76
        _input_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
28✔
77
        _output_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
28✔
78
    ) -> Result<(), ExecutionError> {
28✔
79
        Ok(())
28✔
80
    }
28✔
81
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc