• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6105410942

07 Sep 2023 04:28AM UTC coverage: 77.562% (-0.1%) from 77.686%
6105410942

push

github

chloeminkyung
feat: onnx image

1141 of 1141 new or added lines in 66 files covered. (100.0%)

49957 of 64409 relevant lines covered (77.56%)

50900.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-sql/src/pipeline/table_operator/processor.rs
1
use dozer_core::channels::ProcessorChannelForwarder;
2
use dozer_core::dozer_log::storage::Object;
3
use dozer_core::epoch::Epoch;
4
use dozer_core::executor_operation::ProcessorOperation;
5
use dozer_core::node::{PortHandle, Processor};
6
use dozer_core::processor_record::ProcessorRecordStore;
7
use dozer_core::DEFAULT_PORT_HANDLE;
8
use dozer_types::errors::internal::BoxedError;
9
use dozer_types::types::Schema;
10

11
use crate::pipeline::errors::PipelineError;
12

13
use super::operator::{TableOperator, TableOperatorType};
14

15
#[derive(Debug)]
×
16
pub struct TableOperatorProcessor {
17
    _id: String,
18
    operator: TableOperatorType,
19
    input_schema: Schema,
20
}
21

22
impl TableOperatorProcessor {
23
    pub fn new(
×
24
        id: String,
×
25
        operator: TableOperatorType,
×
26
        input_schema: Schema,
×
27
        _checkpoint_data: Option<Vec<u8>>,
×
28
    ) -> Self {
×
29
        Self {
×
30
            _id: id,
×
31
            operator,
×
32
            input_schema,
×
33
        }
×
34
    }
×
35
}
36

37
impl Processor for TableOperatorProcessor {
38
    fn commit(&self, _epoch: &Epoch) -> Result<(), BoxedError> {
×
39
        Ok(())
×
40
    }
×
41

42
    fn process(
×
43
        &mut self,
×
44
        _from_port: PortHandle,
×
45
        record_store: &ProcessorRecordStore,
×
46
        op: ProcessorOperation,
×
47
        fw: &mut dyn ProcessorChannelForwarder,
×
48
    ) -> Result<(), BoxedError> {
×
49
        match op {
×
50
            ProcessorOperation::Delete { ref old } => {
×
51
                let records = self
×
52
                    .operator
×
53
                    .execute(record_store, old, &self.input_schema)
×
54
                    .map_err(PipelineError::TableOperatorError)?;
×
55
                for record in records {
×
56
                    fw.send(
×
57
                        ProcessorOperation::Delete { old: record },
×
58
                        DEFAULT_PORT_HANDLE,
×
59
                    );
×
60
                }
×
61
            }
62
            ProcessorOperation::Insert { ref new } => {
×
63
                let records = self
×
64
                    .operator
×
65
                    .execute(record_store, new, &self.input_schema)
×
66
                    .map_err(PipelineError::TableOperatorError)?;
×
67
                for record in records {
×
68
                    fw.send(
×
69
                        ProcessorOperation::Insert { new: record },
×
70
                        DEFAULT_PORT_HANDLE,
×
71
                    );
×
72
                }
×
73
            }
74
            ProcessorOperation::Update { ref old, ref new } => {
×
75
                let old_records = self
×
76
                    .operator
×
77
                    .execute(record_store, old, &self.input_schema)
×
78
                    .map_err(PipelineError::TableOperatorError)?;
×
79
                for record in old_records {
×
80
                    fw.send(
×
81
                        ProcessorOperation::Delete { old: record },
×
82
                        DEFAULT_PORT_HANDLE,
×
83
                    );
×
84
                }
×
85

86
                let new_records = self
×
87
                    .operator
×
88
                    .execute(record_store, new, &self.input_schema)
×
89
                    .map_err(PipelineError::TableOperatorError)?;
×
90
                for record in new_records {
×
91
                    fw.send(
×
92
                        ProcessorOperation::Insert { new: record },
×
93
                        DEFAULT_PORT_HANDLE,
×
94
                    );
×
95
                }
×
96
            }
97
        }
98
        Ok(())
×
99
    }
×
100

101
    fn serialize(
×
102
        &mut self,
×
103
        _record_store: &ProcessorRecordStore,
×
104
        _object: Object,
×
105
    ) -> Result<(), BoxedError> {
×
106
        Ok(())
×
107
    }
×
108
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc