• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 3975475267

pending completion
3975475267

Pull #699

github

GitHub
Merge cbe01669c into 02f99a9c0
Pull Request #699: feature: Automatically trim record history in `RecordWriter`

229 of 229 new or added lines in 6 files covered. (100.0%)

22375 of 33744 relevant lines covered (66.31%)

45958.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.68
/dozer-core/src/dag/executor_utils.rs
1
#![allow(clippy::type_complexity)]
2
use crate::dag::dag::{Dag, Edge, Endpoint};
3
use crate::dag::dag_metadata::METADATA_DB_NAME;
4
use crate::dag::errors::ExecutionError;
5
use crate::dag::errors::ExecutionError::InvalidOperation;
6
use crate::dag::executor::ExecutorOperation;
7
use crate::dag::node::{NodeHandle, OutputPortDef, OutputPortType, PortHandle};
8
use crate::dag::record_store::{
9
    AutogenRowKeyLookupRecordReader, PrimaryKeyValueLookupRecordReader, RecordReader,
10
};
11
use crate::storage::common::Database;
12
use crate::storage::lmdb_storage::{LmdbEnvironmentManager, SharedTransaction};
13
use crossbeam::channel::{bounded, Receiver, Select, Sender};
14
use dozer_types::types::{Operation, Schema};
15
use std::collections::HashMap;
16
use std::path::Path;
17

18
pub(crate) struct StorageMetadata {
19
    pub env: LmdbEnvironmentManager,
20
    pub meta_db: Database,
21
}
22

×
23
impl StorageMetadata {
×
24
    pub fn new(env: LmdbEnvironmentManager, meta_db: Database) -> Self {
564✔
25
        Self { env, meta_db }
564✔
26
    }
564✔
27
}
×
28

×
29
pub(crate) fn init_component<F>(
567✔
30
    node_handle: &NodeHandle,
567✔
31
    base_path: &Path,
567✔
32
    mut init_f: F,
567✔
33
) -> Result<StorageMetadata, ExecutionError>
567✔
34
where
567✔
35
    F: FnMut(&mut LmdbEnvironmentManager) -> Result<(), ExecutionError>,
567✔
36
{
567✔
37
    let mut env = LmdbEnvironmentManager::create(base_path, format!("{}", node_handle).as_str())?;
567✔
38
    let db = env.open_database(METADATA_DB_NAME, false)?;
564✔
39
    init_f(&mut env)?;
564✔
40
    Ok(StorageMetadata::new(env, db))
564✔
41
}
567✔
42
#[inline]
×
43
pub(crate) fn init_select(receivers: &Vec<Receiver<ExecutorOperation>>) -> Select {
15,317✔
44
    let mut sel = Select::new();
15,317✔
45
    for r in receivers {
31,026✔
46
        sel.recv(r);
15,709✔
47
    }
15,709✔
48
    sel
15,317✔
49
}
15,317✔
50

×
51
pub(crate) fn requires_schema_update(
×
52
    _new: Schema,
×
53
    _port_handle: &PortHandle,
×
54
    input_schemas: &mut HashMap<PortHandle, Schema>,
×
55
    input_ports: &[PortHandle],
×
56
) -> bool {
×
57
    let count = input_ports
×
58
        .iter()
×
59
        .filter(|e| !input_schemas.contains_key(*e))
×
60
        .count();
×
61
    count == 0
×
62
}
×
63

×
64
pub(crate) fn map_to_op(op: ExecutorOperation) -> Result<Operation, ExecutionError> {
×
65
    match op {
×
66
        ExecutorOperation::Delete { old } => Ok(Operation::Delete { old }),
×
67
        ExecutorOperation::Insert { new } => Ok(Operation::Insert { new }),
×
68
        ExecutorOperation::Update { old, new } => Ok(Operation::Update { old, new }),
×
69
        _ => Err(InvalidOperation(op.to_string())),
×
70
    }
×
71
}
×
72

×
73
pub(crate) fn map_to_exec_op(op: Operation) -> ExecutorOperation {
×
74
    match op {
×
75
        Operation::Update { old, new } => ExecutorOperation::Update { old, new },
×
76
        Operation::Delete { old } => ExecutorOperation::Delete { old },
×
77
        Operation::Insert { new } => ExecutorOperation::Insert { new },
×
78
    }
×
79
}
×
80

×
81
pub(crate) fn index_edges(
128✔
82
    dag: &Dag,
128✔
83
    channel_buf_sz: usize,
128✔
84
) -> (
128✔
85
    HashMap<NodeHandle, HashMap<PortHandle, Vec<Sender<ExecutorOperation>>>>,
128✔
86
    HashMap<NodeHandle, HashMap<PortHandle, Vec<Receiver<ExecutorOperation>>>>,
128✔
87
) {
128✔
88
    let mut senders: HashMap<NodeHandle, HashMap<PortHandle, Vec<Sender<ExecutorOperation>>>> =
128✔
89
        HashMap::new();
128✔
90
    let mut receivers: HashMap<NodeHandle, HashMap<PortHandle, Vec<Receiver<ExecutorOperation>>>> =
128✔
91
        HashMap::new();
128✔
92

×
93
    for edge in dag.edges.iter() {
443✔
94
        if !senders.contains_key(&edge.from.node) {
443✔
95
            senders.insert(edge.from.node.clone(), HashMap::new());
437✔
96
        }
437✔
97
        if !receivers.contains_key(&edge.to.node) {
443✔
98
            receivers.insert(edge.to.node.clone(), HashMap::new());
431✔
99
        }
431✔
100

×
101
        let (tx, rx) = bounded(channel_buf_sz);
443✔
102

443✔
103
        let rcv_port: PortHandle = edge.to.port;
443✔
104
        if receivers
443✔
105
            .get(&edge.to.node)
443✔
106
            .unwrap()
443✔
107
            .contains_key(&rcv_port)
443✔
108
        {
×
109
            receivers
×
110
                .get_mut(&edge.to.node)
×
111
                .unwrap()
×
112
                .get_mut(&rcv_port)
×
113
                .unwrap()
×
114
                .push(rx);
×
115
        } else {
443✔
116
            receivers
443✔
117
                .get_mut(&edge.to.node)
443✔
118
                .unwrap()
443✔
119
                .insert(rcv_port, vec![rx]);
443✔
120
        }
443✔
121

×
122
        let snd_port: PortHandle = edge.from.port;
443✔
123
        if senders
443✔
124
            .get(&edge.from.node)
443✔
125
            .unwrap()
443✔
126
            .contains_key(&snd_port)
443✔
127
        {
4✔
128
            senders
4✔
129
                .get_mut(&edge.from.node)
4✔
130
                .unwrap()
4✔
131
                .get_mut(&snd_port)
4✔
132
                .unwrap()
4✔
133
                .push(tx);
4✔
134
        } else {
439✔
135
            senders
439✔
136
                .get_mut(&edge.from.node)
439✔
137
                .unwrap()
439✔
138
                .insert(snd_port, vec![tx]);
439✔
139
        }
439✔
140
    }
×
141

×
142
    (senders, receivers)
128✔
143
}
128✔
144

×
145
pub(crate) fn build_receivers_lists(
427✔
146
    receivers: HashMap<PortHandle, Vec<Receiver<ExecutorOperation>>>,
427✔
147
) -> (Vec<PortHandle>, Vec<Receiver<ExecutorOperation>>) {
427✔
148
    let mut handles_ls: Vec<PortHandle> = Vec::new();
427✔
149
    let mut receivers_ls: Vec<Receiver<ExecutorOperation>> = Vec::new();
427✔
150
    for e in receivers {
866✔
151
        for r in e.1 {
878✔
152
            receivers_ls.push(r);
439✔
153
            handles_ls.push(e.0);
439✔
154
        }
439✔
155
    }
×
156
    (handles_ls, receivers_ls)
427✔
157
}
427✔
158

×
159
/// Collects the destination endpoint of every edge that starts at output
/// `port` of `node`, in the order the edges appear in `edges`.
fn get_inputs_for_output(edges: &[Edge], node: &NodeHandle, port: &PortHandle) -> Vec<Endpoint> {
    let mut targets = Vec::new();
    for edge in edges {
        let source = &edge.from;
        if source.node == *node && source.port == *port {
            targets.push(edge.to.clone());
        }
    }
    targets
}
20✔
166

×
167
/// Name prefix for per-output-port state databases. Database names are built
/// with the format `"{}_{}"` (data) and `"{}_{}_META"` (metadata) from this
/// prefix and the port handle.
const PORT_STATE_KEY: &str = "__PORT_STATE_";
×
168

×
169
#[derive(Debug)]
×
170
pub(crate) struct StateOptions {
×
171
    pub(crate) db: Database,
×
172
    pub(crate) meta_db: Database,
173
    pub(crate) typ: OutputPortType,
×
174
}
×
175

×
176
pub(crate) fn create_ports_databases_and_fill_downstream_record_readers(
433✔
177
    handle: &NodeHandle,
433✔
178
    edges: &[Edge],
433✔
179
    mut env: LmdbEnvironmentManager,
433✔
180
    output_ports: &[OutputPortDef],
433✔
181
    record_stores: &mut HashMap<NodeHandle, HashMap<PortHandle, Box<dyn RecordReader>>>,
433✔
182
) -> Result<(SharedTransaction, HashMap<PortHandle, StateOptions>), ExecutionError> {
433✔
183
    let mut port_databases: Vec<Option<StateOptions>> = Vec::new();
433✔
184
    for port in output_ports {
868✔
185
        let opt = match &port.typ {
435✔
186
            OutputPortType::Stateless => None,
415✔
187
            typ => {
20✔
188
                let db =
20✔
189
                    env.open_database(&format!("{}_{}", PORT_STATE_KEY, port.handle), false)?;
20✔
190
                let meta_db =
20✔
191
                    env.open_database(&format!("{}_{}_META", PORT_STATE_KEY, port.handle), false)?;
20✔
192
                Some(StateOptions {
20✔
193
                    db,
20✔
194
                    meta_db,
20✔
195
                    typ: typ.clone(),
20✔
196
                })
20✔
197
            }
×
198
        };
×
199
        port_databases.push(opt);
435✔
200
    }
×
201

×
202
    let master_tx = env.create_txn()?;
433✔
203

×
204
    for (state_options, port) in port_databases.iter().zip(output_ports.iter()) {
435✔
205
        if let Some(state_options) = state_options {
435✔
206
            for endpoint in get_inputs_for_output(edges, handle, &port.handle) {
24✔
207
                let record_reader: Box<dyn RecordReader> = match port.typ {
24✔
208
                    OutputPortType::AutogenRowKeyLookup => Box::new(
1✔
209
                        AutogenRowKeyLookupRecordReader::new(master_tx.clone(), state_options.db),
1✔
210
                    ),
1✔
211
                    OutputPortType::StatefulWithPrimaryKeyLookup { .. } => Box::new(
23✔
212
                        PrimaryKeyValueLookupRecordReader::new(master_tx.clone(), state_options.db),
23✔
213
                    ),
23✔
214
                    OutputPortType::Stateless => panic!("Internal error: Invalid port type"),
×
215
                };
×
216

×
217
                record_stores
24✔
218
                    .get_mut(&endpoint.node)
24✔
219
                    .expect("Record store HashMap must be created for every node upfront")
24✔
220
                    .insert(endpoint.port, record_reader);
24✔
221
            }
×
222
        }
415✔
223
    }
×
224

×
225
    let port_databases = output_ports
433✔
226
        .iter()
433✔
227
        .zip(port_databases.into_iter())
433✔
228
        .flat_map(|(output_port, state_option)| {
435✔
229
            state_option.map(|state_option| (output_port.handle, state_option))
435✔
230
        })
435✔
231
        .collect();
433✔
232

433✔
233
    Ok((master_tx, port_databases))
433✔
234
}
433✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc