• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4392484403

pending completion
4392484403

push

github

GitHub
feat: Asynchronous indexing (#1206)

270 of 270 new or added lines in 13 files covered. (100.0%)

28714 of 38777 relevant lines covered (74.05%)

89484.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.91
/dozer-sql/src/pipeline/projection/processor.rs
1
use crate::pipeline::expression::execution::{Expression, ExpressionExecutor};
2

3
use dozer_core::channels::ProcessorChannelForwarder;
4
use dozer_core::epoch::Epoch;
5
use dozer_core::errors::ExecutionError;
6
use dozer_core::errors::ExecutionError::InternalError;
7
use dozer_core::node::{PortHandle, Processor};
8
use dozer_core::storage::lmdb_storage::SharedTransaction;
9
use dozer_core::DEFAULT_PORT_HANDLE;
10
use dozer_types::types::{Operation, Record, Schema};
11

12
#[derive(Debug)]
×
13
pub struct ProjectionProcessor {
14
    expressions: Vec<Expression>,
15
    input_schema: Schema,
16
}
17

18
impl ProjectionProcessor {
19
    pub fn new(input_schema: Schema, expressions: Vec<Expression>) -> Self {
625✔
20
        Self {
625✔
21
            input_schema,
625✔
22
            expressions,
625✔
23
        }
625✔
24
    }
625✔
25

26
    fn delete(&mut self, record: &Record) -> Result<Operation, ExecutionError> {
2,250✔
27
        let mut results = vec![];
2,250✔
28

29
        for expr in &self.expressions {
8,860✔
30
            results.push(
31
                expr.evaluate(record, &self.input_schema)
6,610✔
32
                    .map_err(|e| InternalError(Box::new(e)))?,
6,610✔
33
            );
34
        }
35
        Ok(Operation::Delete {
2,250✔
36
            old: Record::new(None, results, None),
2,250✔
37
        })
2,250✔
38
    }
2,250✔
39

40
    fn insert(&mut self, record: &Record) -> Result<Operation, ExecutionError> {
454,550✔
41
        let mut results = vec![];
454,550✔
42

43
        for expr in self.expressions.clone() {
926,310✔
44
            results.push(
45
                expr.evaluate(record, &self.input_schema)
926,309✔
46
                    .map_err(|e| InternalError(Box::new(e)))?,
926,309✔
47
            );
48
        }
49
        Ok(Operation::Insert {
459,530✔
50
            new: Record::new(None, results, None),
459,530✔
51
        })
459,530✔
52
    }
459,530✔
53

54
    fn update(&self, old: &Record, new: &Record) -> Result<Operation, ExecutionError> {
×
55
        let mut old_results = vec![];
×
56
        let mut new_results = vec![];
×
57

58
        for expr in &self.expressions {
×
59
            old_results.push(
60
                expr.evaluate(old, &self.input_schema)
×
61
                    .map_err(|e| InternalError(Box::new(e)))?,
×
62
            );
63
            new_results.push(
64
                expr.evaluate(new, &self.input_schema)
×
65
                    .map_err(|e| InternalError(Box::new(e)))?,
×
66
            );
67
        }
68

69
        Ok(Operation::Update {
×
70
            old: Record::new(None, old_results, None),
×
71
            new: Record::new(None, new_results, None),
×
72
        })
×
73
    }
×
74
}
75

76
impl Processor for ProjectionProcessor {
77
    fn process(
456,240✔
78
        &mut self,
456,240✔
79
        _from_port: PortHandle,
456,240✔
80
        op: Operation,
456,240✔
81
        fw: &mut dyn ProcessorChannelForwarder,
456,240✔
82
        _tx: &SharedTransaction,
456,240✔
83
    ) -> Result<(), ExecutionError> {
456,240✔
84
        let _ = match op {
456,240✔
85
            Operation::Delete { ref old } => fw.send(self.delete(old)?, DEFAULT_PORT_HANDLE),
390✔
86
            Operation::Insert { ref new } => fw.send(self.insert(new)?, DEFAULT_PORT_HANDLE),
455,850✔
87
            Operation::Update { ref old, ref new } => {
×
88
                fw.send(self.update(old, new)?, DEFAULT_PORT_HANDLE)
×
89
            }
90
        };
91
        Ok(())
454,410✔
92
    }
454,410✔
93

94
    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
606✔
95
        Ok(())
606✔
96
    }
606✔
97
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc