• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5640896332

pending completion
5640896332

push

github

web-flow
chore: Remove `Schema::identifier` (#1776)

1574 of 1574 new or added lines in 102 files covered. (100.0%)

42145 of 54025 relevant lines covered (78.01%)

17469.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.27
/dozer-sql/src/pipeline/projection/processor.rs
1
use crate::pipeline::errors::PipelineError;
2
use crate::pipeline::expression::execution::{Expression, ExpressionExecutor};
3

4
use dozer_core::channels::ProcessorChannelForwarder;
5
use dozer_core::epoch::Epoch;
6
use dozer_core::node::{PortHandle, Processor};
7
use dozer_core::DEFAULT_PORT_HANDLE;
8
use dozer_types::errors::internal::BoxedError;
9
use dozer_types::types::{Operation, Record, Schema};
10

11
#[derive(Debug)]
×
12
pub struct ProjectionProcessor {
13
    expressions: Vec<Expression>,
14
    input_schema: Schema,
15
}
16

17
impl ProjectionProcessor {
18
    pub fn new(input_schema: Schema, expressions: Vec<Expression>) -> Self {
1,392✔
19
        Self {
1,392✔
20
            input_schema,
1,392✔
21
            expressions,
1,392✔
22
        }
1,392✔
23
    }
1,392✔
24

25
    fn delete(&mut self, record: &Record) -> Result<Operation, PipelineError> {
200✔
26
        let mut results = vec![];
200✔
27

28
        for expr in &self.expressions {
588✔
29
            results.push(expr.evaluate(record, &self.input_schema)?);
388✔
30
        }
31

32
        let mut output_record = Record::new(results);
200✔
33
        output_record.set_lifetime(record.lifetime.to_owned());
200✔
34

200✔
35
        Ok(Operation::Delete { old: output_record })
200✔
36
    }
200✔
37

38
    fn insert(&mut self, record: &Record) -> Result<Operation, PipelineError> {
2,885✔
39
        let mut results = vec![];
2,885✔
40

41
        for expr in self.expressions.clone() {
4,917✔
42
            results.push(expr.evaluate(record, &self.input_schema)?);
4,916✔
43
        }
44

45
        let mut output_record = Record::new(results);
2,886✔
46
        output_record.set_lifetime(record.lifetime.to_owned());
2,886✔
47
        Ok(Operation::Insert { new: output_record })
2,886✔
48
    }
2,886✔
49

50
    fn update(&self, old: &Record, new: &Record) -> Result<Operation, PipelineError> {
×
51
        let mut old_results = vec![];
×
52
        let mut new_results = vec![];
×
53

54
        for expr in &self.expressions {
×
55
            old_results.push(expr.evaluate(old, &self.input_schema)?);
×
56
            new_results.push(expr.evaluate(new, &self.input_schema)?);
×
57
        }
58

59
        let mut old_output_record = Record::new(old_results);
×
60
        old_output_record.set_lifetime(old.lifetime.to_owned());
×
61
        let mut new_output_record = Record::new(new_results);
×
62
        new_output_record.set_lifetime(new.lifetime.to_owned());
×
63
        Ok(Operation::Update {
×
64
            old: old_output_record,
×
65
            new: new_output_record,
×
66
        })
×
67
    }
×
68
}
69

70
impl Processor for ProjectionProcessor {
71
    fn process(
3,086✔
72
        &mut self,
3,086✔
73
        _from_port: PortHandle,
3,086✔
74
        op: Operation,
3,086✔
75
        fw: &mut dyn ProcessorChannelForwarder,
3,086✔
76
    ) -> Result<(), BoxedError> {
3,086✔
77
        match op {
3,086✔
78
            Operation::Delete { ref old } => fw.send(self.delete(old)?, DEFAULT_PORT_HANDLE),
200✔
79
            Operation::Insert { ref new } => fw.send(self.insert(new)?, DEFAULT_PORT_HANDLE),
2,886✔
80
            Operation::Update { ref old, ref new } => {
×
81
                fw.send(self.update(old, new)?, DEFAULT_PORT_HANDLE)
×
82
            }
83
        };
84
        Ok(())
3,083✔
85
    }
3,083✔
86

87
    fn commit(&self, _epoch: &Epoch) -> Result<(), BoxedError> {
244✔
88
        Ok(())
244✔
89
    }
244✔
90
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc