• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4113913291

pending completion
4113913291

Pull #821

github

GitHub
Merge a8cca3f0b into 8f74ec17e
Pull Request #821: refactor: Make `LmdbRoCache` and `LmdbRwCache` `Send` and `Sync`

869 of 869 new or added lines in 45 files covered. (100.0%)

23486 of 37503 relevant lines covered (62.62%)

36806.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.73
/dozer-cache/src/cache/lmdb/cache/mod.rs
1
use std::collections::HashMap;
2
use std::fmt::Debug;
3
use std::ops::Deref;
4
use std::sync::Arc;
5

6
use dozer_storage::lmdb::{RoTransaction, RwTransaction, Transaction};
7
use dozer_storage::lmdb_storage::{
8
    LmdbEnvironmentManager, LmdbExclusiveRoTransaction, LmdbExclusiveTransaction, LmdbTransaction,
9
    SharedRoTransaction, SharedTransaction,
10
};
11
use dozer_types::parking_lot::{RwLock, RwLockWriteGuard};
12

13
use dozer_types::types::{Field, FieldType, IndexDefinition, Record};
14
use dozer_types::types::{Schema, SchemaIdentifier};
15

16
use super::super::{RoCache, RwCache};
17
use super::indexer::Indexer;
18
use super::query::handler::LmdbQueryHandler;
19
use super::{
20
    utils, CacheCommonOptions, CacheOptions, CacheOptionsKind, CacheReadOptions, CacheWriteOptions,
21
};
22
use crate::cache::expression::QueryExpression;
23
use crate::cache::index::get_primary_key;
24
use crate::errors::CacheError;
25

26
mod id_database;
27
mod record_database;
28
mod schema_database;
29
mod secondary_index_database;
30

31
pub use id_database::IdDatabase;
×
32
pub use record_database::RecordDatabase;
33
use schema_database::SchemaDatabase;
34
use secondary_index_database::SecondaryIndexDatabase;
35

36
pub type SecondaryIndexDatabases = HashMap<(SchemaIdentifier, usize), SecondaryIndexDatabase>;
37

38
#[derive(Debug)]
×
39
pub struct LmdbRoCache {
40
    common: LmdbCacheCommon,
41
    txn: SharedRoTransaction,
42
}
×
43

×
44
impl LmdbRoCache {
×
45
    pub fn new(options: CacheCommonOptions) -> Result<Self, CacheError> {
1✔
46
        let mut env = utils::init_env(&CacheOptions {
1✔
47
            common: options.clone(),
1✔
48
            kind: CacheOptionsKind::ReadOnly(CacheReadOptions {}),
1✔
49
        })?;
1✔
50
        let common = LmdbCacheCommon::new(&mut env, options, true)?;
1✔
51
        let txn = env.create_ro_txn()?;
1✔
52
        Ok(Self { common, txn })
1✔
53
    }
1✔
54
}
×
55

×
56
#[derive(Debug)]
×
57
pub struct LmdbRwCache {
58
    common: LmdbCacheCommon,
×
59
    txn: SharedTransaction,
×
60
}
×
61

×
62
impl LmdbRwCache {
×
63
    pub fn new(
95✔
64
        common_options: CacheCommonOptions,
95✔
65
        write_options: CacheWriteOptions,
95✔
66
    ) -> Result<Self, CacheError> {
95✔
67
        let mut env = utils::init_env(&CacheOptions {
95✔
68
            common: common_options.clone(),
95✔
69
            kind: CacheOptionsKind::Write(write_options),
95✔
70
        })?;
95✔
71
        let common = LmdbCacheCommon::new(&mut env, common_options, false)?;
95✔
72
        let txn = env.create_txn()?;
95✔
73
        Ok(Self { common, txn })
95✔
74
    }
95✔
75
}
×
76

×
77
pub trait LmdbCache: Send + Sync + Debug {
×
78
    type Transaction: Transaction + 'static;
×
79
    type LmdbExclusiveTransaction: LmdbTransaction<Self::Transaction>;
×
80
    type LmdbExclusiveTransactionPointer<'a>: Deref<Target = Self::LmdbExclusiveTransaction>
81
    where
×
82
        Self: 'a;
×
83

×
84
    fn common(&self) -> &LmdbCacheCommon;
×
85
    fn txn(&self) -> Self::LmdbExclusiveTransactionPointer<'_>;
×
86

×
87
    fn create_query_handler<'a>(
544✔
88
        &self,
544✔
89
        txn: &'a Self::Transaction,
544✔
90
        schema_name: &str,
544✔
91
        query: &'a QueryExpression,
544✔
92
    ) -> Result<LmdbQueryHandler<'a, Self::Transaction>, CacheError>
544✔
93
    where
544✔
94
        Self: Sized,
544✔
95
    {
544✔
96
        let (schema, secondary_indexes) = self
544✔
97
            .common()
544✔
98
            .schema_db
544✔
99
            .get_schema_from_name(txn, schema_name)?;
544✔
100

×
101
        Ok(LmdbQueryHandler::new(
544✔
102
            self.common().db,
544✔
103
            self.common().secondary_indexes.clone(),
544✔
104
            txn,
544✔
105
            schema,
544✔
106
            secondary_indexes,
544✔
107
            query,
544✔
108
            self.common().cache_options.intersection_chunk_size,
544✔
109
        ))
544✔
110
    }
544✔
111

×
112
    fn get_schema_and_indexes_from_record(
7,999✔
113
        &self,
7,999✔
114
        record: &Record,
7,999✔
115
    ) -> Result<(Schema, Vec<IndexDefinition>), CacheError> {
7,999✔
116
        let schema_identifier = record
7,999✔
117
            .schema_id
7,999✔
118
            .ok_or(CacheError::SchemaIdentifierNotFound)?;
7,999✔
119
        let (schema, secondary_indexes) = self
8,003✔
120
            .common()
7,999✔
121
            .schema_db
7,999✔
122
            .get_schema(self.txn().txn(), schema_identifier)?;
7,999✔
123

×
124
        debug_check_schema_record_consistency(&schema, record);
8,003✔
125

8,003✔
126
        Ok((schema, secondary_indexes))
8,003✔
127
    }
8,003✔
128
}
×
129

×
130
impl LmdbCache for LmdbRoCache {
×
131
    type Transaction = RoTransaction<'static>;
132
    type LmdbExclusiveTransaction = LmdbExclusiveRoTransaction;
×
133
    type LmdbExclusiveTransactionPointer<'a> = &'a LmdbExclusiveRoTransaction;
×
134

135
    fn common(&self) -> &LmdbCacheCommon {
12✔
136
        &self.common
12✔
137
    }
12✔
138

×
139
    fn txn(&self) -> Self::LmdbExclusiveTransactionPointer<'_> {
5✔
140
        self.txn.get()
5✔
141
    }
5✔
142
}
×
143

×
144
impl LmdbCache for LmdbRwCache {
×
145
    type Transaction = RwTransaction<'static>;
×
146
    type LmdbExclusiveTransaction = LmdbExclusiveTransaction;
×
147
    type LmdbExclusiveTransactionPointer<'a> = RwLockWriteGuard<'a, LmdbExclusiveTransaction>;
×
148

×
149
    fn common(&self) -> &LmdbCacheCommon {
16,215✔
150
        &self.common
16,215✔
151
    }
16,215✔
152

×
153
    fn txn(&self) -> Self::LmdbExclusiveTransactionPointer<'_> {
18,141✔
154
        self.txn.write()
18,141✔
155
    }
18,141✔
156
}
×
157

×
158
impl<C: LmdbCache> RoCache for C {
×
159
    fn get(&self, key: &[u8]) -> Result<Record, CacheError> {
33✔
160
        let txn = self.txn();
33✔
161
        let txn = txn.txn();
33✔
162
        self.common().db.get(txn, self.common().id.get(txn, key)?)
33✔
163
    }
33✔
164

×
165
    fn count(&self, schema_name: &str, query: &QueryExpression) -> Result<usize, CacheError> {
268✔
166
        let txn = self.txn();
268✔
167
        let txn = txn.txn();
268✔
168
        let handler = self.create_query_handler(txn, schema_name, query)?;
268✔
169
        handler.count()
268✔
170
    }
268✔
171

×
172
    fn query(&self, schema_name: &str, query: &QueryExpression) -> Result<Vec<Record>, CacheError> {
276✔
173
        let txn = self.txn();
276✔
174
        let txn = txn.txn();
276✔
175
        let handler = self.create_query_handler(txn, schema_name, query)?;
276✔
176
        handler.query()
276✔
177
    }
276✔
178

×
179
    fn get_schema_and_indexes_by_name(
24✔
180
        &self,
24✔
181
        name: &str,
24✔
182
    ) -> Result<(Schema, Vec<IndexDefinition>), CacheError> {
24✔
183
        let txn = self.txn();
24✔
184
        let txn = txn.txn();
24✔
185
        let schema = self.common().schema_db.get_schema_from_name(txn, name)?;
24✔
186
        Ok(schema)
24✔
187
    }
24✔
188

×
189
    fn get_schema(&self, schema_identifier: &SchemaIdentifier) -> Result<Schema, CacheError> {
1✔
190
        let txn = self.txn();
1✔
191
        let txn = txn.txn();
1✔
192
        self.common()
1✔
193
            .schema_db
1✔
194
            .get_schema(txn, *schema_identifier)
1✔
195
            .map(|(schema, _)| schema)
1✔
196
    }
1✔
197
}
198

×
199
impl RwCache for LmdbRwCache {
×
200
    fn insert(&self, record: &Record) -> Result<(), CacheError> {
7,988✔
201
        let (schema, secondary_indexes) = self.get_schema_and_indexes_from_record(record)?;
7,988✔
202

×
203
        let mut txn = self.txn();
7,988✔
204
        let txn = txn.txn_mut();
7,988✔
205

×
206
        let id = if schema.primary_index.is_empty() {
7,988✔
207
            self.common.id.get_or_generate(txn, None)?
1✔
208
        } else {
×
209
            let primary_key = get_primary_key(&schema.primary_index, &record.values);
7,987✔
210
            self.common.id.get_or_generate(txn, Some(&primary_key))?
7,987✔
211
        };
×
212
        self.common.db.insert(txn, id, record)?;
7,988✔
213

×
214
        let indexer = Indexer {
7,992✔
215
            secondary_indexes: self.common.secondary_indexes.clone(),
7,992✔
216
        };
7,992✔
217

7,992✔
218
        indexer.build_indexes(txn, record, &schema, &secondary_indexes, id)
7,992✔
219
    }
7,992✔
220

×
221
    fn delete(&self, key: &[u8]) -> Result<(), CacheError> {
11✔
222
        let record = self.get(key)?;
11✔
223
        let (schema, secondary_indexes) = self.get_schema_and_indexes_from_record(&record)?;
11✔
224

×
225
        let mut txn = self.txn();
11✔
226
        let txn = txn.txn_mut();
11✔
227

×
228
        let id = self.common.id.get(txn, key)?;
11✔
229
        self.common.db.delete(txn, id)?;
11✔
230

×
231
        let indexer = Indexer {
11✔
232
            secondary_indexes: self.common.secondary_indexes.clone(),
11✔
233
        };
11✔
234
        indexer.delete_indexes(txn, &record, &schema, &secondary_indexes, id)
11✔
235
    }
11✔
236

237
    fn update(&self, key: &[u8], record: &Record) -> Result<(), CacheError> {
×
238
        self.delete(key)?;
5✔
239
        self.insert(record)
5✔
240
    }
5✔
241

×
242
    fn insert_schema(
96✔
243
        &self,
96✔
244
        name: &str,
96✔
245
        schema: &Schema,
96✔
246
        secondary_indexes: &[IndexDefinition],
96✔
247
    ) -> Result<(), CacheError> {
96✔
248
        let schema_id = schema
96✔
249
            .identifier
96✔
250
            .ok_or(CacheError::SchemaIdentifierNotFound)?;
96✔
251

×
252
        // Create a db for each index
×
253
        let mut txn = self.txn.write();
96✔
254
        for (idx, index) in secondary_indexes.iter().enumerate() {
451✔
255
            let db = SecondaryIndexDatabase::create(&mut txn, &schema_id, idx, index, true)?;
451✔
256
            self.common
451✔
257
                .secondary_indexes
451✔
258
                .write()
451✔
259
                .insert((schema_id, idx), db);
451✔
260
        }
×
261

×
262
        self.common
96✔
263
            .schema_db
96✔
264
            .insert(txn.txn_mut(), name, schema, secondary_indexes)?;
96✔
265

×
266
        txn.commit_and_renew()?;
96✔
267
        Ok(())
96✔
268
    }
96✔
269

270
    fn commit(&self) -> Result<(), CacheError> {
×
271
        self.txn.write().commit_and_renew()?;
9✔
272
        Ok(())
9✔
273
    }
9✔
274
}
×
275

×
276
fn debug_check_schema_record_consistency(schema: &Schema, record: &Record) {
7,995✔
277
    debug_assert_eq!(schema.identifier, record.schema_id);
7,995✔
278
    debug_assert_eq!(schema.fields.len(), record.values.len());
7,991✔
279
    for (field, value) in schema.fields.iter().zip(record.values.iter()) {
71,811✔
280
        if field.nullable && value == &Field::Null {
71,811✔
281
            continue;
11,917✔
282
        }
59,918✔
283
        match field.typ {
59,918✔
284
            FieldType::UInt => debug_assert!(value.as_uint().is_some()),
27,900✔
285
            FieldType::Int => debug_assert!(value.as_int().is_some()),
54✔
286
            FieldType::Float => debug_assert!(value.as_float().is_some()),
8,000✔
287
            FieldType::Boolean => debug_assert!(value.as_boolean().is_some()),
×
288
            FieldType::String => debug_assert!(value.as_string().is_some()),
19,993✔
289
            FieldType::Text => debug_assert!(value.as_text().is_some()),
3✔
290
            FieldType::Binary => debug_assert!(value.as_binary().is_some()),
×
291
            FieldType::Decimal => debug_assert!(value.as_decimal().is_some()),
×
292
            FieldType::Timestamp => debug_assert!(value.as_timestamp().is_some()),
4,000✔
293
            FieldType::Date => debug_assert!(value.as_date().is_some()),
×
294
            FieldType::Bson => debug_assert!(value.as_bson().is_some()),
×
295
        }
×
296
    }
×
297
}
8,003✔
298

×
299
#[derive(Debug)]
×
300
pub struct LmdbCacheCommon {
×
301
    db: RecordDatabase,
×
302
    id: IdDatabase,
×
303
    secondary_indexes: Arc<RwLock<SecondaryIndexDatabases>>,
×
304
    schema_db: SchemaDatabase,
×
305
    cache_options: CacheCommonOptions,
306
}
307

×
308
impl LmdbCacheCommon {
×
309
    fn new(
96✔
310
        env: &mut LmdbEnvironmentManager,
96✔
311
        options: CacheCommonOptions,
96✔
312
        read_only: bool,
96✔
313
    ) -> Result<Self, CacheError> {
96✔
314
        // Create or open must have databases.
×
315
        let db = RecordDatabase::new(env, !read_only)?;
96✔
316
        let id = IdDatabase::new(env, !read_only)?;
96✔
317
        let schema_db = SchemaDatabase::new(env, !read_only)?;
96✔
318

×
319
        // Open existing secondary index databases.
×
320
        let mut secondary_indexe_databases = HashMap::default();
96✔
321
        let schemas = schema_db.get_all_schemas(env)?;
96✔
322
        for (schema, secondary_indexes) in schemas {
97✔
323
            let schema_id = schema
1✔
324
                .identifier
1✔
325
                .ok_or(CacheError::SchemaIdentifierNotFound)?;
1✔
326
            for (index, index_definition) in secondary_indexes.iter().enumerate() {
4✔
327
                let db = SecondaryIndexDatabase::open(env, &schema_id, index, index_definition)?;
4✔
328
                secondary_indexe_databases.insert((schema_id, index), db);
4✔
329
            }
330
        }
331

332
        Ok(Self {
96✔
333
            db,
96✔
334
            id,
96✔
335
            secondary_indexes: Arc::new(RwLock::new(secondary_indexe_databases)),
96✔
336
            schema_db,
96✔
337
            cache_options: options,
96✔
338
        })
96✔
339
    }
96✔
340
}
×
341

342
/// Methods for testing.
343
#[cfg(test)]
344
mod tests {
345
    use super::*;
346

347
    impl LmdbRwCache {
348
        pub fn get_txn_and_secondary_indexes(
3✔
349
            &self,
3✔
350
        ) -> (&SharedTransaction, &RwLock<SecondaryIndexDatabases>) {
3✔
351
            (&self.txn, &self.common.secondary_indexes)
3✔
352
        }
3✔
353
    }
354
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc