getdozer / dozer / 5651881554

pending completion
5651881554

push

github

web-flow
feat: refactor records (#1781)

* feat: add processor record

* chore: refactor processor record

* chore: introduce processor operation

* chore: refactor processors

* chore: refactor log operations

* fix import errors

* chore: fix index

* chore: fix recursive fetch

* fix: Don't allow cloning `ProcessorRecord`

* chore: fix errors

* fix: Don't expose `ProcessorRecordRef` implementation detail

* fix: Add `ProcessorRecord::total_len` to correctly handle `extend_` functions

* fix: Fix tests compilation by @chloeminkyung

* fix: Fix window and lifetime sql unit tests

* fix: clippy fix

* chore: fmt fix

* chore: Move `ProcessorRecord` to `dozer-core`

---------

Co-authored-by: Chloe Kim <chloeminkyung@gmail.com>
Co-authored-by: chubei <914745487@qq.com>

849 of 849 new or added lines in 64 files covered. (100.0%)

43094 of 55381 relevant lines covered (77.81%)

29359.43 hits per line

Source File

/dozer-core/src/executor_operation.rs (85.71% covered)
use dozer_types::{epoch::Epoch, types::Operation};

use crate::processor_record::ProcessorRecordRef;

/// A CDC event.
#[derive(Clone, Debug, PartialEq, Eq)] // 86,647 hits
pub enum ProcessorOperation {
    Delete {
        old: ProcessorRecordRef,
    },
    Insert {
        new: ProcessorRecordRef,
    },
    Update {
        old: ProcessorRecordRef,
        new: ProcessorRecordRef,
    },
}

impl From<Operation> for ProcessorOperation {
    fn from(record: Operation) -> Self { // 3,635,924 hits
        match record { // 3,635,924 hits
            Operation::Delete { old } => ProcessorOperation::Delete { // 647 hits
                old: ProcessorRecordRef::new(old.into()), // 647 hits
            }, // 647 hits
            Operation::Insert { new } => ProcessorOperation::Insert { // 3,635,052 hits
                new: ProcessorRecordRef::new(new.into()), // 3,635,052 hits
            }, // 3,635,052 hits
            Operation::Update { old, new } => ProcessorOperation::Update { // 225 hits
                old: ProcessorRecordRef::new(old.into()), // 225 hits
                new: ProcessorRecordRef::new(new.into()), // 225 hits
            }, // 225 hits
        }
    } // 3,635,924 hits
}

impl ProcessorOperation {
    pub fn clone_deref(&self) -> Operation { // 40 hits
        match self { // 40 hits
            ProcessorOperation::Delete { old } => Operation::Delete { // 5 hits
                old: old.get_record().clone_deref(), // 5 hits
            }, // 5 hits
            ProcessorOperation::Insert { new } => Operation::Insert { // 35 hits
                new: new.get_record().clone_deref(), // 35 hits
            }, // 35 hits
            ProcessorOperation::Update { old, new } => Operation::Update { // uncovered
                old: old.get_record().clone_deref(), // uncovered
                new: new.get_record().clone_deref(), // uncovered
            }, // uncovered
        }
    } // 40 hits
}

#[derive(Clone, Debug, PartialEq, Eq)] // 86,362 hits
pub enum ExecutorOperation {
    Op { op: ProcessorOperation },
    Commit { epoch: Epoch },
    Terminate,
    SnapshottingDone { connection_name: String },
}
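
The only uncovered lines in this file are the `Update` arm of `ProcessorOperation::clone_deref`. The sketch below shows one way a round-trip test could exercise that arm. It is self-contained and does not use the real dozer types: `Record`, `Operation`, `ProcessorRecordRef` (modelled here as an `Arc` wrapper) and `sample_update` are simplified, hypothetical stand-ins, and the real `ProcessorRecord` conversion (`old.into()`, `get_record().clone_deref()`) is reduced to a plain `clone()`.

```rust
use std::sync::Arc;

// Hypothetical stand-in for `dozer_types::types::Record`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Record {
    pub values: Vec<i64>,
}

// Hypothetical stand-in for `dozer_types::types::Operation`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Operation {
    Delete { old: Record },
    Insert { new: Record },
    Update { old: Record, new: Record },
}

// Assumed shape only: a shared handle to a record.
// The real `ProcessorRecordRef` may be implemented differently.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProcessorRecordRef(Arc<Record>);

impl ProcessorRecordRef {
    pub fn new(record: Record) -> Self {
        Self(Arc::new(record))
    }

    pub fn get_record(&self) -> &Record {
        &self.0
    }
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ProcessorOperation {
    Delete { old: ProcessorRecordRef },
    Insert { new: ProcessorRecordRef },
    Update { old: ProcessorRecordRef, new: ProcessorRecordRef },
}

impl From<Operation> for ProcessorOperation {
    fn from(op: Operation) -> Self {
        match op {
            Operation::Delete { old } => Self::Delete { old: ProcessorRecordRef::new(old) },
            Operation::Insert { new } => Self::Insert { new: ProcessorRecordRef::new(new) },
            Operation::Update { old, new } => Self::Update {
                old: ProcessorRecordRef::new(old),
                new: ProcessorRecordRef::new(new),
            },
        }
    }
}

impl ProcessorOperation {
    // Copies the shared records back into an owned `Operation`.
    pub fn clone_deref(&self) -> Operation {
        match self {
            Self::Delete { old } => Operation::Delete { old: old.get_record().clone() },
            Self::Insert { new } => Operation::Insert { new: new.get_record().clone() },
            Self::Update { old, new } => Operation::Update {
                old: old.get_record().clone(),
                new: new.get_record().clone(),
            },
        }
    }
}

// Hypothetical fixture: an update from [1, 2] to [1, 3].
pub fn sample_update() -> Operation {
    Operation::Update {
        old: Record { values: vec![1, 2] },
        new: Record { values: vec![1, 3] },
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    // Converting into `ProcessorOperation` and back should preserve the update.
    #[test]
    fn update_round_trips_through_clone_deref() {
        let original = sample_update();
        let processed = ProcessorOperation::from(original.clone());
        assert_eq!(processed.clone_deref(), original);
    }
}
```

A test of this shape against the real types in `dozer-core` would need real `Record` values in place of the `Vec<i64>` fixture; the assertion itself would stay the same.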