• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4283045331

pending completion
4283045331

push

github

GitHub
feat: Support timestamp diff (#1074)

58 of 58 new or added lines in 2 files covered. (100.0%)

27146 of 37535 relevant lines covered (72.32%)

33460.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.28
/dozer-cache/src/cache/lmdb/cache/mod.rs
1
use std::collections::HashMap;
2
use std::fmt::Debug;
3
use std::path::PathBuf;
4
use std::sync::Arc;
5

6
use dozer_storage::lmdb::{RoTransaction, RwTransaction, Transaction};
7
use dozer_storage::lmdb_storage::{
8
    LmdbEnvironmentManager, LmdbExclusiveTransaction, SharedTransaction,
9
};
10

11
use dozer_types::parking_lot::{RwLock, RwLockReadGuard};
12

13
use dozer_types::types::{Field, FieldType, IndexDefinition, Record};
14
use dozer_types::types::{Schema, SchemaIdentifier};
15

16
use super::super::{RoCache, RwCache};
17
use super::indexer::Indexer;
18
use super::utils::{self, CacheReadOptions};
19
use super::utils::{CacheOptions, CacheOptionsKind};
20
use crate::cache::expression::QueryExpression;
21
use crate::cache::index::get_primary_key;
22
use crate::cache::RecordWithId;
23
use crate::errors::CacheError;
24
use query::LmdbQueryHandler;
25

26
mod helper;
27
mod id_database;
28
mod query;
29
mod record_database;
30
mod schema_database;
31
mod secondary_index_database;
32

33
pub use id_database::IdDatabase;
34
pub use record_database::RecordDatabase;
35
use schema_database::SchemaDatabase;
36
use secondary_index_database::SecondaryIndexDatabase;
37

38
pub type SecondaryIndexDatabases = HashMap<(SchemaIdentifier, usize), SecondaryIndexDatabase>;
39

40
/// Options shared by the read-only and the read-write LMDB cache.
#[derive(Debug, Clone)]
pub struct CacheCommonOptions {
    /// Total number of readers allowed.
    pub max_readers: u32,
    /// Maximum number of databases.
    pub max_db_size: u32,

    /// The chunk size when calculating intersection of index queries.
    pub intersection_chunk_size: usize,

    /// Provide a path where db will be created. If nothing is provided, will default to a temp location.
    /// Db path will be `PathBuf.join(String)`.
    pub path: Option<(PathBuf, String)>,
}

impl Default for CacheCommonOptions {
    /// Builds the default options: 1000 readers, 1000 databases, an intersection
    /// chunk size of 100 and no explicit path.
    fn default() -> Self {
        let reader_limit = 1000;
        let database_limit = 1000;
        let chunk_size = 100;

        Self {
            path: None,
            intersection_chunk_size: chunk_size,
            max_db_size: database_limit,
            max_readers: reader_limit,
        }
    }
}
65

66
#[derive(Debug)]
×
67
pub struct LmdbRoCache {
68
    common: LmdbCacheCommon,
69
    env: LmdbEnvironmentManager,
70
}
71

72
impl LmdbRoCache {
73
    pub fn new(options: CacheCommonOptions) -> Result<Self, CacheError> {
137✔
74
        let (mut env, name) = utils::init_env(&CacheOptions {
137✔
75
            common: options.clone(),
137✔
76
            kind: CacheOptionsKind::ReadOnly(CacheReadOptions {}),
137✔
77
        })?;
137✔
78
        let common = LmdbCacheCommon::new(&mut env, options, name, true)?;
137✔
79
        Ok(Self { common, env })
137✔
80
    }
137✔
81
}
82

83
/// Options that are only used when the cache is opened for writing.
#[derive(Debug, Clone)]
pub struct CacheWriteOptions {
    /// Total size allocated for data in a memory mapped file.
    /// This size is allocated at initialization.
    pub max_size: usize,
}

impl Default for CacheWriteOptions {
    /// Defaults `max_size` to 1 TiB (`1024^4` bytes).
    fn default() -> Self {
        const DEFAULT_MAX_SIZE: usize = 1024 * 1024 * 1024 * 1024;

        Self {
            max_size: DEFAULT_MAX_SIZE,
        }
    }
}
97

98
#[derive(Debug)]
×
99
pub struct LmdbRwCache {
100
    common: LmdbCacheCommon,
101
    txn: SharedTransaction,
102
}
103

104
impl LmdbRwCache {
105
    pub fn create(
145✔
106
        schemas: impl IntoIterator<Item = (String, Schema, Vec<IndexDefinition>)>,
145✔
107
        common_options: CacheCommonOptions,
145✔
108
        write_options: CacheWriteOptions,
145✔
109
    ) -> Result<Self, CacheError> {
145✔
110
        let mut cache = Self::open(common_options, write_options)?;
145✔
111

112
        let mut txn = cache.txn.write();
145✔
113
        for (schema_name, schema, secondary_indexes) in schemas {
289✔
114
            cache
144✔
115
                .common
144✔
116
                .insert_schema(&mut txn, schema_name, schema, secondary_indexes)?;
144✔
117
        }
118

119
        txn.commit_and_renew()?;
145✔
120
        drop(txn);
145✔
121

145✔
122
        Ok(cache)
145✔
123
    }
145✔
124

125
    pub fn open(
149✔
126
        common_options: CacheCommonOptions,
149✔
127
        write_options: CacheWriteOptions,
149✔
128
    ) -> Result<Self, CacheError> {
149✔
129
        let (mut env, name) = utils::init_env(&CacheOptions {
149✔
130
            common: common_options.clone(),
149✔
131
            kind: CacheOptionsKind::Write(write_options),
149✔
132
        })?;
149✔
133
        let common = LmdbCacheCommon::new(&mut env, common_options, name, false)?;
149✔
134
        let txn = env.create_txn()?;
149✔
135
        Ok(Self { common, txn })
149✔
136
    }
149✔
137
}
138

139
impl<C: LmdbCache> RoCache for C {
140
    fn name(&self) -> &str {
142✔
141
        &self.common().name
142✔
142
    }
142✔
143

144
    fn get(&self, key: &[u8]) -> Result<RecordWithId, CacheError> {
43✔
145
        let txn = self.begin_txn()?;
43✔
146
        let txn = txn.as_txn();
43✔
147
        let id = self.common().id.get(txn, key)?;
43✔
148
        let record = self.common().db.get(txn, id)?;
43✔
149
        Ok(RecordWithId::new(id_from_bytes(id), record))
36✔
150
    }
43✔
151

152
    fn count(&self, schema_name: &str, query: &QueryExpression) -> Result<usize, CacheError> {
2,220✔
153
        let txn = self.begin_txn()?;
2,220✔
154
        let txn = txn.as_txn();
2,220✔
155
        let (schema, secondary_indexes) = self
2,220✔
156
            .common()
2,220✔
157
            .schema_db
2,220✔
158
            .get_schema_from_name(schema_name)
2,220✔
159
            .ok_or_else(|| CacheError::SchemaNotFound(schema_name.to_string()))?;
2,220✔
160
        let handler = LmdbQueryHandler::new(self.common(), txn, schema, secondary_indexes, query);
2,220✔
161
        handler.count()
2,220✔
162
    }
2,220✔
163

164
    fn query(
2,237✔
165
        &self,
2,237✔
166
        schema_name: &str,
2,237✔
167
        query: &QueryExpression,
2,237✔
168
    ) -> Result<(&Schema, Vec<RecordWithId>), CacheError> {
2,237✔
169
        let txn = self.begin_txn()?;
2,237✔
170
        let txn = txn.as_txn();
2,237✔
171
        let (schema, secondary_indexes) = self
2,237✔
172
            .common()
2,237✔
173
            .schema_db
2,237✔
174
            .get_schema_from_name(schema_name)
2,237✔
175
            .ok_or_else(|| CacheError::SchemaNotFound(schema_name.to_string()))?;
2,237✔
176
        let handler = LmdbQueryHandler::new(self.common(), txn, schema, secondary_indexes, query);
2,237✔
177
        let records = handler.query()?;
2,237✔
178
        Ok((schema, records))
2,236✔
179
    }
2,237✔
180

181
    fn get_schema_and_indexes_by_name(
55✔
182
        &self,
55✔
183
        name: &str,
55✔
184
    ) -> Result<&(Schema, Vec<IndexDefinition>), CacheError> {
55✔
185
        let schema = self
55✔
186
            .common()
55✔
187
            .schema_db
55✔
188
            .get_schema_from_name(name)
55✔
189
            .ok_or_else(|| CacheError::SchemaNotFound(name.to_string()))?;
55✔
190
        Ok(schema)
55✔
191
    }
55✔
192

193
    fn get_schema(&self, schema_identifier: SchemaIdentifier) -> Result<&Schema, CacheError> {
1✔
194
        self.common()
1✔
195
            .schema_db
1✔
196
            .get_schema(schema_identifier)
1✔
197
            .map(|(schema, _)| schema)
1✔
198
            .ok_or(CacheError::SchemaIdentifierNotFound(schema_identifier))
1✔
199
    }
1✔
200
}
201

202
impl RwCache for LmdbRwCache {
203
    fn insert(&self, record: &mut Record) -> Result<u64, CacheError> {
11,971✔
204
        let (schema, secondary_indexes) = self.get_schema_and_indexes_from_record(record)?;
11,971✔
205
        record.version = Some(INITIAL_RECORD_VERSION);
11,971✔
206
        self.insert_impl(record, schema, secondary_indexes)
11,971✔
207
    }
11,971✔
208

209
    fn delete(&self, key: &[u8]) -> Result<u32, CacheError> {
6✔
210
        let (_, _, version) = self.delete_impl(key)?;
6✔
211
        Ok(version)
6✔
212
    }
6✔
213

214
    fn update(&self, key: &[u8], record: &mut Record) -> Result<u32, CacheError> {
7✔
215
        let (schema, secondary_indexes, old_version) = self.delete_impl(key)?;
7✔
216
        record.version = Some(old_version + 1);
7✔
217
        self.insert_impl(record, schema, secondary_indexes)?;
7✔
218
        Ok(old_version)
7✔
219
    }
7✔
220

221
    fn commit(&self) -> Result<(), CacheError> {
222
        self.txn.write().commit_and_renew()?;
139✔
223
        Ok(())
139✔
224
    }
139✔
225
}
226

227
impl LmdbRwCache {
228
    fn delete_impl(&self, key: &[u8]) -> Result<(&Schema, &[IndexDefinition], u32), CacheError> {
13✔
229
        let record = self.get(key)?.record;
13✔
230
        let (schema, secondary_indexes) = self.get_schema_and_indexes_from_record(&record)?;
13✔
231

232
        let mut txn = self.txn.write();
13✔
233
        let txn = txn.txn_mut();
13✔
234

235
        let id = self.common.id.get(txn, key)?;
13✔
236
        self.common.db.delete(txn, id)?;
13✔
237

238
        let indexer = Indexer {
13✔
239
            secondary_indexes: self.common.secondary_indexes.clone(),
13✔
240
        };
13✔
241
        indexer.delete_indexes(txn, &record, schema, secondary_indexes, id)?;
13✔
242
        let version = record
13✔
243
            .version
13✔
244
            .expect("All records in cache should have a version");
13✔
245
        Ok((schema, secondary_indexes, version))
13✔
246
    }
13✔
247

248
    fn insert_impl(
11,978✔
249
        &self,
11,978✔
250
        record: &Record,
11,978✔
251
        schema: &Schema,
11,978✔
252
        secondary_indexes: &[IndexDefinition],
11,978✔
253
    ) -> Result<u64, CacheError> {
11,978✔
254
        let mut txn = self.txn.write();
11,978✔
255
        let txn = txn.txn_mut();
11,978✔
256

257
        let id = if schema.primary_index.is_empty() {
11,978✔
258
            self.common.id.get_or_generate(txn, None)?
1✔
259
        } else {
260
            let primary_key = get_primary_key(&schema.primary_index, &record.values);
11,977✔
261
            self.common.id.get_or_generate(txn, Some(&primary_key))?
11,977✔
262
        };
263
        self.common.db.insert(txn, id, record)?;
11,978✔
264

265
        let indexer = Indexer {
11,978✔
266
            secondary_indexes: self.common.secondary_indexes.clone(),
11,978✔
267
        };
11,978✔
268

11,978✔
269
        indexer.build_indexes(txn, record, schema, secondary_indexes, id)?;
11,978✔
270

271
        Ok(id_from_bytes(id))
11,978✔
272
    }
11,978✔
273
}
274

275
/// Decodes a record id from its 8-byte big-endian representation.
fn id_from_bytes(bytes: [u8; 8]) -> u64 {
    let id = u64::from_be_bytes(bytes);
    id
}
2,389,433✔
278

279
/// Encodes a record id as 8 big-endian bytes.
fn id_to_bytes(id: u64) -> [u8; 8] {
    let bytes = u64::to_be_bytes(id);
    bytes
}
1,139,602✔
282

283
/// This trait abstracts the behavior of getting a transaction from a `LmdbExclusiveTransaction` or a `lmdb::Transaction`.
284
trait AsTransaction {
285
    type Transaction<'a>: Transaction
286
    where
287
        Self: 'a;
288

289
    fn as_txn(&self) -> &Self::Transaction<'_>;
290
}
291

292
/// A read-only transaction is its own transaction.
impl AsTransaction for RoTransaction<'_> {
    type Transaction<'env> = RoTransaction<'env> where Self: 'env;

    /// Returns `self` unchanged.
    fn as_txn(&self) -> &Self::Transaction<'_> {
        let txn = self;
        txn
    }
}
299

300
impl<'a> AsTransaction for RwLockReadGuard<'a, LmdbExclusiveTransaction> {
301
    type Transaction<'env> = RwTransaction<'env> where Self: 'env;
302

303
    fn as_txn(&self) -> &Self::Transaction<'_> {
79✔
304
        self.txn()
79✔
305
    }
79✔
306
}
307

308
/// This trait abstracts the behavior of locking a `SharedTransaction` for reading
309
/// and beginning a `RoTransaction` from `LmdbEnvironmentManager`.
310
trait LmdbCache: Send + Sync + Debug {
311
    type AsTransaction<'a>: AsTransaction
312
    where
313
        Self: 'a;
314

315
    fn common(&self) -> &LmdbCacheCommon;
316
    fn begin_txn(&self) -> Result<Self::AsTransaction<'_>, CacheError>;
317

318
    fn get_schema_and_indexes_from_record(
11,984✔
319
        &self,
11,984✔
320
        record: &Record,
11,984✔
321
    ) -> Result<&(Schema, Vec<IndexDefinition>), CacheError> {
11,984✔
322
        let schema_identifier = record.schema_id.ok_or(CacheError::SchemaHasNoIdentifier)?;
11,984✔
323
        let schema = self
11,984✔
324
            .common()
11,984✔
325
            .schema_db
11,984✔
326
            .get_schema(schema_identifier)
11,984✔
327
            .ok_or(CacheError::SchemaIdentifierNotFound(schema_identifier))?;
11,984✔
328

329
        debug_check_schema_record_consistency(&schema.0, record);
11,984✔
330

11,984✔
331
        Ok(schema)
11,984✔
332
    }
11,984✔
333
}
334

335
impl LmdbCache for LmdbRoCache {
336
    type AsTransaction<'a> = RoTransaction<'a>;
337

338
    fn common(&self) -> &LmdbCacheCommon {
8,882✔
339
        &self.common
8,882✔
340
    }
8,882✔
341

342
    fn begin_txn(&self) -> Result<Self::AsTransaction<'_>, CacheError> {
4,421✔
343
        Ok(self.env.begin_ro_txn()?)
4,421✔
344
    }
4,421✔
345
}
346

347
impl LmdbCache for LmdbRwCache {
348
    type AsTransaction<'a> = RwLockReadGuard<'a, LmdbExclusiveTransaction>;
349

350
    fn common(&self) -> &LmdbCacheCommon {
12,300✔
351
        &self.common
12,300✔
352
    }
12,300✔
353

354
    fn begin_txn(&self) -> Result<Self::AsTransaction<'_>, CacheError> {
79✔
355
        Ok(self.txn.read())
79✔
356
    }
79✔
357
}
358

359
fn debug_check_schema_record_consistency(schema: &Schema, record: &Record) {
11,984✔
360
    debug_assert_eq!(schema.identifier, record.schema_id);
11,984✔
361
    debug_assert_eq!(schema.fields.len(), record.values.len());
11,984✔
362
    for (field, value) in schema.fields.iter().zip(record.values.iter()) {
107,766✔
363
        if field.nullable && value == &Field::Null {
107,766✔
364
            continue;
17,869✔
365
        }
89,903✔
366
        match field.typ {
89,903✔
367
            FieldType::UInt => {
368
                debug_assert!(value.as_uint().is_some())
41,862✔
369
            }
370
            FieldType::Int => {
371
                debug_assert!(value.as_int().is_some())
60✔
372
            }
373
            FieldType::Float => {
374
                debug_assert!(value.as_float().is_some())
12,000✔
375
            }
376
            FieldType::Boolean => debug_assert!(value.as_boolean().is_some()),
×
377
            FieldType::String => debug_assert!(value.as_string().is_some()),
29,978✔
378
            FieldType::Text => debug_assert!(value.as_text().is_some()),
3✔
379
            FieldType::Binary => debug_assert!(value.as_binary().is_some()),
×
380
            FieldType::Decimal => debug_assert!(value.as_decimal().is_some()),
×
381
            FieldType::Timestamp => debug_assert!(value.as_timestamp().is_some()),
6,000✔
382
            FieldType::Date => debug_assert!(value.as_date().is_some()),
×
383
            FieldType::Bson => debug_assert!(value.as_bson().is_some()),
×
384
            FieldType::Point => debug_assert!(value.as_point().is_some()),
×
385
        }
386
    }
387
}
11,984✔
388

389
const INITIAL_RECORD_VERSION: u32 = 1_u32;
390

391
#[derive(Debug)]
×
392
pub struct LmdbCacheCommon {
393
    db: RecordDatabase,
394
    id: IdDatabase,
395
    secondary_indexes: Arc<RwLock<SecondaryIndexDatabases>>,
396
    schema_db: SchemaDatabase,
397
    cache_options: CacheCommonOptions,
398
    /// File name of the database.
399
    name: String,
400
}
401

402
impl LmdbCacheCommon {
403
    fn new(
286✔
404
        env: &mut LmdbEnvironmentManager,
286✔
405
        options: CacheCommonOptions,
286✔
406
        name: String,
286✔
407
        read_only: bool,
286✔
408
    ) -> Result<Self, CacheError> {
286✔
409
        // Create or open must have databases.
410
        let db = RecordDatabase::new(env, !read_only)?;
286✔
411
        let id = IdDatabase::new(env, !read_only)?;
286✔
412
        let schema_db = SchemaDatabase::new(env, !read_only)?;
286✔
413

414
        // Open existing secondary index databases.
415
        let mut secondary_indexe_databases = HashMap::default();
286✔
416
        for (schema, secondary_indexes) in schema_db.get_all_schemas() {
286✔
417
            let schema_id = schema.identifier.ok_or(CacheError::SchemaHasNoIdentifier)?;
133✔
418
            for (index, index_definition) in secondary_indexes.iter().enumerate() {
664✔
419
                let db = SecondaryIndexDatabase::open(env, &schema_id, index, index_definition)?;
664✔
420
                secondary_indexe_databases.insert((schema_id, index), db);
664✔
421
            }
422
        }
423

424
        Ok(Self {
286✔
425
            db,
286✔
426
            id,
286✔
427
            secondary_indexes: Arc::new(RwLock::new(secondary_indexe_databases)),
286✔
428
            schema_db,
286✔
429
            cache_options: options,
286✔
430
            name,
286✔
431
        })
286✔
432
    }
286✔
433

434
    fn insert_schema(
144✔
435
        &mut self,
144✔
436
        txn: &mut LmdbExclusiveTransaction,
144✔
437
        schema_name: String,
144✔
438
        schema: Schema,
144✔
439
        secondary_indexes: Vec<IndexDefinition>,
144✔
440
    ) -> Result<(), CacheError> {
144✔
441
        let schema_id = schema.identifier.ok_or(CacheError::SchemaHasNoIdentifier)?;
144✔
442
        for (index, index_definition) in secondary_indexes.iter().enumerate() {
687✔
443
            let db =
687✔
444
                SecondaryIndexDatabase::create(txn, &schema_id, index, index_definition, true)?;
687✔
445
            self.secondary_indexes
687✔
446
                .write()
687✔
447
                .insert((schema_id, index), db);
687✔
448
        }
449

450
        self.schema_db
144✔
451
            .insert(txn.txn_mut(), schema_name, schema, secondary_indexes)?;
144✔
452
        Ok(())
144✔
453
    }
144✔
454
}
455

456
/// Test-only accessors.
#[cfg(test)]
mod tests {
    use super::*;

    impl LmdbRwCache {
        /// Exposes the shared transaction and the secondary index databases.
        pub fn get_txn_and_secondary_indexes(
            &self,
        ) -> (&SharedTransaction, &RwLock<SecondaryIndexDatabases>) {
            let txn = &self.txn;
            let secondary_indexes: &RwLock<SecondaryIndexDatabases> =
                &self.common.secondary_indexes;
            (txn, secondary_indexes)
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc