• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5672512448

pending completion
5672512448

push

github

web-flow
chore: Change `make_from!` in `from_arrow` to func to improve readability (#1792)

31 of 31 new or added lines in 4 files covered. (100.0%)

45630 of 59777 relevant lines covered (76.33%)

38810.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

47.95
/dozer-sql/src/pipeline/table_operator/lifetime.rs
1
use dozer_core::processor_record::{ProcessorRecord, ProcessorRecordRef};
2
use dozer_types::types::{Field, Lifetime, Schema};
3

4
use crate::pipeline::{errors::TableOperatorError, expression::execution::Expression};
5

6
use super::operator::{TableOperator, TableOperatorType};
7

8
#[derive(Debug)]
×
9
pub struct LifetimeTableOperator {
10
    operator: Option<Box<TableOperatorType>>,
11
    expression: Expression,
×
12
    duration: std::time::Duration,
13
}
14

15
impl LifetimeTableOperator {
16
    pub fn new(
1✔
17
        operator: Option<Box<TableOperatorType>>,
1✔
18
        expression: Expression,
1✔
19
        duration: std::time::Duration,
1✔
20
    ) -> Self {
1✔
21
        Self {
1✔
22
            operator,
1✔
23
            expression,
1✔
24
            duration,
1✔
25
        }
1✔
26
    }
1✔
27
}
×
28

×
29
impl TableOperator for LifetimeTableOperator {
×
30
    fn get_name(&self) -> String {
×
31
        "TTL".to_owned()
×
32
    }
×
33

×
34
    fn execute(
1✔
35
        &self,
1✔
36
        record: &ProcessorRecordRef,
1✔
37
        schema: &Schema,
1✔
38
    ) -> Result<Vec<ProcessorRecordRef>, TableOperatorError> {
1✔
39
        let source_record = record.clone();
1✔
40
        let mut ttl_records = vec![];
1✔
41
        if let Some(operator) = &self.operator {
1✔
42
            let operator_records = operator.execute(&source_record, schema)?;
×
43

×
44
            let schema = operator.get_output_schema(schema)?;
×
45

×
46
            let reference = match self
×
47
                .expression
×
48
                .evaluate(source_record.get_record(), &schema)
×
49
                .map_err(|err| TableOperatorError::InternalError(Box::new(err)))?
×
50
            {
×
51
                Field::Timestamp(timestamp) => timestamp,
×
52
                other => return Err(TableOperatorError::InvalidTtlInputType(other)),
×
53
            };
×
54

×
55
            let lifetime = Some(Lifetime {
×
56
                reference,
×
57
                duration: self.duration,
×
58
            });
×
59
            for operator_record in operator_records {
×
60
                let mut cloned_record = ProcessorRecord::from_referenced_record(operator_record);
×
61
                cloned_record.set_lifetime(lifetime.clone());
×
62
                ttl_records.push(ProcessorRecordRef::new(cloned_record));
×
63
            }
×
64
        } else {
1✔
65
            let reference = match self
1✔
66
                .expression
1✔
67
                .evaluate(source_record.get_record(), schema)
1✔
68
                .map_err(|err| TableOperatorError::InternalError(Box::new(err)))?
1✔
69
            {
×
70
                Field::Timestamp(timestamp) => timestamp,
1✔
71
                other => return Err(TableOperatorError::InvalidTtlInputType(other)),
×
72
            };
×
73

×
74
            let lifetime = Some(Lifetime {
1✔
75
                reference,
1✔
76
                duration: self.duration,
1✔
77
            });
1✔
78

1✔
79
            let mut cloned_record = ProcessorRecord::from_referenced_record(source_record);
1✔
80
            cloned_record.set_lifetime(lifetime);
1✔
81
            ttl_records.push(ProcessorRecordRef::new(cloned_record));
1✔
82
        }
83

84
        Ok(ttl_records)
1✔
85
    }
1✔
86

87
    fn get_output_schema(&self, schema: &Schema) -> Result<Schema, TableOperatorError> {
×
88
        Ok(schema.clone())
×
89
    }
×
90
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc