• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4124646176

pending completion
4124646176

Pull #811

github

GitHub
Merge c6bc261de into f4fe30c14
Pull Request #811: chore: integrating sql planner

737 of 737 new or added lines in 23 files covered. (100.0%)

23321 of 35114 relevant lines covered (66.42%)

35990.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.69
/dozer-cache/src/cache/lmdb/cache/mod.rs
1
use std::collections::HashMap;
2
use std::fmt::Debug;
3
use std::sync::Arc;
4

5
use dozer_storage::lmdb::{RoTransaction, RwTransaction, Transaction};
6
use dozer_storage::lmdb_storage::{
7
    LmdbEnvironmentManager, LmdbExclusiveTransaction, SharedTransaction,
8
};
9

10
use dozer_types::parking_lot::{RwLock, RwLockReadGuard};
11

12
use dozer_types::types::{Field, FieldType, IndexDefinition, Record};
13
use dozer_types::types::{Schema, SchemaIdentifier};
14

15
use super::super::{RoCache, RwCache};
16
use super::indexer::Indexer;
17
use super::query::handler::LmdbQueryHandler;
18
use super::{
19
    utils, CacheCommonOptions, CacheOptions, CacheOptionsKind, CacheReadOptions, CacheWriteOptions,
20
};
21
use crate::cache::expression::QueryExpression;
22
use crate::cache::index::get_primary_key;
23
use crate::errors::CacheError;
24

25
mod id_database;
26
mod record_database;
27
mod schema_database;
28
mod secondary_index_database;
29

30
pub use id_database::IdDatabase;
31
pub use record_database::RecordDatabase;
×
32
use schema_database::SchemaDatabase;
33
use secondary_index_database::SecondaryIndexDatabase;
34

35
/// Secondary index databases keyed by `(schema identifier, position of the index in the schema's index list)`.
pub type SecondaryIndexDatabases = HashMap<(SchemaIdentifier, usize), SecondaryIndexDatabase>;
36

×
37
#[derive(Debug)]
×
38
pub struct LmdbRoCache {
39
    common: LmdbCacheCommon,
40
    env: LmdbEnvironmentManager,
41
}
42

×
43
impl LmdbRoCache {
×
44
    pub fn new(options: CacheCommonOptions) -> Result<Self, CacheError> {
1✔
45
        let mut env = utils::init_env(&CacheOptions {
1✔
46
            common: options.clone(),
1✔
47
            kind: CacheOptionsKind::ReadOnly(CacheReadOptions {}),
1✔
48
        })?;
1✔
49
        let common = LmdbCacheCommon::new(&mut env, options, true)?;
1✔
50
        Ok(Self { common, env })
1✔
51
    }
1✔
52
}
×
53

×
54
#[derive(Debug)]
×
55
pub struct LmdbRwCache {
×
56
    common: LmdbCacheCommon,
57
    txn: SharedTransaction,
58
}
×
59

×
60
impl LmdbRwCache {
×
61
    pub fn new(
95✔
62
        common_options: CacheCommonOptions,
95✔
63
        write_options: CacheWriteOptions,
95✔
64
    ) -> Result<Self, CacheError> {
95✔
65
        let mut env = utils::init_env(&CacheOptions {
95✔
66
            common: common_options.clone(),
95✔
67
            kind: CacheOptionsKind::Write(write_options),
95✔
68
        })?;
95✔
69
        let common = LmdbCacheCommon::new(&mut env, common_options, false)?;
95✔
70
        let txn = env.create_txn()?;
95✔
71
        Ok(Self { common, txn })
95✔
72
    }
95✔
73
}
×
74

×
75
impl<C: LmdbCache> RoCache for C {
×
76
    fn get(&self, key: &[u8]) -> Result<Record, CacheError> {
33✔
77
        let txn = self.begin_txn()?;
33✔
78
        let txn = txn.as_txn();
33✔
79
        self.common().db.get(txn, self.common().id.get(txn, key)?)
33✔
80
    }
33✔
81

×
82
    fn count(&self, schema_name: &str, query: &QueryExpression) -> Result<usize, CacheError> {
268✔
83
        let txn = self.begin_txn()?;
268✔
84
        let txn = txn.as_txn();
268✔
85
        let handler = self.create_query_handler(txn, schema_name, query)?;
268✔
86
        handler.count()
268✔
87
    }
268✔
88

×
89
    fn query(&self, schema_name: &str, query: &QueryExpression) -> Result<Vec<Record>, CacheError> {
276✔
90
        let txn = self.begin_txn()?;
276✔
91
        let txn = txn.as_txn();
276✔
92
        let handler = self.create_query_handler(txn, schema_name, query)?;
276✔
93
        handler.query()
276✔
94
    }
276✔
95

×
96
    fn get_schema_and_indexes_by_name(
24✔
97
        &self,
24✔
98
        name: &str,
24✔
99
    ) -> Result<(Schema, Vec<IndexDefinition>), CacheError> {
24✔
100
        let txn = self.begin_txn()?;
24✔
101
        let txn = txn.as_txn();
24✔
102
        let schema = self.common().schema_db.get_schema_from_name(txn, name)?;
24✔
103
        Ok(schema)
24✔
104
    }
24✔
105

×
106
    fn get_schema(&self, schema_identifier: &SchemaIdentifier) -> Result<Schema, CacheError> {
1✔
107
        let txn = self.begin_txn()?;
1✔
108
        let txn = txn.as_txn();
1✔
109
        self.common()
1✔
110
            .schema_db
1✔
111
            .get_schema(txn, *schema_identifier)
1✔
112
            .map(|(schema, _)| schema)
1✔
113
    }
1✔
114
}
×
115

×
116
impl RwCache for LmdbRwCache {
×
117
    fn insert(&self, record: &Record) -> Result<(), CacheError> {
7,992✔
118
        let (schema, secondary_indexes) = self.get_schema_and_indexes_from_record(record)?;
7,992✔
119

×
120
        let mut txn = self.txn.write();
7,992✔
121
        let txn = txn.txn_mut();
7,992✔
122

×
123
        let id = if schema.primary_index.is_empty() {
7,992✔
124
            self.common.id.get_or_generate(txn, None)?
1✔
125
        } else {
×
126
            let primary_key = get_primary_key(&schema.primary_index, &record.values);
7,991✔
127
            self.common.id.get_or_generate(txn, Some(&primary_key))?
7,991✔
128
        };
×
129
        self.common.db.insert(txn, id, record)?;
7,992✔
130

×
131
        let indexer = Indexer {
7,988✔
132
            secondary_indexes: self.common.secondary_indexes.clone(),
7,988✔
133
        };
7,988✔
134

7,988✔
135
        indexer.build_indexes(txn, record, &schema, &secondary_indexes, id)
7,988✔
136
    }
7,988✔
137

×
138
    fn delete(&self, key: &[u8]) -> Result<(), CacheError> {
11✔
139
        let record = self.get(key)?;
11✔
140
        let (schema, secondary_indexes) = self.get_schema_and_indexes_from_record(&record)?;
11✔
141

×
142
        let mut txn = self.txn.write();
11✔
143
        let txn = txn.txn_mut();
11✔
144

×
145
        let id = self.common.id.get(txn, key)?;
11✔
146
        self.common.db.delete(txn, id)?;
11✔
147

×
148
        let indexer = Indexer {
11✔
149
            secondary_indexes: self.common.secondary_indexes.clone(),
11✔
150
        };
11✔
151
        indexer.delete_indexes(txn, &record, &schema, &secondary_indexes, id)
11✔
152
    }
11✔
153

×
154
    fn update(&self, key: &[u8], record: &Record) -> Result<(), CacheError> {
×
155
        self.delete(key)?;
5✔
156
        self.insert(record)
5✔
157
    }
5✔
158

×
159
    fn insert_schema(
96✔
160
        &self,
96✔
161
        name: &str,
96✔
162
        schema: &Schema,
96✔
163
        secondary_indexes: &[IndexDefinition],
96✔
164
    ) -> Result<(), CacheError> {
96✔
165
        let schema_id = schema
96✔
166
            .identifier
96✔
167
            .ok_or(CacheError::SchemaIdentifierNotFound)?;
96✔
168

169
        // Create a db for each index
×
170
        let mut txn = self.txn.write();
96✔
171
        for (idx, index) in secondary_indexes.iter().enumerate() {
451✔
172
            let db = SecondaryIndexDatabase::create(&mut txn, &schema_id, idx, index, true)?;
451✔
173
            self.common
451✔
174
                .secondary_indexes
451✔
175
                .write()
451✔
176
                .insert((schema_id, idx), db);
451✔
177
        }
×
178

×
179
        self.common
96✔
180
            .schema_db
96✔
181
            .insert(txn.txn_mut(), name, schema, secondary_indexes)?;
96✔
182

×
183
        txn.commit_and_renew()?;
96✔
184
        Ok(())
96✔
185
    }
96✔
186

×
187
    fn commit(&self) -> Result<(), CacheError> {
×
188
        self.txn.write().commit_and_renew()?;
9✔
189
        Ok(())
9✔
190
    }
9✔
191
}
×
192

×
193
/// This trait abstracts the behavior of getting a transaction from a `LmdbExclusiveTransaction` or a `lmdb::Transaction`.
×
194
trait AsTransaction {
×
195
    type Transaction<'a>: Transaction
×
196
    where
×
197
        Self: 'a;
198

×
199
    fn as_txn(&self) -> &Self::Transaction<'_>;
×
200
}
×
201

×
202
// A `RoTransaction` already is the transaction, so it simply hands out itself.
impl AsTransaction for RoTransaction<'_> {
    type Transaction<'txn> = RoTransaction<'txn> where Self: 'txn;

    fn as_txn(&self) -> &Self::Transaction<'_> {
        self
    }
}
×
209

×
210
impl<'a> AsTransaction for RwLockReadGuard<'a, LmdbExclusiveTransaction> {
×
211
    type Transaction<'env> = RwTransaction<'env> where Self: 'env;
×
212

×
213
    fn as_txn(&self) -> &Self::Transaction<'_> {
10,142✔
214
        self.txn()
10,142✔
215
    }
10,142✔
216
}
217

×
218
/// This trait abstracts the behavior of locking a `SharedTransaction` for reading
×
219
/// and beginning a `RoTransaction` from `LmdbEnvironmentManager`.
×
220
trait LmdbCache: Send + Sync + Debug {
×
221
    type AsTransaction<'a>: AsTransaction
×
222
    where
×
223
        Self: 'a;
224

×
225
    fn common(&self) -> &LmdbCacheCommon;
×
226
    fn begin_txn(&self) -> Result<Self::AsTransaction<'_>, CacheError>;
×
227

×
228
    fn create_query_handler<'a, T: Transaction>(
544✔
229
        &'a self,
544✔
230
        txn: &'a T,
544✔
231
        schema_name: &str,
544✔
232
        query: &'a QueryExpression,
544✔
233
    ) -> Result<LmdbQueryHandler<'a, T>, CacheError> {
544✔
234
        let (schema, secondary_indexes) = self
544✔
235
            .common()
544✔
236
            .schema_db
544✔
237
            .get_schema_from_name(txn, schema_name)?;
544✔
238

×
239
        Ok(LmdbQueryHandler::new(
544✔
240
            self.common().db,
544✔
241
            self.common().secondary_indexes.clone(),
544✔
242
            txn,
544✔
243
            schema,
544✔
244
            secondary_indexes,
544✔
245
            query,
544✔
246
            self.common().cache_options.intersection_chunk_size,
544✔
247
        ))
544✔
248
    }
544✔
249

×
250
    fn get_schema_and_indexes_from_record(
8,003✔
251
        &self,
8,003✔
252
        record: &Record,
8,003✔
253
    ) -> Result<(Schema, Vec<IndexDefinition>), CacheError> {
8,003✔
254
        let schema_identifier = record
8,003✔
255
            .schema_id
8,003✔
256
            .ok_or(CacheError::SchemaIdentifierNotFound)?;
8,003✔
257
        let (schema, secondary_indexes) = self
8,003✔
258
            .common()
8,003✔
259
            .schema_db
8,003✔
260
            .get_schema(self.begin_txn()?.as_txn(), schema_identifier)?;
8,003✔
261

×
262
        debug_check_schema_record_consistency(&schema, record);
8,003✔
263

8,003✔
264
        Ok((schema, secondary_indexes))
8,003✔
265
    }
8,003✔
266
}
×
267

×
268
impl LmdbCache for LmdbRoCache {
×
269
    type AsTransaction<'a> = RoTransaction<'a>;
270

×
271
    fn common(&self) -> &LmdbCacheCommon {
12✔
272
        &self.common
12✔
273
    }
12✔
274

×
275
    fn begin_txn(&self) -> Result<Self::AsTransaction<'_>, CacheError> {
5✔
276
        Ok(self.env.begin_ro_txn()?)
5✔
277
    }
5✔
278
}
×
279

×
280
impl LmdbCache for LmdbRwCache {
×
281
    type AsTransaction<'a> = RwLockReadGuard<'a, LmdbExclusiveTransaction>;
×
282

×
283
    fn common(&self) -> &LmdbCacheCommon {
16,219✔
284
        &self.common
16,219✔
285
    }
16,219✔
286

×
287
    fn begin_txn(&self) -> Result<Self::AsTransaction<'_>, CacheError> {
10,142✔
288
        Ok(self.txn.read())
10,142✔
289
    }
10,142✔
290
}
×
291

×
292
fn debug_check_schema_record_consistency(schema: &Schema, record: &Record) {
8,003✔
293
    debug_assert_eq!(schema.identifier, record.schema_id);
8,003✔
294
    debug_assert_eq!(schema.fields.len(), record.values.len());
8,003✔
295
    for (field, value) in schema.fields.iter().zip(record.values.iter()) {
71,867✔
296
        if field.nullable && value == &Field::Null {
71,867✔
297
            continue;
11,917✔
298
        }
59,954✔
299
        match field.typ {
59,954✔
300
            FieldType::UInt => {
×
301
                debug_assert!(value.as_uint().is_some())
27,904✔
302
            }
×
303
            FieldType::Int => {
×
304
                debug_assert!(value.as_int().is_some())
54✔
305
            }
×
306
            FieldType::Float => {
×
307
                debug_assert!(value.as_float().is_some())
8,000✔
308
            }
×
309
            FieldType::Boolean => debug_assert!(value.as_boolean().is_some()),
×
310
            FieldType::String => debug_assert!(value.as_string().is_some()),
19,997✔
311
            FieldType::Text => debug_assert!(value.as_text().is_some()),
3✔
312
            FieldType::Binary => debug_assert!(value.as_binary().is_some()),
×
313
            FieldType::Decimal => debug_assert!(value.as_decimal().is_some()),
×
314
            FieldType::Timestamp => debug_assert!(value.as_timestamp().is_some()),
4,000✔
315
            FieldType::Date => debug_assert!(value.as_date().is_some()),
×
316
            FieldType::Bson => debug_assert!(value.as_bson().is_some()),
×
317
        }
×
318
    }
×
319
}
8,003✔
320

×
321
#[derive(Debug)]
×
322
pub struct LmdbCacheCommon {
×
323
    db: RecordDatabase,
×
324
    id: IdDatabase,
×
325
    secondary_indexes: Arc<RwLock<SecondaryIndexDatabases>>,
×
326
    schema_db: SchemaDatabase,
×
327
    cache_options: CacheCommonOptions,
×
328
}
×
329

330
impl LmdbCacheCommon {
×
331
    fn new(
96✔
332
        env: &mut LmdbEnvironmentManager,
96✔
333
        options: CacheCommonOptions,
96✔
334
        read_only: bool,
96✔
335
    ) -> Result<Self, CacheError> {
96✔
336
        // Create or open must have databases.
×
337
        let db = RecordDatabase::new(env, !read_only)?;
96✔
338
        let id = IdDatabase::new(env, !read_only)?;
96✔
339
        let schema_db = SchemaDatabase::new(env, !read_only)?;
96✔
340

×
341
        // Open existing secondary index databases.
×
342
        let mut secondary_indexe_databases = HashMap::default();
96✔
343
        let schemas = schema_db.get_all_schemas(env)?;
96✔
344
        for (schema, secondary_indexes) in schemas {
97✔
345
            let schema_id = schema
1✔
346
                .identifier
1✔
347
                .ok_or(CacheError::SchemaIdentifierNotFound)?;
1✔
348
            for (index, index_definition) in secondary_indexes.iter().enumerate() {
4✔
349
                let db = SecondaryIndexDatabase::open(env, &schema_id, index, index_definition)?;
4✔
350
                secondary_indexe_databases.insert((schema_id, index), db);
4✔
351
            }
×
352
        }
×
353

×
354
        Ok(Self {
96✔
355
            db,
96✔
356
            id,
96✔
357
            secondary_indexes: Arc::new(RwLock::new(secondary_indexe_databases)),
96✔
358
            schema_db,
96✔
359
            cache_options: options,
96✔
360
        })
96✔
361
    }
96✔
362
}
363

×
364
/// Test-only accessors.
#[cfg(test)]
mod tests {
    use super::*;

    impl LmdbRwCache {
        /// Exposes the shared transaction and the secondary index map to tests.
        pub fn get_txn_and_secondary_indexes(
            &self,
        ) -> (&SharedTransaction, &RwLock<SecondaryIndexDatabases>) {
            let txn = &self.txn;
            let secondary_indexes: &RwLock<SecondaryIndexDatabases> =
                &self.common.secondary_indexes;
            (txn, secondary_indexes)
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc