• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5809849307

pending completion
5809849307

push

github

web-flow
refactor: Remove nested record ref. Add `ProcessorRecordStore` (#1835)

* temp: Remove `ProcessRecordRef` and let `ProcessorRecord` directly reference `Vec<Field>`

* feat: Abstract away `ProcessorRecordStore`

* fix: Fix tests and clippy

794 of 794 new or added lines in 78 files covered. (100.0%)

45486 of 61064 relevant lines covered (74.49%)

58165.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-sql/src/pipeline/window/processor.rs
1
use crate::pipeline::errors::PipelineError;
2
use dozer_core::channels::ProcessorChannelForwarder;
3
use dozer_core::epoch::Epoch;
4
use dozer_core::executor_operation::ProcessorOperation;
5
use dozer_core::node::{PortHandle, Processor};
6
use dozer_core::processor_record::ProcessorRecordStore;
7
use dozer_core::DEFAULT_PORT_HANDLE;
8
use dozer_types::errors::internal::BoxedError;
9

10
use super::operator::WindowType;
11

12
#[derive(Debug)]
×
13
pub struct WindowProcessor {
14
    _id: String,
15
    window: WindowType,
16
}
17

18
impl WindowProcessor {
19
    pub fn new(id: String, window: WindowType) -> Self {
×
20
        Self { _id: id, window }
×
21
    }
×
22
}
23

×
24
impl Processor for WindowProcessor {
×
25
    fn commit(&self, _epoch: &Epoch) -> Result<(), BoxedError> {
×
26
        Ok(())
×
27
    }
×
28

29
    fn process(
×
30
        &mut self,
×
31
        _from_port: PortHandle,
×
32
        record_store: &ProcessorRecordStore,
×
33
        op: ProcessorOperation,
×
34
        fw: &mut dyn ProcessorChannelForwarder,
×
35
    ) -> Result<(), BoxedError> {
×
36
        match op {
×
37
            ProcessorOperation::Delete { old } => {
×
38
                let old_decoded = record_store.load_record(&old)?;
×
39
                let records = self
×
40
                    .window
×
41
                    .execute(record_store, old, old_decoded)
×
42
                    .map_err(PipelineError::WindowError)?;
×
43
                for record in records {
×
44
                    fw.send(
×
45
                        ProcessorOperation::Delete { old: record },
×
46
                        DEFAULT_PORT_HANDLE,
×
47
                    );
×
48
                }
×
49
            }
×
50
            ProcessorOperation::Insert { new } => {
×
51
                let new_decoded = record_store.load_record(&new)?;
×
52
                let records = self
×
53
                    .window
×
54
                    .execute(record_store, new, new_decoded)
×
55
                    .map_err(PipelineError::WindowError)?;
×
56
                for record in records {
×
57
                    fw.send(
×
58
                        ProcessorOperation::Insert { new: record },
×
59
                        DEFAULT_PORT_HANDLE,
×
60
                    );
×
61
                }
×
62
            }
×
63
            ProcessorOperation::Update { old, new } => {
×
64
                self.process(
×
65
                    DEFAULT_PORT_HANDLE,
×
66
                    record_store,
×
67
                    ProcessorOperation::Delete { old },
×
68
                    fw,
×
69
                )?;
×
70

×
71
                self.process(
×
72
                    DEFAULT_PORT_HANDLE,
×
73
                    record_store,
×
74
                    ProcessorOperation::Insert { new },
×
75
                    fw,
×
76
                )?;
×
77
            }
×
78
        }
79
        Ok(())
×
80
    }
×
81
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc