• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4371276901

pending completion
4371276901

push

github

GitHub
fix: Add sources names used in the query (#1190)

28 of 28 new or added lines in 3 files covered. (100.0%)

27837 of 39159 relevant lines covered (71.09%)

73963.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-sql/src/pipeline/window/processor.rs
1
use crate::pipeline::errors::WindowError;
2
use dozer_core::channels::ProcessorChannelForwarder;
3
use dozer_core::epoch::Epoch;
4
use dozer_core::errors::ExecutionError;
5
use dozer_core::node::{PortHandle, Processor};
6
use dozer_core::storage::lmdb_storage::SharedTransaction;
7
use dozer_core::DEFAULT_PORT_HANDLE;
8
use dozer_types::types::{Operation, Record};
9

10
use super::operator::WindowType;
11

12
#[derive(Debug)]
×
13
pub struct WindowProcessor {
14
    window: WindowType,
15
}
16

17
impl WindowProcessor {
18
    pub fn new(window: WindowType) -> Self {
×
19
        Self { window }
×
20
    }
×
21

22
    fn execute(&self, record: &Record) -> Result<Vec<Record>, WindowError> {
×
23
        self.window.execute(record)
×
24
    }
×
25
}
26

27
impl Processor for WindowProcessor {
28
    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
×
29
        Ok(())
×
30
    }
×
31

32
    fn process(
×
33
        &mut self,
×
34
        _from_port: PortHandle,
×
35
        op: Operation,
×
36
        fw: &mut dyn ProcessorChannelForwarder,
×
37
        _tx: &SharedTransaction,
×
38
    ) -> Result<(), ExecutionError> {
×
39
        match op {
×
40
            Operation::Delete { ref old } => {
×
41
                let records = self
×
42
                    .execute(old)
×
43
                    .map_err(|e| ExecutionError::WindowProcessorError(Box::new(e)))?;
×
44
                for record in records {
×
45
                    fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE)?;
×
46
                }
47
            }
48
            Operation::Insert { ref new } => {
×
49
                let records = self
×
50
                    .execute(new)
×
51
                    .map_err(|e| ExecutionError::WindowProcessorError(Box::new(e)))?;
×
52
                for record in records {
×
53
                    fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE)?;
×
54
                }
55
            }
56
            Operation::Update { ref old, ref new } => {
×
57
                let old_records = self
×
58
                    .execute(old)
×
59
                    .map_err(|e| ExecutionError::WindowProcessorError(Box::new(e)))?;
×
60
                for record in old_records {
×
61
                    fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE)?;
×
62
                }
63

64
                let new_records = self
×
65
                    .execute(new)
×
66
                    .map_err(|e| ExecutionError::WindowProcessorError(Box::new(e)))?;
×
67
                for record in new_records {
×
68
                    fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE)?;
×
69
                }
70
            }
71
        }
72
        Ok(())
×
73
    }
×
74
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc