• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5819169315

pending completion
5819169315

push

github

web-flow
Fix JavaScript capitalization (#1840)

45504 of 61139 relevant lines covered (74.43%)

58086.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.95
/dozer-sql/src/pipeline/selection/processor.rs
1
use crate::pipeline::expression::execution::Expression;
2
use dozer_core::channels::ProcessorChannelForwarder;
3
use dozer_core::epoch::Epoch;
4
use dozer_core::executor_operation::ProcessorOperation;
5
use dozer_core::node::{PortHandle, Processor};
6
use dozer_core::processor_record::ProcessorRecordStore;
7
use dozer_core::DEFAULT_PORT_HANDLE;
8
use dozer_types::errors::internal::BoxedError;
9
use dozer_types::types::{Field, Schema};
10

11
#[derive(Debug)]
×
12
pub struct SelectionProcessor {
13
    expression: Expression,
14
    input_schema: Schema,
15
}
16

17
impl SelectionProcessor {
18
    pub fn new(input_schema: Schema, expression: Expression) -> Self {
101✔
19
        Self {
101✔
20
            input_schema,
101✔
21
            expression,
101✔
22
        }
101✔
23
    }
101✔
24
}
25

×
26
impl Processor for SelectionProcessor {
×
27
    fn commit(&self, _epoch: &Epoch) -> Result<(), BoxedError> {
101✔
28
        Ok(())
101✔
29
    }
101✔
30

31
    fn process(
11,580✔
32
        &mut self,
11,580✔
33
        _from_port: PortHandle,
11,580✔
34
        record_store: &ProcessorRecordStore,
11,580✔
35
        op: ProcessorOperation,
11,580✔
36
        fw: &mut dyn ProcessorChannelForwarder,
11,580✔
37
    ) -> Result<(), BoxedError> {
11,580✔
38
        match op {
11,580✔
39
            ProcessorOperation::Delete { ref old } => {
200✔
40
                let old = record_store.load_record(old)?;
200✔
41
                if self.expression.evaluate(&old, &self.input_schema)? == Field::Boolean(true) {
200✔
42
                    fw.send(op, DEFAULT_PORT_HANDLE);
140✔
43
                }
140✔
44
            }
×
45
            ProcessorOperation::Insert { ref new } => {
11,320✔
46
                let new = record_store.load_record(new)?;
11,320✔
47
                if self.expression.evaluate(&new, &self.input_schema)? == Field::Boolean(true) {
11,320✔
48
                    fw.send(op, DEFAULT_PORT_HANDLE);
10,468✔
49
                }
10,852✔
50
            }
×
51
            ProcessorOperation::Update { old, new } => {
60✔
52
                let old_decoded = record_store.load_record(&old)?;
60✔
53
                let old_fulfilled = self.expression.evaluate(&old_decoded, &self.input_schema)?
60✔
54
                    == Field::Boolean(true);
60✔
55
                let new_decoded = record_store.load_record(&new)?;
60✔
56
                let new_fulfilled = self.expression.evaluate(&new_decoded, &self.input_schema)?
60✔
57
                    == Field::Boolean(true);
60✔
58
                match (old_fulfilled, new_fulfilled) {
60✔
59
                    (true, true) => {
60✔
60
                        // both records fulfill the WHERE condition, forward the operation
60✔
61
                        fw.send(ProcessorOperation::Update { old, new }, DEFAULT_PORT_HANDLE);
60✔
62
                    }
60✔
63
                    (true, false) => {
×
64
                        // the old record fulfills the WHERE condition while the new one doesn't, forward a delete operation
×
65
                        fw.send(ProcessorOperation::Delete { old }, DEFAULT_PORT_HANDLE);
×
66
                    }
×
67
                    (false, true) => {
×
68
                        // the old record doesn't fulfill the WHERE condition while the new one does, forward an insert operation
×
69
                        fw.send(ProcessorOperation::Insert { new }, DEFAULT_PORT_HANDLE);
×
70
                    }
×
71
                    (false, false) => {
×
72
                        // neither record fulfills the WHERE condition, don't forward the operation
×
73
                    }
×
74
                }
×
75
            }
×
76
        }
×
77
        Ok(())
11,580✔
78
    }
11,580✔
79
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc