• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4354627675

pending completion
4354627675

push

github

GitHub
chore: Use `LmdbMap` and `LmdbMultimap` instead of raw database in cache (#1156)

754 of 754 new or added lines in 15 files covered. (100.0%)

29895 of 39630 relevant lines covered (75.44%)

38604.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.98
/dozer-cache/src/cache/lmdb/cache/mod.rs
1
use std::collections::HashMap;
2
use std::fmt::Debug;
3
use std::path::PathBuf;
4

5
use dozer_storage::lmdb::{RoTransaction, RwTransaction, Transaction};
6
use dozer_storage::lmdb_storage::{
7
    LmdbEnvironmentManager, LmdbExclusiveTransaction, SharedTransaction,
8
};
9
use dozer_storage::{LmdbMap, LmdbMultimap};
10

11
use dozer_types::node::{NodeHandle, OpIdentifier, SourceStates};
12
use dozer_types::parking_lot::RwLockReadGuard;
13

14
use dozer_types::types::{Field, FieldType, IndexDefinition, Record};
15
use dozer_types::types::{Schema, SchemaIdentifier};
16

17
use self::id_database::get_or_generate_id;
18
use self::secondary_index_database::{
19
    new_secondary_index_database_from_env, new_secondary_index_database_from_txn,
20
};
21

22
use super::super::{RoCache, RwCache};
23
use super::indexer::Indexer;
24
use super::utils::{self, CacheReadOptions};
25
use super::utils::{CacheOptions, CacheOptionsKind};
26
use crate::cache::expression::QueryExpression;
27
use crate::cache::index::get_primary_key;
28
use crate::cache::RecordWithId;
29
use crate::errors::CacheError;
30
use query::LmdbQueryHandler;
31

32
mod helper;
33
mod id_database;
34
mod query;
35
mod schema_database;
36
mod secondary_index_database;
37

38
use schema_database::SchemaDatabase;
39

40
/// Secondary index databases, keyed by the owning schema's identifier and the position of the
/// index definition within that schema's list of secondary indexes.
pub type SecondaryIndexDatabases = HashMap<(SchemaIdentifier, usize), LmdbMultimap<[u8], u64>>;
41

42
/// Options accepted by both the read-only and the read-write LMDB cache constructors.
#[derive(Clone, Debug)]
pub struct CacheCommonOptions {
    /// Total number of readers allowed.
    pub max_readers: u32,
    /// Maximum number of databases.
    ///
    /// NOTE(review): despite the `_size` suffix this is described as a count, not a byte size;
    /// confirm against `utils::init_env`.
    pub max_db_size: u32,

    /// The chunk size when calculating intersection of index queries.
    pub intersection_chunk_size: usize,

    /// Provide a path where db will be created. If nothing is provided, will default to a temp location.
    /// Db path will be `PathBuf.join(String)`.
    pub path: Option<(PathBuf, String)>,
}

impl Default for CacheCommonOptions {
    /// 1000 readers, 1000 databases, an intersection chunk size of 100 and no explicit path.
    fn default() -> Self {
        CacheCommonOptions {
            path: None,
            intersection_chunk_size: 100,
            max_db_size: 1_000,
            max_readers: 1_000,
        }
    }
}
67

68
#[derive(Debug)]
×
69
pub struct LmdbRoCache {
70
    common: LmdbCacheCommon,
71
    env: LmdbEnvironmentManager,
72
}
73

74
impl LmdbRoCache {
75
    pub fn new(options: CacheCommonOptions) -> Result<Self, CacheError> {
137✔
76
        let (mut env, name) = utils::init_env(&CacheOptions {
137✔
77
            common: options.clone(),
137✔
78
            kind: CacheOptionsKind::ReadOnly(CacheReadOptions {}),
137✔
79
        })?;
137✔
80
        let common = LmdbCacheCommon::new(&mut env, options, name, false)?;
137✔
81
        Ok(Self { common, env })
137✔
82
    }
137✔
83
}
84

85
/// Options that only apply when the cache is opened for writing.
#[derive(Clone, Debug)]
pub struct CacheWriteOptions {
    /// Total size allocated for data in a memory mapped file.
    /// This size is allocated at initialization.
    pub max_size: usize,
}

impl Default for CacheWriteOptions {
    /// Defaults `max_size` to 1 TiB (`1024^4` bytes).
    fn default() -> Self {
        CacheWriteOptions { max_size: 1 << 40 }
    }
}
99

100
#[derive(Debug)]
×
101
pub struct LmdbRwCache {
102
    common: LmdbCacheCommon,
103
    checkpoint_db: LmdbMap<NodeHandle, OpIdentifier>,
104
    txn: SharedTransaction,
105
}
106

107
impl LmdbRwCache {
108
    pub fn create(
139✔
109
        schemas: impl IntoIterator<Item = (String, Schema, Vec<IndexDefinition>)>,
139✔
110
        common_options: CacheCommonOptions,
139✔
111
        write_options: CacheWriteOptions,
139✔
112
    ) -> Result<Self, CacheError> {
139✔
113
        let mut cache = Self::open(common_options, write_options)?;
139✔
114

115
        let mut txn = cache.txn.write();
139✔
116
        for (schema_name, schema, secondary_indexes) in schemas {
283✔
117
            cache
138✔
118
                .common
138✔
119
                .insert_schema(&mut txn, schema_name, schema, secondary_indexes)?;
138✔
120
        }
121

122
        txn.commit_and_renew()?;
145✔
123
        drop(txn);
145✔
124

145✔
125
        Ok(cache)
145✔
126
    }
145✔
127

128
    pub fn open(
143✔
129
        common_options: CacheCommonOptions,
143✔
130
        write_options: CacheWriteOptions,
143✔
131
    ) -> Result<Self, CacheError> {
143✔
132
        let (mut env, name) = utils::init_env(&CacheOptions {
143✔
133
            common: common_options.clone(),
143✔
134
            kind: CacheOptionsKind::Write(write_options),
143✔
135
        })?;
143✔
136
        let common = LmdbCacheCommon::new(&mut env, common_options, name, true)?;
143✔
137
        let checkpoint_db = LmdbMap::new_from_env(&mut env, Some("checkpoint"), true)?;
143✔
138
        let txn = env.create_txn()?;
149✔
139
        Ok(Self {
149✔
140
            common,
149✔
141
            checkpoint_db,
149✔
142
            txn,
149✔
143
        })
149✔
144
    }
149✔
145
}
146

147
impl<C: LmdbCache> RoCache for C {
148
    fn name(&self) -> &str {
154✔
149
        &self.common().name
154✔
150
    }
154✔
151

152
    fn get(&self, key: &[u8]) -> Result<RecordWithId, CacheError> {
43✔
153
        let txn = self.begin_txn()?;
43✔
154
        let txn = txn.as_txn();
43✔
155
        let id = self
43✔
156
            .common()
43✔
157
            .primary_key_to_record_id
43✔
158
            .get(txn, key)?
43✔
159
            .ok_or(CacheError::PrimaryKeyNotFound)?
43✔
160
            .into_owned();
43✔
161
        let record = self
43✔
162
            .common()
43✔
163
            .record_id_to_record
43✔
164
            .get(txn, &id)?
43✔
165
            .ok_or(CacheError::PrimaryKeyNotFound)?
43✔
166
            .into_owned();
36✔
167
        Ok(RecordWithId::new(id, record))
36✔
168
    }
43✔
169

×
170
    fn count(&self, schema_name: &str, query: &QueryExpression) -> Result<usize, CacheError> {
2,223✔
171
        let txn = self.begin_txn()?;
2,223✔
172
        let txn = txn.as_txn();
2,223✔
173
        let (schema, secondary_indexes) = self
2,223✔
174
            .common()
2,223✔
175
            .schema_db
2,223✔
176
            .get_schema_from_name(schema_name)
2,223✔
177
            .ok_or_else(|| CacheError::SchemaNotFound(schema_name.to_string()))?;
2,223✔
178
        let handler = LmdbQueryHandler::new(self.common(), txn, schema, secondary_indexes, query);
2,223✔
179
        handler.count()
2,223✔
180
    }
2,223✔
181

×
182
    fn query(
2,238✔
183
        &self,
2,238✔
184
        schema_name: &str,
2,238✔
185
        query: &QueryExpression,
2,238✔
186
    ) -> Result<(&Schema, Vec<RecordWithId>), CacheError> {
2,238✔
187
        let txn = self.begin_txn()?;
2,238✔
188
        let txn = txn.as_txn();
2,238✔
189
        let (schema, secondary_indexes) = self
2,238✔
190
            .common()
2,238✔
191
            .schema_db
2,238✔
192
            .get_schema_from_name(schema_name)
2,238✔
193
            .ok_or_else(|| CacheError::SchemaNotFound(schema_name.to_string()))?;
2,238✔
194
        let handler = LmdbQueryHandler::new(self.common(), txn, schema, secondary_indexes, query);
2,238✔
195
        let records = handler.query()?;
2,238✔
196
        Ok((schema, records))
2,237✔
197
    }
2,238✔
198

×
199
    fn get_schema_and_indexes_by_name(
55✔
200
        &self,
55✔
201
        name: &str,
55✔
202
    ) -> Result<&(Schema, Vec<IndexDefinition>), CacheError> {
55✔
203
        let schema = self
55✔
204
            .common()
55✔
205
            .schema_db
55✔
206
            .get_schema_from_name(name)
55✔
207
            .ok_or_else(|| CacheError::SchemaNotFound(name.to_string()))?;
55✔
208
        Ok(schema)
55✔
209
    }
55✔
210

211
    fn get_schema(&self, schema_identifier: SchemaIdentifier) -> Result<&Schema, CacheError> {
1✔
212
        self.common()
1✔
213
            .schema_db
1✔
214
            .get_schema(schema_identifier)
1✔
215
            .map(|(schema, _)| schema)
1✔
216
            .ok_or(CacheError::SchemaIdentifierNotFound(schema_identifier))
1✔
217
    }
1✔
218
}
×
219

×
220
impl RwCache for LmdbRwCache {
×
221
    fn insert(&self, record: &mut Record) -> Result<u64, CacheError> {
11,845✔
222
        let (schema, secondary_indexes) = self.get_schema_and_indexes_from_record(record)?;
11,845✔
223
        record.version = Some(INITIAL_RECORD_VERSION);
11,845✔
224
        self.insert_impl(record, schema, secondary_indexes)
11,845✔
225
    }
11,845✔
226

×
227
    fn delete(&self, key: &[u8]) -> Result<u32, CacheError> {
6✔
228
        let (_, _, version) = self.delete_impl(key)?;
6✔
229
        Ok(version)
6✔
230
    }
6✔
231

×
232
    fn update(&self, key: &[u8], record: &mut Record) -> Result<u32, CacheError> {
7✔
233
        let (schema, secondary_indexes, old_version) = self.delete_impl(key)?;
7✔
234
        record.version = Some(old_version + 1);
7✔
235
        self.insert_impl(record, schema, secondary_indexes)?;
7✔
236
        Ok(old_version)
7✔
237
    }
7✔
238

×
239
    fn commit(&self, checkpoint: &SourceStates) -> Result<(), CacheError> {
139✔
240
        let mut txn = self.txn.write();
139✔
241
        self.checkpoint_db.clear(txn.txn_mut())?;
139✔
242
        self.checkpoint_db.extend(txn.txn_mut(), checkpoint)?;
139✔
243
        txn.commit_and_renew()?;
139✔
244
        Ok(())
139✔
245
    }
139✔
246

247
    fn get_checkpoint(&self) -> Result<SourceStates, CacheError> {
×
248
        let txn = self.txn.read();
×
249
        let result = self
×
250
            .checkpoint_db
×
251
            .iter(txn.txn())?
×
252
            .map(|result| {
×
253
                result
×
254
                    .map(|(key, value)| (key.into_owned(), value.into_owned()))
×
255
                    .map_err(CacheError::Storage)
×
256
            })
×
257
            .collect();
×
258
        result
×
259
    }
×
260
}
×
261

×
262
impl LmdbRwCache {
263
    fn delete_impl(&self, key: &[u8]) -> Result<(&Schema, &[IndexDefinition], u32), CacheError> {
13✔
264
        let record = self.get(key)?;
13✔
265
        let (schema, secondary_indexes) =
13✔
266
            self.get_schema_and_indexes_from_record(&record.record)?;
13✔
267

×
268
        let mut txn = self.txn.write();
13✔
269
        let txn = txn.txn_mut();
13✔
270

13✔
271
        if !self.common.record_id_to_record.remove(txn, &record.id)? {
13✔
272
            panic!("We just got this key from the map");
×
273
        }
13✔
274

13✔
275
        let indexer = Indexer {
13✔
276
            secondary_indexes: &self.common.secondary_indexes,
13✔
277
        };
13✔
278
        indexer.delete_indexes(txn, &record.record, schema, secondary_indexes, record.id)?;
13✔
279
        let version = record
13✔
280
            .record
13✔
281
            .version
13✔
282
            .expect("All records in cache should have a version");
13✔
283
        Ok((schema, secondary_indexes, version))
13✔
284
    }
13✔
285

286
    fn insert_impl(
11,882✔
287
        &self,
11,882✔
288
        record: &Record,
11,882✔
289
        schema: &Schema,
11,882✔
290
        secondary_indexes: &[IndexDefinition],
11,882✔
291
    ) -> Result<u64, CacheError> {
11,882✔
292
        let mut txn = self.txn.write();
11,882✔
293
        let txn = txn.txn_mut();
11,882✔
294

×
295
        let id = if schema.primary_index.is_empty() {
11,882✔
296
            get_or_generate_id(self.common.primary_key_to_record_id, txn, None)?
1✔
297
        } else {
298
            let primary_key = get_primary_key(&schema.primary_index, &record.values);
11,881✔
299
            get_or_generate_id(
11,881✔
300
                self.common.primary_key_to_record_id,
11,881✔
301
                txn,
11,881✔
302
                Some(&primary_key),
11,881✔
303
            )?
11,881✔
304
        };
305
        if !self.common.record_id_to_record.insert(txn, &id, record)? {
11,882✔
306
            return Err(CacheError::PrimaryKeyExists);
×
307
        }
11,864✔
308

11,864✔
309
        let indexer = Indexer {
11,864✔
310
            secondary_indexes: &self.common.secondary_indexes,
11,864✔
311
        };
11,864✔
312

11,864✔
313
        indexer.build_indexes(txn, record, schema, secondary_indexes, id)?;
11,864✔
314

315
        Ok(id)
11,852✔
316
    }
11,852✔
317
}
318

×
319
/// This trait abstracts the behavior of getting a transaction from a `LmdbExclusiveTransaction` or a `lmdb::Transaction`.
×
320
trait AsTransaction {
×
321
    type Transaction<'a>: Transaction
322
    where
323
        Self: 'a;
324

325
    fn as_txn(&self) -> &Self::Transaction<'_>;
326
}
327

328
/// A read-only transaction is its own transaction.
impl AsTransaction for RoTransaction<'_> {
    type Transaction<'txn> = RoTransaction<'txn> where Self: 'txn;

    fn as_txn(&self) -> &Self::Transaction<'_> {
        self
    }
}
×
335

×
336
impl<'a> AsTransaction for RwLockReadGuard<'a, LmdbExclusiveTransaction> {
×
337
    type Transaction<'env> = RwTransaction<'env> where Self: 'env;
×
338

×
339
    fn as_txn(&self) -> &Self::Transaction<'_> {
83✔
340
        self.txn()
83✔
341
    }
83✔
342
}
×
343

344
/// This trait abstracts the behavior of locking a `SharedTransaction` for reading
×
345
/// and beginning a `RoTransaction` from `LmdbEnvironmentManager`.
×
346
trait LmdbCache: Send + Sync + Debug {
×
347
    type AsTransaction<'a>: AsTransaction
×
348
    where
349
        Self: 'a;
350

351
    fn common(&self) -> &LmdbCacheCommon;
352
    fn begin_txn(&self) -> Result<Self::AsTransaction<'_>, CacheError>;
353

×
354
    fn get_schema_and_indexes_from_record(
11,858✔
355
        &self,
11,858✔
356
        record: &Record,
11,858✔
357
    ) -> Result<&(Schema, Vec<IndexDefinition>), CacheError> {
11,858✔
358
        let schema_identifier = record.schema_id.ok_or(CacheError::SchemaHasNoIdentifier)?;
11,858✔
359
        let schema = self
11,894✔
360
            .common()
11,858✔
361
            .schema_db
11,858✔
362
            .get_schema(schema_identifier)
11,858✔
363
            .ok_or(CacheError::SchemaIdentifierNotFound(schema_identifier))?;
11,858✔
364

365
        debug_check_schema_record_consistency(&schema.0, record);
11,894✔
366

11,894✔
367
        Ok(schema)
11,894✔
368
    }
11,894✔
369
}
×
370

×
371
impl LmdbCache for LmdbRoCache {
×
372
    type AsTransaction<'a> = RoTransaction<'a>;
373

374
    fn common(&self) -> &LmdbCacheCommon {
8,888✔
375
        &self.common
8,888✔
376
    }
8,888✔
377

×
378
    fn begin_txn(&self) -> Result<Self::AsTransaction<'_>, CacheError> {
4,421✔
379
        Ok(self.env.begin_ro_txn()?)
4,421✔
380
    }
4,421✔
381
}
×
382

383
impl LmdbCache for LmdbRwCache {
×
384
    type AsTransaction<'a> = RwLockReadGuard<'a, LmdbExclusiveTransaction>;
385

386
    fn common(&self) -> &LmdbCacheCommon {
12,199✔
387
        &self.common
12,199✔
388
    }
12,199✔
389

×
390
    fn begin_txn(&self) -> Result<Self::AsTransaction<'_>, CacheError> {
83✔
391
        Ok(self.txn.read())
83✔
392
    }
83✔
393
}
×
394

×
395
fn debug_check_schema_record_consistency(schema: &Schema, record: &Record) {
11,888✔
396
    debug_assert_eq!(schema.identifier, record.schema_id);
11,888✔
397
    debug_assert_eq!(schema.fields.len(), record.values.len());
11,876✔
398
    for (field, value) in schema.fields.iter().zip(record.values.iter()) {
107,298✔
399
        if field.nullable && value == &Field::Null {
107,298✔
400
            continue;
17,779✔
401
        }
89,645✔
402
        match field.typ {
89,645✔
403
            FieldType::UInt => {
404
                debug_assert!(value.as_uint().is_some())
41,706✔
405
            }
406
            FieldType::Int => {
×
407
                debug_assert!(value.as_int().is_some())
60✔
408
            }
409
            FieldType::Float => {
410
                debug_assert!(value.as_float().is_some())
12,000✔
411
            }
412
            FieldType::Boolean => debug_assert!(value.as_boolean().is_some()),
×
413
            FieldType::String => debug_assert!(value.as_string().is_some()),
29,846✔
414
            FieldType::Text => debug_assert!(value.as_text().is_some()),
3✔
415
            FieldType::Binary => debug_assert!(value.as_binary().is_some()),
×
416
            FieldType::Decimal => debug_assert!(value.as_decimal().is_some()),
×
417
            FieldType::Timestamp => debug_assert!(value.as_timestamp().is_some()),
6,000✔
418
            FieldType::Date => debug_assert!(value.as_date().is_some()),
×
419
            FieldType::Bson => debug_assert!(value.as_bson().is_some()),
×
420
            FieldType::Point => debug_assert!(value.as_point().is_some()),
×
421
        }
×
422
    }
×
423
}
11,960✔
424

425
/// Version assigned to a record when it is first inserted; each update increments it by one.
const INITIAL_RECORD_VERSION: u32 = 1_u32;
×
426

×
427
#[derive(Debug)]
×
428
pub struct LmdbCacheCommon {
429
    record_id_to_record: LmdbMap<u64, Record>,
430
    primary_key_to_record_id: LmdbMap<[u8], u64>,
×
431
    secondary_indexes: SecondaryIndexDatabases,
×
432
    schema_db: SchemaDatabase,
×
433
    cache_options: CacheCommonOptions,
×
434
    /// File name of the database.
×
435
    name: String,
×
436
}
437

438
impl LmdbCacheCommon {
439
    fn new(
286✔
440
        env: &mut LmdbEnvironmentManager,
286✔
441
        options: CacheCommonOptions,
286✔
442
        name: String,
286✔
443
        create_db_if_not_exist: bool,
286✔
444
    ) -> Result<Self, CacheError> {
286✔
445
        // Create or open must have databases.
×
446
        let record_id_to_record =
286✔
447
            LmdbMap::new_from_env(env, Some("records"), create_db_if_not_exist)?;
286✔
448
        let primary_key_to_record_id =
286✔
449
            LmdbMap::new_from_env(env, Some("primary_index"), create_db_if_not_exist)?;
286✔
450
        let schema_db = SchemaDatabase::new(env, create_db_if_not_exist)?;
286✔
451

×
452
        // Open existing secondary index databases.
×
453
        let mut secondary_indexe_databases = HashMap::default();
286✔
454
        for (schema, secondary_indexes) in schema_db.get_all_schemas() {
286✔
455
            let schema_id = schema.identifier.ok_or(CacheError::SchemaHasNoIdentifier)?;
133✔
456
            for (index, index_definition) in secondary_indexes.iter().enumerate() {
664✔
457
                let db = new_secondary_index_database_from_env(
664✔
458
                    env,
664✔
459
                    &schema_id,
664✔
460
                    index,
664✔
461
                    index_definition,
664✔
462
                    false,
664✔
463
                )?;
664✔
464
                secondary_indexe_databases.insert((schema_id, index), db);
664✔
465
            }
×
466
        }
×
467

468
        Ok(Self {
286✔
469
            record_id_to_record,
286✔
470
            primary_key_to_record_id,
286✔
471
            secondary_indexes: secondary_indexe_databases,
286✔
472
            schema_db,
286✔
473
            cache_options: options,
286✔
474
            name,
286✔
475
        })
286✔
476
    }
286✔
477

×
478
    fn insert_schema(
144✔
479
        &mut self,
144✔
480
        txn: &mut LmdbExclusiveTransaction,
144✔
481
        schema_name: String,
144✔
482
        schema: Schema,
144✔
483
        secondary_indexes: Vec<IndexDefinition>,
144✔
484
    ) -> Result<(), CacheError> {
144✔
485
        let schema_id = schema.identifier.ok_or(CacheError::SchemaHasNoIdentifier)?;
144✔
486
        for (index, index_definition) in secondary_indexes.iter().enumerate() {
685✔
487
            let db = new_secondary_index_database_from_txn(
685✔
488
                txn,
685✔
489
                &schema_id,
685✔
490
                index,
685✔
491
                index_definition,
685✔
492
                true,
685✔
493
            )?;
685✔
494
            self.secondary_indexes.insert((schema_id, index), db);
685✔
495
        }
496

497
        self.schema_db
144✔
498
            .insert(txn.txn_mut(), schema_name, schema, secondary_indexes)?;
144✔
499
        Ok(())
144✔
500
    }
144✔
501
}
502

503
/// Methods for testing.
#[cfg(test)]
mod tests {
    use super::*;

    impl LmdbRwCache {
        /// Exposes the shared transaction and the secondary index databases to tests.
        pub fn get_txn_and_secondary_indexes(
            &self,
        ) -> (&SharedTransaction, &SecondaryIndexDatabases) {
            let Self { common, txn, .. } = self;
            (txn, &common.secondary_indexes)
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc