• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6105410942

07 Sep 2023 04:28AM UTC coverage: 77.562% (-0.1%) from 77.686%
6105410942

push

github

chloeminkyung
feat: onnx image

1141 of 1141 new or added lines in 66 files covered. (100.0%)

49957 of 64409 relevant lines covered (77.56%)

50900.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.31
/dozer-sql/src/pipeline/selection/processor.rs
1
use crate::pipeline::expression::execution::Expression;
2
use dozer_core::channels::ProcessorChannelForwarder;
3
use dozer_core::dozer_log::storage::Object;
4
use dozer_core::epoch::Epoch;
5
use dozer_core::executor_operation::ProcessorOperation;
6
use dozer_core::node::{PortHandle, Processor};
7
use dozer_core::processor_record::ProcessorRecordStore;
8
use dozer_core::DEFAULT_PORT_HANDLE;
9
use dozer_types::errors::internal::BoxedError;
10
use dozer_types::types::{Field, Schema};
11

12
#[derive(Debug)]
×
13
pub struct SelectionProcessor {
14
    expression: Expression,
15
    input_schema: Schema,
16
}
17

18
impl SelectionProcessor {
19
    pub fn new(
176✔
20
        input_schema: Schema,
176✔
21
        expression: Expression,
176✔
22
        _checkpoint_data: Option<Vec<u8>>,
176✔
23
    ) -> Self {
176✔
24
        Self {
176✔
25
            input_schema,
176✔
26
            expression,
176✔
27
        }
176✔
28
    }
176✔
29
}
30

31
impl Processor for SelectionProcessor {
32
    fn commit(&self, _epoch: &Epoch) -> Result<(), BoxedError> {
177✔
33
        Ok(())
177✔
34
    }
177✔
35

36
    fn process(
12,765✔
37
        &mut self,
12,765✔
38
        _from_port: PortHandle,
12,765✔
39
        record_store: &ProcessorRecordStore,
12,765✔
40
        op: ProcessorOperation,
12,765✔
41
        fw: &mut dyn ProcessorChannelForwarder,
12,765✔
42
    ) -> Result<(), BoxedError> {
12,765✔
43
        match op {
12,765✔
44
            ProcessorOperation::Delete { ref old } => {
350✔
45
                let old = record_store.load_record(old)?;
350✔
46
                if self.expression.evaluate(&old, &self.input_schema)? == Field::Boolean(true) {
350✔
47
                    fw.send(op, DEFAULT_PORT_HANDLE);
245✔
48
                }
245✔
49
            }
50
            ProcessorOperation::Insert { ref new } => {
12,310✔
51
                let new = record_store.load_record(new)?;
12,310✔
52
                if self.expression.evaluate(&new, &self.input_schema)? == Field::Boolean(true) {
12,310✔
53
                    fw.send(op, DEFAULT_PORT_HANDLE);
10,819✔
54
                }
11,491✔
55
            }
56
            ProcessorOperation::Update { old, new } => {
105✔
57
                let old_decoded = record_store.load_record(&old)?;
105✔
58
                let old_fulfilled = self.expression.evaluate(&old_decoded, &self.input_schema)?
105✔
59
                    == Field::Boolean(true);
105✔
60
                let new_decoded = record_store.load_record(&new)?;
105✔
61
                let new_fulfilled = self.expression.evaluate(&new_decoded, &self.input_schema)?
105✔
62
                    == Field::Boolean(true);
105✔
63
                match (old_fulfilled, new_fulfilled) {
105✔
64
                    (true, true) => {
105✔
65
                        // both records fulfills the WHERE condition, forward the operation
105✔
66
                        fw.send(ProcessorOperation::Update { old, new }, DEFAULT_PORT_HANDLE);
105✔
67
                    }
105✔
68
                    (true, false) => {
×
69
                        // the old record fulfills the WHERE condition while then new one doesn't, forward a delete operation
×
70
                        fw.send(ProcessorOperation::Delete { old }, DEFAULT_PORT_HANDLE);
×
71
                    }
×
72
                    (false, true) => {
×
73
                        // the old record doesn't fulfill the WHERE condition while then new one does, forward an insert operation
×
74
                        fw.send(ProcessorOperation::Insert { new }, DEFAULT_PORT_HANDLE);
×
75
                    }
×
76
                    (false, false) => {
×
77
                        // both records doesn't fulfill the WHERE condition, don't forward the operation
×
78
                    }
×
79
                }
80
            }
81
        }
82
        Ok(())
12,765✔
83
    }
12,765✔
84

85
    fn serialize(
×
86
        &mut self,
×
87
        _record_store: &ProcessorRecordStore,
×
88
        _object: Object,
×
89
    ) -> Result<(), BoxedError> {
×
90
        Ok(())
×
91
    }
×
92
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc