getdozer / dozer · build 4112409903 (pending completion)
GitHub Pull Request #818: chore: fix dag issues (merge 0e6d61bff into c160ec41f)

212 of 212 new or added lines in 23 files covered (100.0%)
23352 of 37718 relevant lines covered (61.91%)
31647.45 hits per line

Source File: /dozer-core/src/dag/dag_metadata.rs (file coverage: 64.6%)
use crate::dag::dag_schemas::NodeSchemas;
use crate::dag::errors::ExecutionError;
use crate::dag::errors::ExecutionError::{InvalidNodeHandle, MetadataAlreadyExists};
use crate::dag::node::{NodeHandle, PortHandle};
use crate::dag::Dag;
use crate::storage::common::Seek;
use crate::storage::errors::StorageError;
use crate::storage::errors::StorageError::{DeserializationError, SerializationError};
use crate::storage::lmdb_storage::{
    LmdbEnvironmentManager, LmdbExclusiveTransaction, SharedTransaction,
};
use dozer_types::bincode;
use dozer_types::types::Schema;
use lmdb::Database;
use std::collections::HashMap;

use std::iter::once;
use std::path::Path;

use super::epoch::{OpIdentifier, SourceStates};
use super::hash_map_to_vec::insert_vec_element;

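// Each DAG node persists its metadata in its own LMDB environment containing a
// `__META__` database. Keys in that database start with a one-byte tag identifying
// the record type: a source commit position, an output-port schema, or an
// input-port schema.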
pub(crate) const METADATA_DB_NAME: &str = "__META__";
const SOURCE_ID_IDENTIFIER: u8 = 0_u8;
pub(crate) const OUTPUT_SCHEMA_IDENTIFIER: u8 = 1_u8;
pub(crate) const INPUT_SCHEMA_IDENTIFIER: u8 = 2_u8;

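// A source's dependency tree is fully consistent when every node reachable from it
// has recorded the same commit position, and partially consistent otherwise, with
// nodes grouped by the last position each of them recorded.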
pub(crate) enum Consistency {
    FullyConsistent(Option<OpIdentifier>),
    PartiallyConsistent(HashMap<Option<OpIdentifier>, Vec<NodeHandle>>),
}

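// Metadata read back from a node's `__META__` database: the commit positions of the
// sources it has consumed plus its persisted input and output port schemas.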
pub(crate) struct DagMetadata {
    pub commits: SourceStates,
    pub input_schemas: HashMap<PortHandle, Schema>,
    pub output_schemas: HashMap<PortHandle, Schema>,
}

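// Creates, reads, checks and deletes the per-node metadata environments stored under
// `path` for the given DAG.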
pub(crate) struct DagMetadataManager<'a, T: Clone> {
    dag: &'a Dag<T>,
    path: &'a Path,
}

impl<'a, T: Clone + 'a> DagMetadataManager<'a, T> {
    pub fn new(
        dag: &'a Dag<T>,
        path: &'a Path,
    ) -> Result<DagMetadataManager<'a, T>, ExecutionError> {
        Ok(Self { path, dag })
    }

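    // Opens the node's metadata environment (if it exists) and walks the `__META__`
    // database with a read-only cursor, dispatching on each key's tag byte to rebuild
    // the node's `DagMetadata`. Returns `Ok(None)` when no environment exists yet.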
    fn get_node_checkpoint_metadata(
        path: &Path,
        name: &NodeHandle,
    ) -> Result<Option<DagMetadata>, ExecutionError> {
        let env_name = metadata_environment_name(name);
        if !LmdbEnvironmentManager::exists(path, &env_name) {
            return Ok(None);
        }

        let mut env = LmdbEnvironmentManager::create(path, &env_name)?;
        let db = env.open_database(METADATA_DB_NAME, false)?;
        let txn = env.create_txn()?;
        let txn = SharedTransaction::try_unwrap(txn)
            .expect("We just created this `SharedTransaction`. It's not shared.");

        let cur = txn.open_ro_cursor(db)?;
        if !cur.first()? {
            return Err(ExecutionError::InternalDatabaseError(
                StorageError::InvalidRecord,
            ));
        }

        let mut commits = SourceStates::default();
        let mut input_schemas: HashMap<PortHandle, Schema> = HashMap::new();
        let mut output_schemas: HashMap<PortHandle, Schema> = HashMap::new();

        loop {
            let value = cur.read()?.ok_or(ExecutionError::InternalDatabaseError(
                StorageError::InvalidRecord,
            ))?;
            match value.0[0] {
                SOURCE_ID_IDENTIFIER => {
                    commits.extend(once(deserialize_source_metadata(value.0, value.1)))
                }
                OUTPUT_SCHEMA_IDENTIFIER => {
                    let handle: PortHandle = PortHandle::from_be_bytes(
                        (&value.0[1..])
                            .try_into()
                            .map_err(|_e| ExecutionError::InvalidPortHandle(0))?,
                    );
                    let schema: Schema =
                        bincode::deserialize(value.1).map_err(|e| DeserializationError {
                            typ: "Schema".to_string(),
                            reason: Box::new(e),
                        })?;
                    output_schemas.insert(handle, schema);
                }
                INPUT_SCHEMA_IDENTIFIER => {
                    let handle: PortHandle = PortHandle::from_be_bytes(
                        (&value.0[1..])
                            .try_into()
                            .map_err(|_e| ExecutionError::InvalidPortHandle(0))?,
                    );
                    let schema: Schema =
                        bincode::deserialize(value.1).map_err(|e| DeserializationError {
                            typ: "Schema".to_string(),
                            reason: Box::new(e),
                        })?;
                    input_schemas.insert(handle, schema);
                }
                _ => {
                    return Err(ExecutionError::InternalDatabaseError(
                        StorageError::InvalidRecord,
                    ))
                }
            }
            if !cur.next()? {
                break;
            }
        }

        Ok(Some(DagMetadata {
            commits,
            input_schemas,
            output_schemas,
        }))
    }

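    // Collects the checkpoint metadata of every node in the DAG, skipping nodes that
    // have no metadata environment yet.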
    fn get_checkpoint_metadata(
        path: &Path,
        dag: &Dag<T>,
    ) -> Result<HashMap<NodeHandle, DagMetadata>, ExecutionError> {
        let mut all = HashMap::<NodeHandle, DagMetadata>::new();
        for node in dag.node_handles() {
            if let Some(metadata) = Self::get_node_checkpoint_metadata(path, node)? {
                all.insert(node.clone(), metadata);
            }
        }
        Ok(all)
    }

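    // Walks the DAG breadth-first from `root_node` and groups the visited nodes by the
    // commit position (`OpIdentifier`) they have recorded for that root. A single group
    // means every node downstream of the root agrees on the last committed operation.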
    fn get_dependency_tree_consistency(
        &self,
        root_node: &NodeHandle,
    ) -> Result<HashMap<Option<OpIdentifier>, Vec<NodeHandle>>, ExecutionError> {
        let metadata = Self::get_checkpoint_metadata(self.path, self.dag)?;

        let mut result = HashMap::new();

        for node_handle in self.dag.bfs(root_node) {
            let seq = metadata
                .get(node_handle)
                .and_then(|dag_meta_data| dag_meta_data.commits.get(root_node).copied());

            insert_vec_element(&mut result, seq, node_handle.clone());
        }

        Ok(result)
    }

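    // For each node, summarizes the consistency of its dependency tree: one shared
    // commit position yields `FullyConsistent`, otherwise the per-position grouping is
    // returned as `PartiallyConsistent`.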
    pub(crate) fn get_checkpoint_consistency(
        &self,
    ) -> Result<HashMap<NodeHandle, Consistency>, ExecutionError> {
        let mut r: HashMap<NodeHandle, Consistency> = HashMap::new();
        for node_handle in self.dag.node_handles() {
            let consistency = self.get_dependency_tree_consistency(node_handle)?;
            debug_assert!(!consistency.is_empty());
            if consistency.len() == 1 {
                r.insert(
                    node_handle.clone(),
                    Consistency::FullyConsistent(*consistency.iter().next().unwrap().0),
                );
            } else {
                r.insert(
                    node_handle.clone(),
                    Consistency::PartiallyConsistent(consistency),
                );
            }
        }
        Ok(r)
    }

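    // Removes every node's metadata environment from disk.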
    pub(crate) fn delete_metadata(&self) {
        for node in self.dag.node_handles() {
            LmdbEnvironmentManager::remove(self.path, &metadata_environment_name(node));
        }
    }

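    // Returns the persisted metadata of every node that has a metadata environment.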
    pub(crate) fn get_metadata(&self) -> Result<HashMap<NodeHandle, DagMetadata>, ExecutionError> {
        Self::get_checkpoint_metadata(self.path, self.dag)
    }

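    // Creates a fresh metadata environment for every node and persists its input and
    // output schemas, keyed by port handle. Fails with `MetadataAlreadyExists` if any
    // node already has an environment.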
    pub(crate) fn init_metadata(
        &self,
        schemas: &HashMap<NodeHandle, NodeSchemas<T>>,
    ) -> Result<(), ExecutionError> {
        for node in self.dag.node_handles() {
            let curr_node_schema = schemas
                .get(node)
                .ok_or_else(|| InvalidNodeHandle(node.clone()))?;

            let env_name = metadata_environment_name(node);
            if LmdbEnvironmentManager::exists(self.path, &env_name) {
                return Err(MetadataAlreadyExists(node.clone()));
            }

            let mut env = LmdbEnvironmentManager::create(self.path, &env_name)?;
            let db = env.open_database(METADATA_DB_NAME, false)?;
            let txn = env.create_txn()?;
            let mut txn = SharedTransaction::try_unwrap(txn)
                .expect("We just created this `SharedTransaction`. It's not shared.");

            for (handle, (schema, _ctx)) in curr_node_schema.output_schemas.iter() {
                let mut key: Vec<u8> = vec![OUTPUT_SCHEMA_IDENTIFIER];
                key.extend(handle.to_be_bytes());
                let value = bincode::serialize(schema).map_err(|e| SerializationError {
                    typ: "Schema".to_string(),
                    reason: Box::new(e),
                })?;
                txn.put(db, &key, &value)?;
            }

            for (handle, (schema, _ctx)) in curr_node_schema.input_schemas.iter() {
                let mut key: Vec<u8> = vec![INPUT_SCHEMA_IDENTIFIER];
                key.extend(handle.to_be_bytes());
                let value = bincode::serialize(schema).map_err(|e| SerializationError {
                    typ: "Schema".to_string(),
                    reason: Box::new(e),
                })?;
                txn.put(db, &key, &value)?;
            }

            txn.commit_and_renew()?;
        }
        Ok(())
    }
}

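// The environment name is simply the `Display` form of the node handle.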
fn metadata_environment_name(node_handle: &NodeHandle) -> String {
    format!("{node_handle}")
}

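// Persists the latest commit position of each source into the metadata database;
// presumably called by the executor when a checkpoint is committed.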
pub fn write_source_metadata<'a>(
    txn: &mut LmdbExclusiveTransaction,
    db: Database,
    metadata: &'a mut impl Iterator<Item = (&'a NodeHandle, OpIdentifier)>,
) -> Result<(), StorageError> {
    for (source, op_id) in metadata {
        let (key, value) = serialize_source_metadata(source, op_id);

        txn.put(db, &key, &value)?;
    }
    Ok(())
}

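// Layout: key = [SOURCE_ID_IDENTIFIER] followed by the node handle bytes; value = 16
// bytes, the big-endian `txid` followed by the big-endian `seq_in_tx`.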
fn serialize_source_metadata(node_handle: &NodeHandle, op_id: OpIdentifier) -> (Vec<u8>, Vec<u8>) {
    let mut key: Vec<u8> = vec![SOURCE_ID_IDENTIFIER];
    key.extend(node_handle.to_bytes());

    let mut value: Vec<u8> = Vec::with_capacity(16);
    value.extend(op_id.txid.to_be_bytes());
    value.extend(op_id.seq_in_tx.to_be_bytes());

    (key, value)
}

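// Inverse of `serialize_source_metadata`; panics on malformed input (empty or
// mistagged key, truncated value), which the `#[should_panic]` tests below rely on.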
fn deserialize_source_metadata(key: &[u8], value: &[u8]) -> (NodeHandle, OpIdentifier) {
    debug_assert!(key[0] == SOURCE_ID_IDENTIFIER);
    let source = NodeHandle::from_bytes(&key[1..]);

    let txid = u64::from_be_bytes(value[0..8].try_into().unwrap());
    let seq_in_tx = u64::from_be_bytes(value[8..16].try_into().unwrap());
    (source, OpIdentifier { txid, seq_in_tx })
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_source_metadata_serialization() {
        fn check(node_handle: NodeHandle, op_id: OpIdentifier) {
            let (key, value) = serialize_source_metadata(&node_handle, op_id);
            let (node_handle2, op_id2) = deserialize_source_metadata(&key, &value);
            assert_eq!(node_handle2, node_handle);
            assert_eq!(op_id2, op_id);
        }

        check(
            NodeHandle::new(None, "node".to_string()),
            OpIdentifier::new(0, 0),
        );
    }

    #[test]
    #[should_panic]
    fn source_metadata_deserialization_panics_on_empty_key() {
        deserialize_source_metadata(&[], &[]);
    }

    #[test]
    #[should_panic]
    fn source_metadata_deserialization_panics_on_invalid_key() {
        let (mut key, _) = serialize_source_metadata(
            &NodeHandle::new(None, "node".to_string()),
            OpIdentifier::default(),
        );
        key[0] = 1;
        deserialize_source_metadata(&key, &[]);
    }

    #[test]
    #[should_panic]
    fn source_metadata_deserialization_panics_on_empty_value() {
        let (key, _) = serialize_source_metadata(
            &NodeHandle::new(None, "node".to_string()),
            OpIdentifier::default(),
        );
        deserialize_source_metadata(&key, &[]);
    }
}