• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4102355041

pending completion
4102355041

Pull #811

github

GitHub
Merge 37b55f3df into 7c772e92a
Pull Request #811: chore: integrating sql planner

427 of 427 new or added lines in 15 files covered. (100.0%)

24596 of 37831 relevant lines covered (65.02%)

37254.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.43
/dozer-sql/src/pipeline/projection/processor.rs
1
use crate::pipeline::expression::execution::{Expression, ExpressionExecutor};
2

3
use dozer_core::dag::channels::ProcessorChannelForwarder;
4
use dozer_core::dag::epoch::Epoch;
5
use dozer_core::dag::errors::ExecutionError;
6
use dozer_core::dag::errors::ExecutionError::InternalError;
7
use dozer_core::dag::node::{PortHandle, Processor};
8
use dozer_core::dag::record_store::RecordReader;
9
use dozer_core::dag::DEFAULT_PORT_HANDLE;
10
use dozer_core::storage::lmdb_storage::{LmdbEnvironmentManager, SharedTransaction};
11
use dozer_types::types::{Operation, Record, Schema};
12
use std::collections::HashMap;
13

14
#[derive(Debug)]
×
15
pub struct ProjectionProcessor {
16
    expressions: Vec<Expression>,
17
    input_schema: Schema,
18
}
19

20
impl ProjectionProcessor {
21
    pub fn new(input_schema: Schema, expressions: Vec<Expression>) -> Self {
124✔
22
        Self {
124✔
23
            input_schema,
124✔
24
            expressions,
124✔
25
        }
124✔
26
    }
124✔
27

28
    fn delete(&mut self, record: &Record) -> Result<Operation, ExecutionError> {
16,341✔
29
        let mut results = vec![];
16,341✔
30

31
        for expr in &self.expressions {
65,370✔
32
            results.push(
33
                expr.evaluate(record, &self.input_schema)
49,029✔
34
                    .map_err(|e| InternalError(Box::new(e)))?,
49,029✔
35
            );
×
36
        }
37
        Ok(Operation::Delete {
16,341✔
38
            old: Record::new(None, results, None),
16,341✔
39
        })
16,341✔
40
    }
16,341✔
41

×
42
    fn insert(&mut self, record: &Record) -> Result<Operation, ExecutionError> {
67,096✔
43
        let mut results = vec![];
67,096✔
44

×
45
        for expr in self.expressions.clone() {
169,615✔
46
            results.push(
×
47
                expr.evaluate(record, &self.input_schema)
169,615✔
48
                    .map_err(|e| InternalError(Box::new(e)))?,
169,615✔
49
            );
×
50
        }
×
51
        Ok(Operation::Insert {
67,096✔
52
            new: Record::new(None, results, None),
67,096✔
53
        })
67,096✔
54
    }
67,096✔
55

×
56
    fn update(&self, old: &Record, new: &Record) -> Result<Operation, ExecutionError> {
×
57
        let mut old_results = vec![];
×
58
        let mut new_results = vec![];
×
59

×
60
        for expr in &self.expressions {
×
61
            old_results.push(
62
                expr.evaluate(old, &self.input_schema)
×
63
                    .map_err(|e| InternalError(Box::new(e)))?,
×
64
            );
×
65
            new_results.push(
×
66
                expr.evaluate(new, &self.input_schema)
×
67
                    .map_err(|e| InternalError(Box::new(e)))?,
×
68
            );
69
        }
×
70

×
71
        Ok(Operation::Update {
×
72
            old: Record::new(None, old_results, None),
×
73
            new: Record::new(None, new_results, None),
×
74
        })
×
75
    }
×
76
}
×
77

×
78
impl Processor for ProjectionProcessor {
×
79
    fn init(&mut self, _env: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
124✔
80
        Ok(())
124✔
81
    }
124✔
82

83
    fn process(
83,437✔
84
        &mut self,
83,437✔
85
        _from_port: PortHandle,
83,437✔
86
        op: Operation,
83,437✔
87
        fw: &mut dyn ProcessorChannelForwarder,
83,437✔
88
        _tx: &SharedTransaction,
83,437✔
89
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
83,437✔
90
    ) -> Result<(), ExecutionError> {
83,437✔
91
        let _ = match op {
83,437✔
92
            Operation::Delete { ref old } => fw.send(self.delete(old)?, DEFAULT_PORT_HANDLE),
16,341✔
93
            Operation::Insert { ref new } => fw.send(self.insert(new)?, DEFAULT_PORT_HANDLE),
67,096✔
94
            Operation::Update { ref old, ref new } => {
×
95
                fw.send(self.update(old, new)?, DEFAULT_PORT_HANDLE)
×
96
            }
×
97
        };
×
98
        Ok(())
83,437✔
99
    }
83,437✔
100

101
    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
126✔
102
        Ok(())
126✔
103
    }
126✔
104
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc