• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4354627675

pending completion
4354627675

push

github

GitHub
chore: Use `LmdbMap` and `LmdbMultimap` instead of raw database in cache (#1156)

754 of 754 new or added lines in 15 files covered. (100.0%)

29895 of 39630 relevant lines covered (75.44%)

38604.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.12
/dozer-cache/src/cache/lmdb/cache/schema_database.rs
1
use std::{borrow::Cow, collections::HashMap};
2

3
use dozer_storage::{lmdb::RwTransaction, lmdb_storage::LmdbEnvironmentManager, LmdbMap};
4
use dozer_types::types::{IndexDefinition, Schema, SchemaIdentifier};
5

6
use crate::errors::CacheError;
7

8
#[derive(Debug, Clone)]
×
9
pub struct SchemaDatabase {
10
    database: LmdbMap<str, (Schema, Vec<IndexDefinition>)>,
11
    schemas: Vec<(Schema, Vec<IndexDefinition>)>,
12
    schema_name_to_index: HashMap<String, usize>,
13
    schema_id_to_index: HashMap<SchemaIdentifier, usize>,
14
}
15

×
16
impl SchemaDatabase {
17
    pub fn new(
288✔
18
        env: &mut LmdbEnvironmentManager,
288✔
19
        create_if_not_exist: bool,
288✔
20
    ) -> Result<Self, CacheError> {
288✔
21
        let database = LmdbMap::new_from_env(env, Some("schemas"), create_if_not_exist)?;
288✔
22

23
        // Collect existing schemas.
24
        let txn = env.begin_ro_txn()?;
288✔
25
        let mut schema_name_to_index = HashMap::new();
288✔
26
        let mut schema_id_to_index = HashMap::new();
288✔
27
        let schemas = database
288✔
28
            .iter(&txn)?
288✔
29
            .enumerate()
288✔
30
            .map(|(index, result)| {
288✔
31
                result.map_err(CacheError::Storage).and_then(
134✔
32
                    |(name, schema_and_indexes): (
134✔
33
                        Cow<str>,
34
                        Cow<(Schema, Vec<IndexDefinition>)>,
×
35
                    )| {
134✔
36
                        let (schema, indexes) = schema_and_indexes.into_owned();
134✔
37
                        schema_name_to_index.insert(name.into_owned(), index);
134✔
38
                        let identifier =
134✔
39
                            schema.identifier.ok_or(CacheError::SchemaHasNoIdentifier)?;
134✔
40
                        if schema_id_to_index.insert(identifier, index).is_some() {
134✔
41
                            return Err(CacheError::DuplicateSchemaIdentifier(identifier));
×
42
                        }
134✔
43
                        Ok((schema, indexes))
134✔
44
                    },
134✔
45
                )
134✔
46
            })
288✔
47
            .collect::<Result<_, _>>()?;
288✔
48

×
49
        Ok(Self {
288✔
50
            database,
288✔
51
            schemas,
288✔
52
            schema_name_to_index,
288✔
53
            schema_id_to_index,
288✔
54
        })
288✔
55
    }
288✔
56

×
57
    pub fn insert(
145✔
58
        &mut self,
145✔
59
        txn: &mut RwTransaction,
145✔
60
        schema_name: String,
145✔
61
        schema: Schema,
145✔
62
        secondary_indexes: Vec<IndexDefinition>,
145✔
63
    ) -> Result<(), CacheError> {
145✔
64
        let identifier = schema.identifier.ok_or(CacheError::SchemaHasNoIdentifier)?;
145✔
65

×
66
        let schema_and_indexes = (schema, secondary_indexes);
145✔
67
        if !self
145✔
68
            .database
145✔
69
            .insert(txn, &schema_name, &schema_and_indexes)?
145✔
70
        {
×
71
            panic!("Schema {schema_name} already exists");
×
72
        }
145✔
73

145✔
74
        let index = self.schemas.len();
145✔
75
        self.schemas.push(schema_and_indexes);
145✔
76
        self.schema_name_to_index.insert(schema_name, index);
145✔
77
        if self.schema_id_to_index.insert(identifier, index).is_some() {
145✔
78
            return Err(CacheError::DuplicateSchemaIdentifier(identifier));
×
79
        }
145✔
80

145✔
81
        Ok(())
145✔
82
    }
145✔
83

×
84
    pub fn get_schema_from_name(&self, name: &str) -> Option<&(Schema, Vec<IndexDefinition>)> {
4,518✔
85
        self.schema_name_to_index
4,518✔
86
            .get(name)
4,518✔
87
            .map(|index| &self.schemas[*index])
4,518✔
88
    }
4,518✔
89

×
90
    pub fn get_schema(
11,855✔
91
        &self,
11,855✔
92
        identifier: SchemaIdentifier,
11,855✔
93
    ) -> Option<&(Schema, Vec<IndexDefinition>)> {
11,855✔
94
        self.schema_id_to_index
11,855✔
95
            .get(&identifier)
11,855✔
96
            .map(|index| &self.schemas[*index])
11,897✔
97
    }
11,855✔
98

×
99
    pub fn get_all_schemas(&self) -> &[(Schema, Vec<IndexDefinition>)] {
286✔
100
        &self.schemas
286✔
101
    }
286✔
102
}
×
103

×
104
#[cfg(test)]
105
mod tests {
106
    use dozer_storage::{errors::StorageError, lmdb::Transaction};
107
    use dozer_types::types::{FieldDefinition, FieldType, SourceDefinition};
×
108

×
109
    use crate::cache::lmdb::utils::{init_env, CacheOptions};
×
110

×
111
    use super::*;
×
112

×
113
    fn get_all_schemas<T: Transaction>(
×
114
        txn: &T,
115
        map: LmdbMap<str, (Schema, Vec<IndexDefinition>)>,
×
116
    ) -> Result<Vec<(String, Schema, Vec<IndexDefinition>)>, StorageError> {
×
117
        map.iter(txn)?
2✔
118
            .map(|result| {
2✔
119
                result.and_then(
2✔
120
                    |(name, schema_and_indexes): (
2✔
121
                        Cow<str>,
×
122
                        Cow<(Schema, Vec<IndexDefinition>)>,
123
                    )| {
2✔
124
                        let (schema, indexes) = schema_and_indexes.into_owned();
2✔
125
                        Ok((name.into_owned(), schema, indexes))
2✔
126
                    },
2✔
127
                )
2✔
128
            })
2✔
129
            .collect()
2✔
130
    }
2✔
131

132
    #[test]
1✔
133
    fn test_schema_database() {
1✔
134
        let mut env = init_env(&CacheOptions::default()).unwrap().0;
1✔
135
        let mut writer = SchemaDatabase::new(&mut env, true).unwrap();
1✔
136

1✔
137
        let schema_name = "test_schema";
1✔
138
        let schema = Schema {
1✔
139
            identifier: Some(SchemaIdentifier { id: 1, version: 1 }),
1✔
140
            fields: vec![FieldDefinition {
1✔
141
                name: "id".to_string(),
1✔
142
                typ: FieldType::UInt,
1✔
143
                nullable: false,
1✔
144
                source: SourceDefinition::Dynamic,
1✔
145
            }],
1✔
146
            primary_index: vec![0],
1✔
147
        };
1✔
148
        let secondary_indexes = vec![IndexDefinition::SortedInverted(vec![0])];
1✔
149

1✔
150
        let mut txn = env.begin_rw_txn().unwrap();
1✔
151
        writer
1✔
152
            .insert(
1✔
153
                &mut txn,
1✔
154
                schema_name.to_string(),
1✔
155
                schema.clone(),
1✔
156
                secondary_indexes.clone(),
1✔
157
            )
1✔
158
            .unwrap();
1✔
159
        txn.commit().unwrap();
1✔
160

1✔
161
        let reader = SchemaDatabase::new(&mut env, false).unwrap();
1✔
162
        let txn = env.create_txn().unwrap();
1✔
163
        let mut txn = txn.write();
1✔
164

1✔
165
        let expected = (schema, secondary_indexes);
1✔
166
        assert_eq!(writer.get_schema_from_name(schema_name).unwrap(), &expected);
1✔
167
        assert_eq!(reader.get_schema_from_name(schema_name).unwrap(), &expected);
1✔
168
        assert_eq!(
1✔
169
            writer.get_schema(expected.0.identifier.unwrap()).unwrap(),
1✔
170
            &expected
1✔
171
        );
1✔
172
        assert_eq!(
1✔
173
            reader.get_schema(expected.0.identifier.unwrap()).unwrap(),
1✔
174
            &expected
1✔
175
        );
1✔
176
        txn.commit_and_renew().unwrap();
1✔
177

1✔
178
        let (schema, secondary_indexes) = expected;
1✔
179
        assert_eq!(
1✔
180
            get_all_schemas(txn.txn(), writer.database).unwrap(),
1✔
181
            vec![(
1✔
182
                schema_name.to_string(),
1✔
183
                schema.clone(),
1✔
184
                secondary_indexes.clone()
1✔
185
            )]
1✔
186
        );
1✔
187
        assert_eq!(
1✔
188
            get_all_schemas(txn.txn(), reader.database).unwrap(),
1✔
189
            vec![(schema_name.to_string(), schema, secondary_indexes)]
1✔
190
        );
1✔
191
    }
1✔
192
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc