• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5651881554

pending completion
5651881554

push

github

web-flow
feat: refactor records (#1781)

* feat: add processor record

* chore: refactor processor record

* chore: introduce processor operation

* chore: refactor processors

* chore: refactor log operations

* fix import errors

* chore: fix index

* chore: fix recursive fetch

* fix: Don't allow cloning `ProcessorRecord`

* chore: fix errors

* fix: Don't expose `ProcessorRecordRef` implementation detail

* fix: Add `ProcessorRecord::total_len` to correctly handle `extend_` functions

* fix: Fix tests compilation by @chloeminkyung

* fix: Fix window and lifetime sql unit tests

* fix: clippy fix

* chore: fmt fix

* chore: Move `ProcessorRecord` to `dozer-core`

---------

Co-authored-by: Chloe Kim <chloeminkyung@gmail.com>
Co-authored-by: chubei <914745487@qq.com>

849 of 849 new or added lines in 64 files covered. (100.0%)

43094 of 55381 relevant lines covered (77.81%)

29359.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.13
/dozer-sql/src/pipeline/projection/processor.rs
1
use crate::pipeline::errors::PipelineError;
2
use crate::pipeline::expression::execution::{Expression, ExpressionExecutor};
3

4
use dozer_core::channels::ProcessorChannelForwarder;
5
use dozer_core::epoch::Epoch;
6
use dozer_core::executor_operation::ProcessorOperation;
7
use dozer_core::node::{PortHandle, Processor};
8
use dozer_core::processor_record::{ProcessorRecord, ProcessorRecordRef};
9
use dozer_core::DEFAULT_PORT_HANDLE;
10
use dozer_types::errors::internal::BoxedError;
11
use dozer_types::types::Schema;
12

13
/// Processor that projects each incoming record through a list of expressions,
/// producing an output record with one field per expression.
#[derive(Debug)]
×
14
pub struct ProjectionProcessor {
15
    /// Expressions evaluated, in order, against every incoming record; each result
    /// becomes one field of the output record.
    expressions: Vec<Expression>,
16
    /// Schema handed to every expression evaluation together with the input record.
    input_schema: Schema,
17
}
18

19
impl ProjectionProcessor {
20
    pub fn new(input_schema: Schema, expressions: Vec<Expression>) -> Self {
1,394✔
21
        Self {
1,394✔
22
            input_schema,
1,394✔
23
            expressions,
1,394✔
24
        }
1,394✔
25
    }
1,394✔
26

27
    fn delete(&mut self, record: &ProcessorRecordRef) -> Result<ProcessorOperation, PipelineError> {
200✔
28
        let mut output_record = ProcessorRecord::new();
200✔
29
        for expr in &self.expressions {
588✔
30
            output_record
31
                .extend_direct_field(expr.evaluate(record.get_record(), &self.input_schema)?);
388✔
32
        }
33

34
        output_record.set_lifetime(record.get_record().lifetime.to_owned());
200✔
35

200✔
36
        Ok(ProcessorOperation::Delete {
200✔
37
            old: ProcessorRecordRef::new(output_record),
200✔
38
        })
200✔
39
    }
200✔
40

41
    fn insert(&mut self, record: &ProcessorRecordRef) -> Result<ProcessorOperation, PipelineError> {
2,877✔
42
        let mut output_record = ProcessorRecord::new();
2,877✔
43

44
        for expr in self.expressions.clone() {
4,918✔
45
            output_record
46
                .extend_direct_field(expr.evaluate(record.get_record(), &self.input_schema)?);
4,918✔
47
        }
48

49
        output_record.set_lifetime(record.get_record().lifetime.to_owned());
2,886✔
50
        Ok(ProcessorOperation::Insert {
2,886✔
51
            new: ProcessorRecordRef::new(output_record),
2,886✔
52
        })
2,886✔
53
    }
2,886✔
54

55
    fn update(
×
56
        &self,
×
57
        old: &ProcessorRecordRef,
×
58
        new: &ProcessorRecordRef,
×
59
    ) -> Result<ProcessorOperation, PipelineError> {
×
60
        let mut old_output_record = ProcessorRecord::new();
×
61
        let mut new_output_record = ProcessorRecord::new();
×
62
        for expr in &self.expressions {
×
63
            old_output_record
64
                .extend_direct_field(expr.evaluate(old.get_record(), &self.input_schema)?);
×
65
            new_output_record
66
                .extend_direct_field(expr.evaluate(new.get_record(), &self.input_schema)?);
×
67
        }
68

69
        old_output_record.set_lifetime(old.get_record().lifetime.to_owned());
×
70

×
71
        new_output_record.set_lifetime(new.get_record().lifetime.to_owned());
×
72
        Ok(ProcessorOperation::Update {
×
73
            old: ProcessorRecordRef::new(old_output_record),
×
74
            new: ProcessorRecordRef::new(new_output_record),
×
75
        })
×
76
    }
×
77
}
78

79
impl Processor for ProjectionProcessor {
80
    fn process(
3,086✔
81
        &mut self,
3,086✔
82
        _from_port: PortHandle,
3,086✔
83
        op: ProcessorOperation,
3,086✔
84
        fw: &mut dyn ProcessorChannelForwarder,
3,086✔
85
    ) -> Result<(), BoxedError> {
3,086✔
86
        match op {
3,086✔
87
            ProcessorOperation::Delete { ref old } => {
200✔
88
                fw.send(self.delete(old)?, DEFAULT_PORT_HANDLE)
200✔
89
            }
90
            ProcessorOperation::Insert { ref new } => {
2,886✔
91
                fw.send(self.insert(new)?, DEFAULT_PORT_HANDLE)
2,886✔
92
            }
93
            ProcessorOperation::Update { ref old, ref new } => {
×
94
                fw.send(self.update(old, new)?, DEFAULT_PORT_HANDLE)
×
95
            }
96
        };
97
        Ok(())
3,085✔
98
    }
3,085✔
99

100
    fn commit(&self, _epoch: &Epoch) -> Result<(), BoxedError> {
244✔
101
        Ok(())
244✔
102
    }
244✔
103
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc