• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4116183752

pending completion
4116183752

push

github

GitHub
refactor: Make `LmdbRoCache` and `LmdbRwCache` `Send` and `Sync` (#821)

790 of 790 new or added lines in 44 files covered. (100.0%)

23005 of 33842 relevant lines covered (67.98%)

56312.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.49
/dozer-cache/src/cache/lmdb/cache/schema_database.rs
1
use dozer_storage::{
2
    errors::StorageError,
3
    lmdb::{Cursor, Database, DatabaseFlags, RwTransaction, Transaction, WriteFlags},
4
    lmdb_storage::LmdbEnvironmentManager,
5
};
6
use dozer_types::{
7
    bincode,
8
    types::{IndexDefinition, Schema, SchemaIdentifier},
9
};
10

11
use crate::errors::{CacheError, QueryError};
12

×
13
#[derive(Debug, Clone, Copy)]
×
14
pub struct SchemaDatabase(Database);
15

16
impl SchemaDatabase {
×
17
    pub fn new(
98✔
18
        env: &mut LmdbEnvironmentManager,
98✔
19
        create_if_not_exist: bool,
98✔
20
    ) -> Result<Self, CacheError> {
98✔
21
        let flags = if create_if_not_exist {
98✔
22
            Some(DatabaseFlags::empty())
96✔
23
        } else {
×
24
            None
2✔
25
        };
×
26
        let db = env.create_database(Some("schemas"), flags)?;
98✔
27
        Ok(Self(db))
98✔
28
    }
98✔
29

×
30
    pub fn insert(
97✔
31
        &self,
97✔
32
        txn: &mut RwTransaction,
97✔
33
        schema_name: &str,
97✔
34
        schema: &Schema,
97✔
35
        secondary_indexes: &[IndexDefinition],
97✔
36
    ) -> Result<(), CacheError> {
97✔
37
        let encoded: Vec<u8> = bincode::serialize(&(schema, secondary_indexes))
97✔
38
            .map_err(CacheError::map_serialization_error)?;
97✔
39
        let schema_id = schema
97✔
40
            .identifier
97✔
41
            .ok_or(CacheError::SchemaIdentifierNotFound)?;
97✔
42
        let key = get_schema_key(schema_id);
97✔
43

97✔
44
        // Insert Schema with {id, version}
97✔
45
        txn.put::<Vec<u8>, Vec<u8>>(self.0, &key, &encoded, WriteFlags::NO_OVERWRITE)
97✔
46
            .map_err(|e| CacheError::Query(QueryError::InsertValue(e)))?;
97✔
47

×
48
        let schema_id_bytes =
97✔
49
            bincode::serialize(&schema_id).map_err(CacheError::map_serialization_error)?;
97✔
50

51
        // Insert Reverse key lookup for schema by name
×
52
        let schema_key = get_schema_reverse_key(schema_name);
97✔
53

97✔
54
        txn.put::<Vec<u8>, Vec<u8>>(
97✔
55
            self.0,
97✔
56
            &schema_key,
97✔
57
            &schema_id_bytes,
97✔
58
            WriteFlags::NO_OVERWRITE,
97✔
59
        )
97✔
60
        .map_err(|e| CacheError::Query(QueryError::InsertValue(e)))?;
97✔
61

×
62
        Ok(())
97✔
63
    }
97✔
64

×
65
    pub fn get_schema_from_name<T: Transaction>(
570✔
66
        &self,
570✔
67
        txn: &T,
570✔
68
        name: &str,
570✔
69
    ) -> Result<(Schema, Vec<IndexDefinition>), CacheError> {
570✔
70
        let schema_reverse_key = get_schema_reverse_key(name);
570✔
71
        let schema_identifier = txn
570✔
72
            .get(self.0, &schema_reverse_key)
570✔
73
            .map_err(|e| CacheError::Query(QueryError::GetValue(e)))?;
570✔
74
        let schema_id: SchemaIdentifier = bincode::deserialize(schema_identifier)
570✔
75
            .map_err(CacheError::map_deserialization_error)?;
570✔
76

×
77
        let schema = self.get_schema(txn, schema_id)?;
570✔
78

×
79
        Ok(schema)
570✔
80
    }
570✔
81

×
82
    pub fn get_schema<T: Transaction>(
10,118✔
83
        &self,
10,118✔
84
        txn: &T,
10,118✔
85
        identifier: SchemaIdentifier,
10,118✔
86
    ) -> Result<(Schema, Vec<IndexDefinition>), CacheError> {
10,118✔
87
        let key = get_schema_key(identifier);
10,118✔
88
        let schema = txn
10,118✔
89
            .get(self.0, &key)
10,118✔
90
            .map_err(|e| CacheError::Query(QueryError::GetSchema(e)))?;
10,118✔
91
        let schema = bincode::deserialize(schema).map_err(CacheError::map_deserialization_error)?;
10,118✔
92
        Ok(schema)
10,118✔
93
    }
10,118✔
94

×
95
    pub fn get_all_schemas(
96✔
96
        &self,
96✔
97
        env: &mut LmdbEnvironmentManager,
96✔
98
    ) -> Result<Vec<(Schema, Vec<IndexDefinition>)>, CacheError> {
96✔
99
        let txn = env.begin_ro_txn()?;
96✔
100
        let schemas = get_all_schemas(&txn, self.0)?;
96✔
101
        txn.commit().map_err(StorageError::InternalDbError)?;
96✔
102
        Ok(schemas)
96✔
103
    }
96✔
104
}
105

×
106
fn get_schema_key(schema_id: SchemaIdentifier) -> Vec<u8> {
10,215✔
107
    [
10,215✔
108
        "sc".as_bytes(),
10,215✔
109
        schema_id.id.to_be_bytes().as_ref(),
10,215✔
110
        schema_id.version.to_be_bytes().as_ref(),
10,215✔
111
    ]
10,215✔
112
    .join("#".as_bytes())
10,215✔
113
}
10,215✔
114

×
115
fn get_all_schemas<T: Transaction>(
98✔
116
    txn: &T,
98✔
117
    db: Database,
98✔
118
) -> Result<Vec<(Schema, Vec<IndexDefinition>)>, CacheError> {
98✔
119
    let mut cursor = txn
98✔
120
        .open_ro_cursor(db)
98✔
121
        .map_err(|e| CacheError::Storage(StorageError::InternalDbError(e)))?;
98✔
122

×
123
    let mut result = vec![];
98✔
124
    for item in cursor.iter_start() {
98✔
125
        let (key, value) = item.map_err(QueryError::GetValue)?;
6✔
126
        if key.starts_with(b"sc#") {
6✔
127
            let schema: (Schema, Vec<IndexDefinition>) =
3✔
128
                bincode::deserialize(value).map_err(CacheError::map_deserialization_error)?;
3✔
129
            result.push(schema);
3✔
130
        }
3✔
131
    }
×
132
    Ok(result)
98✔
133
}
98✔
134
/// Prefix of the keys under which a schema's identifier is stored by schema name.
const SCHEMA_NAME_PREFIX: &str = "schema_name_";
135

136
fn get_schema_reverse_key(name: &str) -> Vec<u8> {
2,209✔
137
    format!("{SCHEMA_NAME_PREFIX}{name}").into_bytes()
2,209✔
138
}
2,209✔
139

140
#[cfg(test)]
mod tests {
    use dozer_types::types::{FieldDefinition, FieldType, SourceDefinition};

    use crate::cache::{lmdb::utils::init_env, CacheOptions};

    use super::*;

    /// Inserts one schema through the handle opened with creation enabled, then
    /// checks that both that handle and the one opened without creation return
    /// it by name, by identifier, and when listing all schemas.
    #[test]
    fn test_schema_database() {
        let mut env = init_env(&CacheOptions::default()).unwrap();
        let writer = SchemaDatabase::new(&mut env, true).unwrap();
        let reader = SchemaDatabase::new(&mut env, false).unwrap();
        let handles = [writer, reader];

        let schema_name = "test_schema";
        let identifier = SchemaIdentifier { id: 1, version: 1 };
        let id_field = FieldDefinition {
            name: "id".to_string(),
            typ: FieldType::UInt,
            nullable: false,
            source: SourceDefinition::Dynamic,
        };
        let schema = Schema {
            identifier: Some(identifier),
            fields: vec![id_field],
            primary_index: vec![0],
        };
        let secondary_indexes = vec![IndexDefinition::SortedInverted(vec![0])];
        let expected = (schema.clone(), secondary_indexes.clone());

        let shared_txn = env.create_txn().unwrap();
        let mut txn = shared_txn.write();
        writer
            .insert(txn.txn_mut(), schema_name, &schema, &secondary_indexes)
            .unwrap();
        txn.commit_and_renew().unwrap();

        // Lookup by name, through each handle.
        for db in handles {
            assert_eq!(
                db.get_schema_from_name(txn.txn(), schema_name).unwrap(),
                expected
            );
        }
        // Lookup by identifier, through each handle.
        for db in handles {
            assert_eq!(db.get_schema(txn.txn(), identifier).unwrap(), expected);
        }
        txn.commit_and_renew().unwrap();

        // Full listing, through each handle.
        for db in handles {
            assert_eq!(
                get_all_schemas(txn.txn(), db.0).unwrap(),
                vec![expected.clone()]
            );
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc