• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4283961027

pending completion
4283961027

push

github

GitHub
feat: Blue green cache (#1061)

645 of 645 new or added lines in 45 files covered. (100.0%)

27779 of 39307 relevant lines covered (70.67%)

52489.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

36.71
/dozer-sql/src/pipeline/selection/processor.rs
1
use crate::pipeline::expression::execution::{Expression, ExpressionExecutor};
2
use dozer_core::channels::ProcessorChannelForwarder;
3
use dozer_core::epoch::Epoch;
4
use dozer_core::errors::ExecutionError;
5
use dozer_core::errors::ExecutionError::InternalError;
6
use dozer_core::node::{PortHandle, Processor};
7
use dozer_core::record_store::RecordReader;
8
use dozer_core::storage::lmdb_storage::SharedTransaction;
9
use dozer_core::DEFAULT_PORT_HANDLE;
10
use dozer_types::types::{Field, Operation, Schema};
11
use std::collections::HashMap;
12

13
/// Processor that filters operations with a boolean expression: a record is
/// forwarded only when the expression evaluates to `Field::Boolean(true)` for it
/// (the WHERE condition referred to in `process`).
#[derive(Debug)]
×
14
pub struct SelectionProcessor {
15
    /// Predicate evaluated against every incoming record.
    expression: Expression,
16
    /// Schema handed to `expression.evaluate` together with each record.
    input_schema: Schema,
17
}
18

19
impl SelectionProcessor {
20
    /// Builds a processor that stores `input_schema` and `expression` as given.
    pub fn new(input_schema: Schema, expression: Expression) -> Self {
71✔
21
        Self {
71✔
22
            input_schema,
71✔
23
            expression,
71✔
24
        }
71✔
25
    }
71✔
26

27
    /// Wraps a clone of `record` in an `Operation::Delete` (as its `old` record).
    fn delete(&self, record: &dozer_types::types::Record) -> Operation {
×
28
        Operation::Delete {
×
29
            old: record.clone(),
×
30
        }
×
31
    }
×
32

33
    /// Wraps a clone of `record` in an `Operation::Insert` (as its `new` record).
    fn insert(&self, record: &dozer_types::types::Record) -> Operation {
×
34
        Operation::Insert {
×
35
            new: record.clone(),
×
36
        }
×
37
    }
×
38
}
39

40
impl Processor for SelectionProcessor {
41
    /// No-op: ignores both arguments and always returns `Ok(())`.
    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
71✔
42
        Ok(())
71✔
43
    }
71✔
44

45
    /// Filters `op` through the stored expression and forwards the outcome on
    /// `DEFAULT_PORT_HANDLE`.
    ///
    /// * `Delete` / `Insert`: forwarded unchanged when the record fulfills the condition.
    /// * `Update`: forwarded unchanged, rewritten to a delete or an insert, or dropped,
    ///   depending on which of the old/new records fulfill the condition.
    /// * `SnapshottingDone`: always forwarded unchanged.
    ///
    /// A record fulfills the condition only when evaluation yields exactly
    /// `Field::Boolean(true)`; any other value counts as not fulfilled.
    ///
    /// # Errors
    ///
    /// Returns `InternalError` wrapping the evaluation error when the expression
    /// cannot be evaluated for a record.
    ///
    /// NOTE(review): every `fw.send` result is discarded with `let _ =`, so a failed
    /// send would go unnoticed here — presumably `send` is fallible; confirm whether
    /// its failure should be propagated instead.
    fn process(
17,070✔
46
        &mut self,
17,070✔
47
        _from_port: PortHandle,
17,070✔
48
        op: Operation,
17,070✔
49
        fw: &mut dyn ProcessorChannelForwarder,
17,070✔
50
        _tx: &SharedTransaction,
17,070✔
51
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
17,070✔
52
    ) -> Result<(), ExecutionError> {
17,070✔
53
        match op {
17,070✔
54
            // A delete is forwarded only if the deleted record fulfilled the condition.
            Operation::Delete { ref old } => {
×
55
                if self
×
56
                    .expression
×
57
                    .evaluate(old, &self.input_schema)
×
58
                    .map_err(|e| InternalError(Box::new(e)))?
×
59
                    == Field::Boolean(true)
×
60
                {
×
61
                    let _ = fw.send(op, DEFAULT_PORT_HANDLE);
×
62
                }
×
63
            }
64
            // An insert is forwarded only if the new record fulfills the condition.
            Operation::Insert { ref new } => {
17,070✔
65
                if self
17,070✔
66
                    .expression
17,070✔
67
                    .evaluate(new, &self.input_schema)
17,070✔
68
                    .map_err(|e| InternalError(Box::new(e)))?
17,070✔
69
                    == Field::Boolean(true)
17,070✔
70
                {
10,850✔
71
                    let _ = fw.send(op, DEFAULT_PORT_HANDLE);
10,850✔
72
                }
16,220✔
73
            }
74
            // An update is judged on both sides; both evaluations run before anything is sent.
            Operation::Update { ref old, ref new } => {
×
75
                let old_fulfilled = self
×
76
                    .expression
×
77
                    .evaluate(old, &self.input_schema)
×
78
                    .map_err(|e| InternalError(Box::new(e)))?
×
79
                    == Field::Boolean(true);
×
80
                let new_fulfilled = self
×
81
                    .expression
×
82
                    .evaluate(new, &self.input_schema)
×
83
                    .map_err(|e| InternalError(Box::new(e)))?
×
84
                    == Field::Boolean(true);
×
85
                match (old_fulfilled, new_fulfilled) {
×
86
                    (true, true) => {
×
87
                        // both records fulfill the WHERE condition, forward the operation
×
88
                        let _ = fw.send(op, DEFAULT_PORT_HANDLE);
×
89
                    }
×
90
                    (true, false) => {
×
91
                        // the old record fulfills the WHERE condition while the new one doesn't, forward a delete operation
×
92
                        let _ = fw.send(self.delete(old), DEFAULT_PORT_HANDLE);
×
93
                    }
×
94
                    (false, true) => {
×
95
                        // the old record doesn't fulfill the WHERE condition while the new one does, forward an insert operation
×
96
                        let _ = fw.send(self.insert(new), DEFAULT_PORT_HANDLE);
×
97
                    }
×
98
                    (false, false) => {
×
99
                        // neither record fulfills the WHERE condition, don't forward the operation
×
100
                    }
×
101
                }
102
            }
103
            // Not tied to a record, so it is passed through without evaluating the expression.
            Operation::SnapshottingDone { .. } => {
×
104
                let _ = fw.send(op, DEFAULT_PORT_HANDLE);
×
105
            }
×
106
        }
107
        Ok(())
17,070✔
108
    }
17,070✔
109
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc