• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 3978628498

pending completion
3978628498

Pull #705

github

GitHub
Merge 8775fcda7 into e2f9ad287
Pull Request #705: chore: support for generic schema context in `Sink`, `Processor` and `Source` factories

572 of 572 new or added lines in 35 files covered. (100.0%)

22294 of 34850 relevant lines covered (63.97%)

40332.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.0
/dozer-sql/src/pipeline/selection/factory.rs
1
use std::collections::HashMap;
2

3
use crate::pipeline::builder::SchemaSQLContext;
4
use dozer_core::dag::{
5
    dag::DEFAULT_PORT_HANDLE,
6
    errors::ExecutionError,
7
    node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory},
8
};
9
use dozer_types::types::Schema;
10
use sqlparser::ast::Expr as SqlExpr;
11

12
use crate::pipeline::expression::builder::{BuilderExpressionType, ExpressionBuilder};
13

14
use super::processor::SelectionProcessor;
15

×
16
#[derive(Debug)]
×
17
pub struct SelectionProcessorFactory {
18
    statement: SqlExpr,
19
}
20

21
impl SelectionProcessorFactory {
22
    /// Creates a new [`SelectionProcessorFactory`].
×
23
    pub fn new(statement: SqlExpr) -> Self {
44✔
24
        Self { statement }
44✔
25
    }
44✔
26
}
27

28
impl ProcessorFactory<SchemaSQLContext> for SelectionProcessorFactory {
×
29
    fn get_input_ports(&self) -> Vec<PortHandle> {
86✔
30
        vec![DEFAULT_PORT_HANDLE]
86✔
31
    }
86✔
32

×
33
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
129✔
34
        vec![OutputPortDef::new(
129✔
35
            DEFAULT_PORT_HANDLE,
129✔
36
            OutputPortType::Stateless,
129✔
37
        )]
129✔
38
    }
129✔
39

×
40
    fn get_output_schema(
43✔
41
        &self,
43✔
42
        _output_port: &PortHandle,
43✔
43
        input_schemas: &HashMap<PortHandle, (Schema, SchemaSQLContext)>,
43✔
44
    ) -> Result<(Schema, SchemaSQLContext), ExecutionError> {
43✔
45
        let schema = input_schemas
43✔
46
            .get(&DEFAULT_PORT_HANDLE)
43✔
47
            .ok_or(ExecutionError::InvalidPortHandle(DEFAULT_PORT_HANDLE))?;
43✔
48
        Ok(schema.clone())
43✔
49
    }
43✔
50

×
51
    fn build(
43✔
52
        &self,
43✔
53
        input_schemas: HashMap<PortHandle, Schema>,
43✔
54
        _output_schemas: HashMap<PortHandle, Schema>,
43✔
55
    ) -> Result<Box<dyn Processor>, ExecutionError> {
43✔
56
        let schema = input_schemas
43✔
57
            .get(&DEFAULT_PORT_HANDLE)
43✔
58
            .ok_or(ExecutionError::InvalidPortHandle(DEFAULT_PORT_HANDLE))?;
43✔
59

×
60
        let builder = ExpressionBuilder {};
43✔
61

43✔
62
        match builder.build(
43✔
63
            &BuilderExpressionType::FullExpression,
43✔
64
            &self.statement,
43✔
65
            schema,
43✔
66
        ) {
43✔
67
            Ok(expression) => Ok(Box::new(SelectionProcessor::new(
43✔
68
                schema.clone(),
43✔
69
                expression,
43✔
70
            ))),
43✔
71
            Err(e) => Err(ExecutionError::InternalStringError(e.to_string())),
×
72
        }
×
73
    }
43✔
74

×
75
    fn prepare(
×
76
        &self,
×
77
        _input_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
78
        _output_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
79
    ) -> Result<(), ExecutionError> {
×
80
        Ok(())
×
81
    }
×
82
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc