• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4071639102

pending completion
4071639102

Pull #780

github

GitHub
Merge 6694befe5 into 3a0622c99
Pull Request #780: fix: Clear PK from Projection output

4 of 4 new or added lines in 1 file covered. (100.0%)

24332 of 35774 relevant lines covered (68.02%)

35765.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.58
/dozer-cache/src/cache/lmdb/cache/mod.rs
1
use std::collections::HashMap;
2
use std::sync::Arc;
3

4
use dozer_types::parking_lot::RwLock;
5
pub use lmdb;
6
use lmdb::{Environment, RoTransaction, RwTransaction, Transaction};
7

8
use dozer_types::types::{Field, FieldType, IndexDefinition, Record};
9
use dozer_types::types::{Schema, SchemaIdentifier};
10

11
use super::super::Cache;
12
use super::indexer::Indexer;
13
use super::query::handler::LmdbQueryHandler;
14
use super::{utils, CacheOptions, CacheOptionsKind};
15
use crate::cache::expression::QueryExpression;
16
use crate::cache::index::get_primary_key;
17
use crate::errors::CacheError;
18

19
mod id_database;
20
mod record_database;
21
mod schema_database;
22
mod secondary_index_database;
23

24
pub use id_database::IdDatabase;
25
pub use record_database::RecordDatabase;
26
use schema_database::SchemaDatabase;
27
use secondary_index_database::SecondaryIndexDatabase;
28

29
/// Open secondary index databases, keyed by `(schema identifier, position of the index
/// definition in that schema's secondary index list)`.
pub type SecondaryIndexDatabases = HashMap<(SchemaIdentifier, usize), SecondaryIndexDatabase>;
30

31
/// LMDB-backed implementation of the [`Cache`] trait.
///
/// Every database below is opened inside the single LMDB environment `env`.
#[derive(Debug)]
pub struct LmdbCache {
    /// LMDB environment shared by all databases of this cache; transactions are started on it.
    env: Environment,
    /// Stores records under the id obtained from `id`.
    db: RecordDatabase,
    /// Maps primary-key bytes to record ids.
    id: IdDatabase,
    /// Open secondary index databases. Behind `Arc<RwLock<..>>` so the map can be handed to the
    /// `Indexer` / `LmdbQueryHandler` instances this cache creates and extended when a new
    /// schema is inserted.
    secondary_indexes: Arc<RwLock<SecondaryIndexDatabases>>,
    /// Persisted schemas together with their secondary index definitions.
    schema_db: SchemaDatabase,
    /// Options this cache was opened with (read/write kind, query intersection chunk size, ...).
    cache_options: CacheOptions,
}
40

41
impl LmdbCache {
42
    pub fn begin_rw_txn(&self) -> Result<RwTransaction, CacheError> {
10✔
43
        self.env
10✔
44
            .begin_rw_txn()
10✔
45
            .map_err(|e| CacheError::InternalError(Box::new(e)))
10✔
46
    }
10✔
47
    pub fn new(cache_options: CacheOptions) -> Result<Self, CacheError> {
131✔
48
        // Create environment.
49
        let env = utils::init_env(&cache_options)?;
131✔
50

51
        // Create or open must have databases.
52
        let create_if_not_exist = matches!(cache_options.kind, CacheOptionsKind::Write(_));
131✔
53
        let db = RecordDatabase::new(&env, create_if_not_exist)?;
131✔
54
        let id = IdDatabase::new(&env, create_if_not_exist)?;
131✔
55
        let schema_db = SchemaDatabase::new(&env, create_if_not_exist)?;
131✔
56

57
        // Open existing secondary index databases.
58
        let mut secondary_indexe_databases = HashMap::default();
131✔
59
        let schemas = schema_db.get_all_schemas(&env)?;
131✔
60
        for (schema, secondary_indexes) in schemas {
132✔
61
            let schema_id = schema
1✔
62
                .identifier
1✔
63
                .ok_or(CacheError::SchemaIdentifierNotFound)?;
1✔
64
            for (index, index_definition) in secondary_indexes.iter().enumerate() {
4✔
65
                let db =
4✔
66
                    SecondaryIndexDatabase::new(&env, &schema, index, index_definition, false)?;
4✔
67
                secondary_indexe_databases.insert((schema_id, index), db);
4✔
68
            }
69
        }
70

71
        Ok(Self {
131✔
72
            env,
131✔
73
            db,
131✔
74
            id,
131✔
75
            secondary_indexes: Arc::new(RwLock::new(secondary_indexe_databases)),
131✔
76
            schema_db,
131✔
77
            cache_options,
131✔
78
        })
131✔
79
    }
131✔
80

81
    pub fn insert_with_txn(
10,761✔
82
        &self,
10,761✔
83
        txn: &mut RwTransaction,
10,761✔
84
        record: &Record,
10,761✔
85
        schema: &Schema,
10,761✔
86
        secondary_indexes: &[IndexDefinition],
10,761✔
87
    ) -> Result<(), CacheError> {
10,761✔
88
        debug_check_schema_record_consistency(schema, record);
10,761✔
89

90
        let id = if schema.primary_index.is_empty() {
10,761✔
91
            self.id.get_or_generate(txn, None)?
1✔
92
        } else {
93
            let primary_key = get_primary_key(&schema.primary_index, &record.values);
10,760✔
94
            self.id.get_or_generate(txn, Some(&primary_key))?
10,760✔
95
        };
96
        self.db.insert(txn, id, record)?;
10,761✔
97

98
        let indexer = Indexer {
10,761✔
99
            secondary_indexes: self.secondary_indexes.clone(),
10,761✔
100
        };
10,761✔
101

10,761✔
102
        indexer.build_indexes(txn, record, schema, secondary_indexes, id)?;
10,761✔
103

104
        Ok(())
10,761✔
105
    }
10,761✔
106

107
    pub fn get_schema_and_indexes_from_record<T: Transaction>(
10,756✔
108
        &self,
10,756✔
109
        txn: &T,
10,756✔
110
        record: &Record,
10,756✔
111
    ) -> Result<(Schema, Vec<IndexDefinition>), CacheError> {
10,756✔
112
        let schema_identifier = record
10,756✔
113
            .schema_id
10,756✔
114
            .ok_or(CacheError::SchemaIdentifierNotFound)?;
10,756✔
115
        self.schema_db.get_schema(txn, schema_identifier)
10,756✔
116
    }
10,756✔
117

118
    fn get_with_txn<T: Transaction>(&self, txn: &T, key: &[u8]) -> Result<Record, CacheError> {
119
        self.db.get(txn, self.id.get(txn, key)?)
32✔
120
    }
32✔
121

122
    pub fn delete_with_txn(
11✔
123
        &self,
11✔
124
        txn: &mut RwTransaction,
11✔
125
        key: &[u8],
11✔
126
        record: &Record,
11✔
127
        schema: &Schema,
11✔
128
        secondary_indexes: &[IndexDefinition],
11✔
129
    ) -> Result<(), CacheError> {
11✔
130
        debug_check_schema_record_consistency(schema, record);
11✔
131

132
        let id = self.id.get(txn, key)?;
11✔
133
        self.db.delete(txn, id)?;
11✔
134

135
        let indexer = Indexer {
11✔
136
            secondary_indexes: self.secondary_indexes.clone(),
11✔
137
        };
11✔
138
        indexer.delete_indexes(txn, record, schema, secondary_indexes, id)
11✔
139
    }
11✔
140

141
    pub fn update_with_txn(
6✔
142
        &self,
6✔
143
        txn: &mut RwTransaction,
6✔
144
        key: &[u8],
6✔
145
        old: &Record,
6✔
146
        new: &Record,
6✔
147
        schema: &Schema,
6✔
148
        secondary_indexes: &[IndexDefinition],
6✔
149
    ) -> Result<(), CacheError> {
6✔
150
        debug_check_schema_record_consistency(schema, old);
6✔
151
        debug_check_schema_record_consistency(schema, new);
6✔
152

6✔
153
        self.delete_with_txn(txn, key, old, schema, secondary_indexes)?;
6✔
154

155
        self.insert_with_txn(txn, new, schema, secondary_indexes)
6✔
156
            .map_err(|e| CacheError::InternalError(Box::new(e)))?;
6✔
157
        Ok(())
6✔
158
    }
6✔
159
}
160

161
impl Cache for LmdbCache {
162
    fn insert(&self, record: &Record) -> Result<(), CacheError> {
10,750✔
163
        let mut txn: RwTransaction = self
10,750✔
164
            .env
10,750✔
165
            .begin_rw_txn()
10,750✔
166
            .map_err(|e| CacheError::InternalError(Box::new(e)))?;
10,750✔
167
        let (schema, secondary_indexes) = self.get_schema_and_indexes_from_record(&txn, record)?;
10,750✔
168

169
        self.insert_with_txn(&mut txn, record, &schema, &secondary_indexes)?;
10,750✔
170
        txn.commit()
10,750✔
171
            .map_err(|e| CacheError::InternalError(Box::new(e)))?;
10,750✔
172
        Ok(())
10,750✔
173
    }
10,750✔
174

175
    fn delete(&self, key: &[u8]) -> Result<(), CacheError> {
5✔
176
        let mut txn: RwTransaction = self
5✔
177
            .env
5✔
178
            .begin_rw_txn()
5✔
179
            .map_err(|e| CacheError::InternalError(Box::new(e)))?;
5✔
180

181
        let record = self.get_with_txn(&txn, key)?;
5✔
182
        let (schema, secondary_indexes) = self.get_schema_and_indexes_from_record(&txn, &record)?;
5✔
183
        self.delete_with_txn(&mut txn, key, &record, &schema, &secondary_indexes)?;
5✔
184

185
        txn.commit()
5✔
186
            .map_err(|e| CacheError::InternalError(Box::new(e)))?;
5✔
187
        Ok(())
5✔
188
    }
5✔
189

190
    fn get(&self, key: &[u8]) -> Result<Record, CacheError> {
26✔
191
        let txn: RoTransaction = self
26✔
192
            .env
26✔
193
            .begin_ro_txn()
26✔
194
            .map_err(|e| CacheError::InternalError(Box::new(e)))?;
26✔
195
        self.get_with_txn(&txn, key)
26✔
196
    }
26✔
197

198
    fn count(&self, schema_name: &str, query: &QueryExpression) -> Result<usize, CacheError> {
1,244✔
199
        let txn = self
1,244✔
200
            .env
1,244✔
201
            .begin_ro_txn()
1,244✔
202
            .map_err(|e| CacheError::InternalError(Box::new(e)))?;
1,244✔
203
        let (schema, secondary_indexes) = self.schema_db.get_schema_from_name(&txn, schema_name)?;
1,244✔
204

205
        let handler = LmdbQueryHandler::new(
1,244✔
206
            self.db,
1,244✔
207
            self.secondary_indexes.clone(),
1,244✔
208
            &txn,
1,244✔
209
            &schema,
1,244✔
210
            &secondary_indexes,
1,244✔
211
            query,
1,244✔
212
            self.cache_options.common.intersection_chunk_size,
1,244✔
213
        );
1,244✔
214
        handler.count()
1,244✔
215
    }
1,244✔
216

217
    fn query(&self, schema_name: &str, query: &QueryExpression) -> Result<Vec<Record>, CacheError> {
1,264✔
218
        let txn: RoTransaction = self
1,264✔
219
            .env
1,264✔
220
            .begin_ro_txn()
1,264✔
221
            .map_err(|e| CacheError::InternalError(Box::new(e)))?;
1,264✔
222
        let (schema, secondary_indexes) = self.schema_db.get_schema_from_name(&txn, schema_name)?;
1,264✔
223

224
        let handler = LmdbQueryHandler::new(
1,264✔
225
            self.db,
1,264✔
226
            self.secondary_indexes.clone(),
1,264✔
227
            &txn,
1,264✔
228
            &schema,
1,264✔
229
            &secondary_indexes,
1,264✔
230
            query,
1,264✔
231
            self.cache_options.common.intersection_chunk_size,
1,264✔
232
        );
1,264✔
233
        let records = handler.query()?;
1,264✔
234
        Ok(records)
1,263✔
235
    }
1,264✔
236

237
    fn update(&self, key: &[u8], record: &Record) -> Result<(), CacheError> {
1✔
238
        let mut txn: RwTransaction = self
1✔
239
            .env
1✔
240
            .begin_rw_txn()
1✔
241
            .map_err(|e| CacheError::InternalError(Box::new(e)))?;
1✔
242
        let old_record = self.get_with_txn(&txn, key)?;
1✔
243
        let (schema, secondary_indexes) =
1✔
244
            self.get_schema_and_indexes_from_record(&txn, &old_record)?;
1✔
245
        self.update_with_txn(
1✔
246
            &mut txn,
1✔
247
            key,
1✔
248
            &old_record,
1✔
249
            record,
1✔
250
            &schema,
1✔
251
            &secondary_indexes,
1✔
252
        )?;
1✔
253
        txn.commit()
1✔
254
            .map_err(|e| CacheError::InternalError(Box::new(e)))?;
1✔
255
        Ok(())
1✔
256
    }
1✔
257

258
    fn get_schema_and_indexes_by_name(
131✔
259
        &self,
131✔
260
        name: &str,
131✔
261
    ) -> Result<(Schema, Vec<IndexDefinition>), CacheError> {
131✔
262
        let txn: RoTransaction = self
131✔
263
            .env
131✔
264
            .begin_ro_txn()
131✔
265
            .map_err(|e| CacheError::InternalError(Box::new(e)))?;
131✔
266
        let schema = self.schema_db.get_schema_from_name(&txn, name)?;
131✔
267
        Ok(schema)
131✔
268
    }
131✔
269

270
    fn get_schema(&self, schema_identifier: &SchemaIdentifier) -> Result<Schema, CacheError> {
1✔
271
        let txn: RoTransaction = self
1✔
272
            .env
1✔
273
            .begin_ro_txn()
1✔
274
            .map_err(|e| CacheError::InternalError(Box::new(e)))?;
1✔
275
        self.schema_db
1✔
276
            .get_schema(&txn, *schema_identifier)
1✔
277
            .map(|(schema, _)| schema)
1✔
278
    }
1✔
279
    fn insert_schema(
131✔
280
        &self,
131✔
281
        name: &str,
131✔
282
        schema: &Schema,
131✔
283
        secondary_indexes: &[IndexDefinition],
131✔
284
    ) -> Result<(), CacheError> {
131✔
285
        let schema_id = schema
131✔
286
            .identifier
131✔
287
            .ok_or(CacheError::SchemaIdentifierNotFound)?;
131✔
288

289
        // Create a db for each index
290
        for (idx, index) in secondary_indexes.iter().enumerate() {
630✔
291
            let db = SecondaryIndexDatabase::new(&self.env, schema, idx, index, true)?;
630✔
292
            self.secondary_indexes.write().insert((schema_id, idx), db);
630✔
293
        }
294

295
        let mut txn: RwTransaction = self
131✔
296
            .env
131✔
297
            .begin_rw_txn()
131✔
298
            .map_err(|e| CacheError::InternalError(Box::new(e)))?;
131✔
299
        self.schema_db
131✔
300
            .insert(&mut txn, name, schema, secondary_indexes)?;
131✔
301
        txn.commit()
131✔
302
            .map_err(|e| CacheError::InternalError(Box::new(e)))?;
131✔
303
        Ok(())
131✔
304
    }
131✔
305
}
306

307
fn debug_check_schema_record_consistency(schema: &Schema, record: &Record) {
10,784✔
308
    debug_assert_eq!(schema.identifier, record.schema_id);
10,784✔
309
    debug_assert_eq!(schema.fields.len(), record.values.len());
10,784✔
310
    for (field, value) in schema.fields.iter().zip(record.values.iter()) {
93,739✔
311
        if field.nullable && value == &Field::Null {
93,739✔
312
            continue;
16,453✔
313
        }
77,286✔
314
        match field.typ {
77,286✔
315
            FieldType::UInt => debug_assert!(value.as_uint().is_some()),
36,440✔
316
            FieldType::Int => debug_assert!(value.as_int().is_some()),
67✔
317
            FieldType::Float => debug_assert!(value.as_float().is_some()),
10,000✔
318
            FieldType::Boolean => debug_assert!(value.as_boolean().is_some()),
×
319
            FieldType::String => debug_assert!(value.as_string().is_some()),
25,778✔
320
            FieldType::Text => debug_assert!(value.as_text().is_some()),
1✔
321
            FieldType::Binary => debug_assert!(value.as_binary().is_some()),
×
322
            FieldType::Decimal => debug_assert!(value.as_decimal().is_some()),
×
323
            FieldType::Timestamp => debug_assert!(value.as_timestamp().is_some()),
5,000✔
324
            FieldType::Date => debug_assert!(value.as_date().is_some()),
×
325
            FieldType::Bson => debug_assert!(value.as_bson().is_some()),
×
326
        }
327
    }
328
}
10,784✔
329

330
/// Methods for testing.
#[cfg(test)]
mod tests {
    use super::*;

    impl LmdbCache {
        /// Exposes the LMDB environment and the secondary index database map.
        ///
        /// This lets tests inspect index contents directly.
        pub fn get_env_and_secondary_indexes(
            &self,
        ) -> (&Environment, &RwLock<SecondaryIndexDatabases>) {
            // Deref-coerce through the `Arc` so callers see the lock itself.
            let index_map: &RwLock<SecondaryIndexDatabases> = &self.secondary_indexes;
            (&self.env, index_map)
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc