• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6105410942

07 Sep 2023 04:28AM UTC coverage: 77.562% (-0.1%) from 77.686%
6105410942

push

github

chloeminkyung
feat: onnx image

1141 of 1141 new or added lines in 66 files covered. (100.0%)

49957 of 64409 relevant lines covered (77.56%)

50900.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-sql/src/pipeline/window/processor.rs
1
use crate::pipeline::errors::PipelineError;
2
use dozer_core::channels::ProcessorChannelForwarder;
3
use dozer_core::dozer_log::storage::Object;
4
use dozer_core::epoch::Epoch;
5
use dozer_core::executor_operation::ProcessorOperation;
6
use dozer_core::node::{PortHandle, Processor};
7
use dozer_core::processor_record::ProcessorRecordStore;
8
use dozer_core::DEFAULT_PORT_HANDLE;
9
use dozer_types::errors::internal::BoxedError;
10

11
use super::operator::WindowType;
12

13
#[derive(Debug)]
×
14
pub struct WindowProcessor {
15
    _id: String,
16
    window: WindowType,
17
}
18

19
impl WindowProcessor {
20
    pub fn new(id: String, window: WindowType, _checkpoint_data: Option<Vec<u8>>) -> Self {
×
21
        Self { _id: id, window }
×
22
    }
×
23
}
24

25
impl Processor for WindowProcessor {
26
    fn commit(&self, _epoch: &Epoch) -> Result<(), BoxedError> {
×
27
        Ok(())
×
28
    }
×
29

30
    fn process(
×
31
        &mut self,
×
32
        _from_port: PortHandle,
×
33
        record_store: &ProcessorRecordStore,
×
34
        op: ProcessorOperation,
×
35
        fw: &mut dyn ProcessorChannelForwarder,
×
36
    ) -> Result<(), BoxedError> {
×
37
        match op {
×
38
            ProcessorOperation::Delete { old } => {
×
39
                let old_decoded = record_store.load_record(&old)?;
×
40
                let records = self
×
41
                    .window
×
42
                    .execute(record_store, old, old_decoded)
×
43
                    .map_err(PipelineError::WindowError)?;
×
44
                for record in records {
×
45
                    fw.send(
×
46
                        ProcessorOperation::Delete { old: record },
×
47
                        DEFAULT_PORT_HANDLE,
×
48
                    );
×
49
                }
×
50
            }
51
            ProcessorOperation::Insert { new } => {
×
52
                let new_decoded = record_store.load_record(&new)?;
×
53
                let records = self
×
54
                    .window
×
55
                    .execute(record_store, new, new_decoded)
×
56
                    .map_err(PipelineError::WindowError)?;
×
57
                for record in records {
×
58
                    fw.send(
×
59
                        ProcessorOperation::Insert { new: record },
×
60
                        DEFAULT_PORT_HANDLE,
×
61
                    );
×
62
                }
×
63
            }
64
            ProcessorOperation::Update { old, new } => {
×
65
                self.process(
×
66
                    DEFAULT_PORT_HANDLE,
×
67
                    record_store,
×
68
                    ProcessorOperation::Delete { old },
×
69
                    fw,
×
70
                )?;
×
71

72
                self.process(
×
73
                    DEFAULT_PORT_HANDLE,
×
74
                    record_store,
×
75
                    ProcessorOperation::Insert { new },
×
76
                    fw,
×
77
                )?;
×
78
            }
79
        }
80
        Ok(())
×
81
    }
×
82

83
    fn serialize(
×
84
        &mut self,
×
85
        _record_store: &ProcessorRecordStore,
×
86
        _object: Object,
×
87
    ) -> Result<(), BoxedError> {
×
88
        Ok(())
×
89
    }
×
90
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc