• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6009657516

29 Aug 2023 08:13AM UTC coverage: 76.652% (-1.4%) from 78.07%
6009657516

push

github

web-flow
chore: Create unit tests workflow (#1910)

* chore: Update for Rust 1.72.0

Rust 1.72.0 has introduced a bunch of new lints. Here we fix them all.

`let ... else` finally gets formatted.

* chore: Create unit tests workflow

* Rename and remove useless steps

* remove env vars

* Add concurrency group

* Test unit workflow on 4 cores

* Add mysql service to unit tests

---------

Co-authored-by: chubei <914745487@qq.com>

48982 of 63902 relevant lines covered (76.65%)

48394.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-sql/src/pipeline/window/factory.rs
1
use std::collections::HashMap;
2

3
use dozer_core::{
4
    node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory},
5
    processor_record::ProcessorRecordStore,
6
    DEFAULT_PORT_HANDLE,
7
};
8
use dozer_types::{errors::internal::BoxedError, types::Schema};
9

10
use crate::pipeline::{
11
    errors::{PipelineError, WindowError},
12
    pipeline_builder::from_builder::TableOperatorDescriptor,
13
};
14

15
use super::{
16
    builder::{window_from_table_operator, window_source_name},
17
    processor::WindowProcessor,
18
};
19

20
/// Factory that produces [`WindowProcessor`] instances for a window table operator.
///
/// The window itself is derived from `table` via `window_from_table_operator`
/// whenever a schema is computed or a processor is built.
#[derive(Debug)]
×
21
pub struct WindowProcessorFactory {
×
22
    /// Identifier returned by `ProcessorFactory::id` and handed to every processor built.
    id: String,
23
    /// Table operator descriptor from which the window and its source name are derived.
    table: TableOperatorDescriptor,
24
}
25

26
impl WindowProcessorFactory {
27
    /// Creates a factory from a processor id and the table operator descriptor.
    pub fn new(id: String, table: TableOperatorDescriptor) -> Self {
×
28
        Self { id, table }
×
29
    }
×
30

×
31
    /// Returns the source name that `window_source_name` resolves for the stored table operator.
    ///
    /// # Errors
    ///
    /// Any error from `window_source_name` is wrapped in `PipelineError::WindowError`.
    pub(crate) fn get_source_name(&self) -> Result<String, PipelineError> {
×
32
        window_source_name(&self.table).map_err(PipelineError::WindowError)
×
33
    }
×
34
}
×
35

36
impl ProcessorFactory for WindowProcessorFactory {
37
    /// Returns a copy of the factory's id.
    fn id(&self) -> String {
×
38
        self.id.clone()
×
39
    }
×
40

×
41
    /// Returns the fixed type name `"Window"`.
    fn type_name(&self) -> String {
×
42
        "Window".to_string()
×
43
    }
×
44

×
45
    /// Declares a single input port: `DEFAULT_PORT_HANDLE`.
    fn get_input_ports(&self) -> Vec<PortHandle> {
×
46
        vec![DEFAULT_PORT_HANDLE]
×
47
    }
×
48

×
49
    /// Declares a single output port on `DEFAULT_PORT_HANDLE` of type `OutputPortType::Stateless`.
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
×
50
        vec![OutputPortDef::new(
×
51
            DEFAULT_PORT_HANDLE,
×
52
            OutputPortType::Stateless,
×
53
        )]
×
54
    }
×
55

×
56
    /// Computes the output schema of the window for the schema arriving on `DEFAULT_PORT_HANDLE`.
    ///
    /// `_output_port` is ignored.
    ///
    /// # Errors
    ///
    /// * `PipelineError::InternalError("Invalid Window")` when `input_schemas` has no entry for
    ///   `DEFAULT_PORT_HANDLE`.
    /// * `PipelineError::WindowError` wrapping any error from `window_from_table_operator` or
    ///   from the window's own `get_output_schema`.
    /// * `PipelineError::WindowError(WindowError::InvalidWindow())` when
    ///   `window_from_table_operator` yields `None`.
    fn get_output_schema(
×
57
        &self,
×
58
        _output_port: &PortHandle,
×
59
        input_schemas: &HashMap<PortHandle, Schema>,
×
60
    ) -> Result<Schema, BoxedError> {
×
61
        // NOTE(review): `ok_or` constructs the error (including the `String` allocation) even
        // when the port is present; `ok_or_else` would defer that to the failure path.
        // NOTE(review): the schema is cloned although it is only used by reference below.
        let input_schema = input_schemas
×
62
            .get(&DEFAULT_PORT_HANDLE)
×
63
            .ok_or(PipelineError::InternalError(
×
64
                "Invalid Window".to_string().into(),
×
65
            ))?
×
66
            .clone();
×
67

×
68
        // Derive the window from the table operator, then ask it for its output schema.
        let output_schema = match window_from_table_operator(&self.table, &input_schema)
×
69
            .map_err(PipelineError::WindowError)?
×
70
        {
×
71
            Some(window) => window
×
72
                .get_output_schema(&input_schema)
×
73
                .map_err(PipelineError::WindowError)?,
×
74
            None => return Err(PipelineError::WindowError(WindowError::InvalidWindow()).into()),
×
75
        };
×
76

77
        Ok(output_schema)
×
78
    }
×
79

×
80
    /// Builds a boxed [`WindowProcessor`] for the schema arriving on `DEFAULT_PORT_HANDLE`.
    ///
    /// `_output_schemas` and `_record_store` are ignored.
    ///
    /// # Errors
    ///
    /// * `PipelineError::InternalError("Invalid Window")` when `input_schemas` has no entry for
    ///   `DEFAULT_PORT_HANDLE`.
    /// * `PipelineError::WindowError` wrapping any error from `window_from_table_operator`.
    /// * `PipelineError::WindowError(WindowError::InvalidWindow())` when
    ///   `window_from_table_operator` yields `None`.
    fn build(
×
81
        &self,
×
82
        input_schemas: HashMap<PortHandle, dozer_types::types::Schema>,
×
83
        _output_schemas: HashMap<PortHandle, dozer_types::types::Schema>,
×
84
        _record_store: &ProcessorRecordStore,
×
85
    ) -> Result<Box<dyn Processor>, BoxedError> {
×
86
        // NOTE(review): same lookup as in `get_output_schema`; `input_schemas` is owned here,
        // yet the schema is still fetched by reference and cloned.
        let input_schema = input_schemas
×
87
            .get(&DEFAULT_PORT_HANDLE)
×
88
            .ok_or(PipelineError::InternalError(
×
89
                "Invalid Window".to_string().into(),
×
90
            ))?
×
91
            .clone();
×
92

×
93
        // A missing window (`None`) is reported as an invalid window rather than a panic.
        match window_from_table_operator(&self.table, &input_schema)
×
94
            .map_err(PipelineError::WindowError)?
×
95
        {
×
96
            Some(window) => Ok(Box::new(WindowProcessor::new(self.id.clone(), window))),
×
97
            None => Err(PipelineError::WindowError(WindowError::InvalidWindow()).into()),
×
98
        }
×
99
    }
×
100
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc