• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4381907514

pending completion
4381907514

push

github

GitHub
feat: implement tracing using open telemetry (#1176)

510 of 510 new or added lines in 31 files covered. (100.0%)

27878 of 39615 relevant lines covered (70.37%)

47752.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.62
/dozer-cache/src/cache/lmdb/cache/mod.rs
1
use std::fmt::Debug;
2
use std::path::PathBuf;
3

4
use dozer_storage::lmdb::{RoTransaction, RwTransaction, Transaction};
5
use dozer_storage::lmdb_storage::{
6
    LmdbEnvironmentManager, LmdbExclusiveTransaction, SharedTransaction,
7
};
8
use dozer_storage::LmdbMultimap;
9

10
use dozer_types::parking_lot::RwLockReadGuard;
11

12
use dozer_types::types::Schema;
13
use dozer_types::types::{IndexDefinition, Record};
14

15
use self::secondary_index_database::{
16
    new_secondary_index_database_from_env, new_secondary_index_database_from_txn,
17
};
18

19
use super::super::{RoCache, RwCache};
20
use super::indexer::Indexer;
21
use super::utils::{self, CacheReadOptions};
22
use super::utils::{CacheOptions, CacheOptionsKind};
23
use crate::cache::expression::QueryExpression;
24
use crate::cache::RecordWithId;
25
use crate::errors::CacheError;
26
use query::LmdbQueryHandler;
27

28
mod helper;
29
mod main_environment;
30
mod query;
31
mod schema_database;
32
mod secondary_index_database;
33

34
use main_environment::MainEnvironment;
35
use schema_database::SchemaDatabase;
36

37
#[derive(Clone, Debug)]
551✔
38
pub struct CacheCommonOptions {
39
    // Total number of readers allowed
40
    pub max_readers: u32,
41
    // Max no of dbs
42
    pub max_db_size: u32,
×
43

44
    /// The chunk size when calculating intersection of index queries.
45
    pub intersection_chunk_size: usize,
46

47
    /// Provide a path where db will be created. If nothing is provided, will default to a temp location.
48
    /// Db path will be `PathBuf.join(String)`.
49
    pub path: Option<(PathBuf, String)>,
50
}
51

52
impl Default for CacheCommonOptions {
53
    fn default() -> Self {
306✔
54
        Self {
306✔
55
            max_readers: 1000,
306✔
56
            max_db_size: 1000,
306✔
57
            intersection_chunk_size: 100,
306✔
58
            path: None,
306✔
59
        }
306✔
60
    }
306✔
61
}
×
62

×
63
#[derive(Debug)]
×
64
pub struct LmdbRoCache {
×
65
    common: LmdbCacheCommon,
×
66
    env: LmdbEnvironmentManager,
67
}
68

×
69
impl LmdbRoCache {
70
    pub fn new(options: CacheCommonOptions) -> Result<Self, CacheError> {
281✔
71
        let (mut env, name) = utils::init_env(&CacheOptions {
281✔
72
            common: options.clone(),
281✔
73
            kind: CacheOptionsKind::ReadOnly(CacheReadOptions {}),
281✔
74
        })?;
281✔
75
        let common = LmdbCacheCommon::new(&mut env, options, name, false)?;
281✔
76
        Ok(Self { common, env })
281✔
77
    }
281✔
78
}
×
79

×
80
#[derive(Clone, Debug)]
×
81
pub struct CacheWriteOptions {
×
82
    // Total size allocated for data in a memory mapped file.
×
83
    // This size is allocated at initialization.
84
    pub max_size: usize,
85
}
×
86

87
impl Default for CacheWriteOptions {
88
    fn default() -> Self {
341✔
89
        Self {
341✔
90
            max_size: 1024 * 1024 * 1024 * 1024,
341✔
91
        }
341✔
92
    }
341✔
93
}
×
94

×
95
#[derive(Debug)]
×
96
pub struct LmdbRwCache {
×
97
    common: LmdbCacheCommon,
×
98
    txn: SharedTransaction,
99
}
100

×
101
impl LmdbRwCache {
102
    pub fn create(
266✔
103
        schema: Schema,
266✔
104
        indexes: Vec<IndexDefinition>,
266✔
105
        common_options: CacheCommonOptions,
266✔
106
        write_options: CacheWriteOptions,
266✔
107
    ) -> Result<Self, CacheError> {
266✔
108
        let mut cache = Self::open(common_options, write_options)?;
266✔
109

×
110
        let mut txn = cache.txn.write();
266✔
111
        cache.common.insert_schema(&mut txn, schema, indexes)?;
266✔
112

×
113
        txn.commit_and_renew()?;
266✔
114
        drop(txn);
290✔
115

290✔
116
        Ok(cache)
290✔
117
    }
290✔
118

×
119
    pub fn open(
270✔
120
        common_options: CacheCommonOptions,
270✔
121
        write_options: CacheWriteOptions,
270✔
122
    ) -> Result<Self, CacheError> {
270✔
123
        let (mut env, name) = utils::init_env(&CacheOptions {
270✔
124
            common: common_options.clone(),
270✔
125
            kind: CacheOptionsKind::Write(write_options),
270✔
126
        })?;
270✔
127
        let common = LmdbCacheCommon::new(&mut env, common_options, name, true)?;
270✔
128
        let txn = env.create_txn()?;
294✔
129
        Ok(Self { common, txn })
294✔
130
    }
294✔
131
}
×
132

×
133
impl<C: LmdbCache> RoCache for C {
×
134
    fn name(&self) -> &str {
322✔
135
        &self.common().name
322✔
136
    }
322✔
137

×
138
    fn get(&self, key: &[u8]) -> Result<RecordWithId, CacheError> {
73✔
139
        let txn = self.begin_txn()?;
73✔
140
        let txn = txn.as_txn();
73✔
141
        self.common().main_environment.get(txn, key)
73✔
142
    }
73✔
143

×
144
    fn count(&self, query: &QueryExpression) -> Result<usize, CacheError> {
4,431✔
145
        let txn = self.begin_txn()?;
4,431✔
146
        let handler = self.create_query_handler(&txn, query)?;
4,431✔
147
        handler.count()
4,431✔
148
    }
4,431✔
149

×
150
    fn query(&self, query: &QueryExpression) -> Result<Vec<RecordWithId>, CacheError> {
4,458✔
151
        let txn = self.begin_txn()?;
4,458✔
152
        let handler = self.create_query_handler(&txn, query)?;
4,458✔
153
        handler.query()
4,458✔
154
    }
4,458✔
155

×
156
    fn get_schema(&self) -> Result<&(Schema, Vec<IndexDefinition>), CacheError> {
23,484✔
157
        self.get_schema_impl()
23,484✔
158
    }
23,484✔
159
}
×
160

×
161
impl RwCache for LmdbRwCache {
×
162
    fn insert(&self, record: &mut Record) -> Result<u64, CacheError> {
23,116✔
163
        let (schema, secondary_indexes) = self.get_schema()?;
23,116✔
164
        self.insert_impl(record, schema, secondary_indexes)
23,116✔
165
    }
23,116✔
166

×
167
    fn delete(&self, key: &[u8]) -> Result<u32, CacheError> {
6✔
168
        let (_, _, version) = self.delete_impl(key)?;
6✔
169
        Ok(version)
6✔
170
    }
6✔
171

×
172
    fn update(&self, key: &[u8], record: &mut Record) -> Result<u32, CacheError> {
13✔
173
        let (schema, secondary_indexes, old_version) = self.delete_impl(key)?;
13✔
174
        self.insert_impl(record, schema, secondary_indexes)?;
13✔
175
        Ok(old_version)
13✔
176
    }
13✔
177

×
178
    fn commit(&self) -> Result<(), CacheError> {
289✔
179
        let mut txn = self.txn.write();
289✔
180
        txn.commit_and_renew()?;
289✔
181
        Ok(())
289✔
182
    }
289✔
183
}
×
184

×
185
impl LmdbRwCache {
×
186
    fn delete_impl(&self, key: &[u8]) -> Result<(&Schema, &[IndexDefinition], u32), CacheError> {
19✔
187
        let record = self.get(key)?;
19✔
188
        let (schema, secondary_indexes) = self.get_schema()?;
19✔
189

×
190
        let mut txn = self.txn.write();
19✔
191
        let txn = txn.txn_mut();
19✔
192

×
193
        let (version, operation_id) = self.common.main_environment.delete(txn, key)?;
19✔
194

×
195
        let indexer = Indexer {
19✔
196
            secondary_indexes: &self.common.secondary_indexes,
19✔
197
        };
19✔
198
        indexer.delete_indexes(txn, &record.record, secondary_indexes, operation_id)?;
19✔
199
        Ok((schema, secondary_indexes, version))
19✔
200
    }
19✔
201

×
202
    /// Inserts the record, sets the record version, builds the secondary index, and returns the record id.
×
203
    fn insert_impl(
11,616✔
204
        &self,
11,616✔
205
        record: &mut Record,
11,616✔
206
        schema: &Schema,
11,616✔
207
        secondary_indexes: &[IndexDefinition],
11,616✔
208
    ) -> Result<u64, CacheError> {
11,616✔
209
        let span = dozer_types::tracing::span!(dozer_types::tracing::Level::TRACE, "insert_cache");
23,107✔
210
        let _enter = span.enter();
11,604✔
211
        let mut txn = self.txn.write();
11,604✔
212
        let txn = txn.txn_mut();
11,604✔
213

×
214
        let (record_id, operation_id) = self.common.main_environment.insert(txn, record, schema)?;
11,604✔
215

×
216
        let indexer = Indexer {
11,604✔
217
            secondary_indexes: &self.common.secondary_indexes,
11,604✔
218
        };
11,604✔
219

220
        let span = dozer_types::tracing::span!(
11,555✔
221
            dozer_types::tracing::Level::TRACE,
23,043✔
222
            "build_indexes",
×
223
            record_id = record_id,
×
224
            operation_id = operation_id
×
225
        );
×
226
        let _enter = span.enter();
11,555✔
227

11,555✔
228
        indexer.build_indexes(txn, record, secondary_indexes, operation_id)?;
11,555✔
229

×
230
        Ok(record_id)
11,554✔
231
    }
11,554✔
232
}
×
233

×
234
/// This trait abstracts the behavior of getting a transaction from a `LmdbExclusiveTransaction` or a `lmdb::Transaction`.
×
235
trait AsTransaction {
×
236
    type Transaction<'a>: Transaction
×
237
    where
×
238
        Self: 'a;
×
239

×
240
    fn as_txn(&self) -> &Self::Transaction<'_>;
×
241
}
×
242

×
243
impl<'a> AsTransaction for RoTransaction<'a> {
×
244
    type Transaction<'env> = RoTransaction<'env> where Self: 'env;
×
245

×
246
    fn as_txn(&self) -> &Self::Transaction<'_> {
4,427✔
247
        self
4,427✔
248
    }
4,427✔
249
}
×
250

×
251
impl<'a> AsTransaction for RwLockReadGuard<'a, LmdbExclusiveTransaction> {
×
252
    type Transaction<'env> = RwTransaction<'env> where Self: 'env;
×
253

×
254
    fn as_txn(&self) -> &Self::Transaction<'_> {
89✔
255
        self.txn()
89✔
256
    }
89✔
257
}
×
258

×
259
/// This trait abstracts the behavior of locking a `SharedTransaction` for reading
×
260
/// and beginning a `RoTransaction` from `LmdbEnvironmentManager`.
×
261
trait LmdbCache: Send + Sync + Debug {
×
262
    type AsTransaction<'a>: AsTransaction
×
263
    where
×
264
        Self: 'a;
×
265

×
266
    fn common(&self) -> &LmdbCacheCommon;
×
267
    fn begin_txn(&self) -> Result<Self::AsTransaction<'_>, CacheError>;
×
268

×
269
    fn get_schema_impl(&self) -> Result<&(Schema, Vec<IndexDefinition>), CacheError> {
16,233✔
270
        self.common()
16,233✔
271
            .schema_db
16,233✔
272
            .get_schema()
16,233✔
273
            .ok_or(CacheError::SchemaNotFound)
16,233✔
274
    }
16,233✔
275

×
276
    fn create_query_handler<'a, 'as_txn>(
4,473✔
277
        &'a self,
4,473✔
278
        txn: &'a Self::AsTransaction<'as_txn>,
4,473✔
279
        query: &'a QueryExpression,
4,473✔
280
    ) -> Result<
4,473✔
281
        LmdbQueryHandler<'a, <Self::AsTransaction<'as_txn> as AsTransaction>::Transaction<'a>>,
4,473✔
282
        CacheError,
4,473✔
283
    > {
4,473✔
284
        let txn = txn.as_txn();
4,473✔
285
        let (schema, secondary_indexes) = self.get_schema_impl()?;
4,473✔
286
        let handler = LmdbQueryHandler::new(self.common(), txn, schema, secondary_indexes, query);
4,473✔
287
        Ok(handler)
4,473✔
288
    }
4,473✔
289
}
×
290

×
291
impl LmdbCache for LmdbRoCache {
×
292
    type AsTransaction<'a> = RoTransaction<'a>;
×
293

×
294
    fn common(&self) -> &LmdbCacheCommon {
8,974✔
295
        &self.common
8,974✔
296
    }
8,974✔
297

×
298
    fn begin_txn(&self) -> Result<Self::AsTransaction<'_>, CacheError> {
4,427✔
299
        Ok(self.env.begin_ro_txn()?)
4,427✔
300
    }
4,427✔
301
}
×
302

×
303
impl LmdbCache for LmdbRwCache {
×
304
    type AsTransaction<'a> = RwLockReadGuard<'a, LmdbExclusiveTransaction>;
305

×
306
    fn common(&self) -> &LmdbCacheCommon {
11,935✔
307
        &self.common
11,935✔
308
    }
11,935✔
309

×
310
    fn begin_txn(&self) -> Result<Self::AsTransaction<'_>, CacheError> {
89✔
311
        Ok(self.txn.read())
89✔
312
    }
89✔
313
}
×
314

315
#[derive(Debug)]
×
316
pub struct LmdbCacheCommon {
×
317
    main_environment: MainEnvironment,
×
318
    secondary_indexes: Vec<LmdbMultimap<Vec<u8>, u64>>,
×
319
    schema_db: SchemaDatabase,
×
320
    cache_options: CacheCommonOptions,
×
321
    /// File name of the database.
×
322
    name: String,
323
}
×
324

×
325
impl LmdbCacheCommon {
326
    fn new(
299✔
327
        env: &mut LmdbEnvironmentManager,
299✔
328
        options: CacheCommonOptions,
299✔
329
        name: String,
299✔
330
        create_db_if_not_exist: bool,
299✔
331
    ) -> Result<Self, CacheError> {
299✔
332
        // Create or open must have databases.
×
333
        let main_environment = MainEnvironment::new(env, create_db_if_not_exist)?;
299✔
334
        let schema_db = SchemaDatabase::new(env, create_db_if_not_exist)?;
299✔
335

336
        // Open existing secondary index databases.
×
337
        let mut secondary_indexe_databases = vec![];
299✔
338
        if let Some((_, secondary_indexes)) = schema_db.get_schema() {
299✔
339
            for (index, index_definition) in secondary_indexes.iter().enumerate() {
699✔
340
                let db =
694✔
341
                    new_secondary_index_database_from_env(env, index, index_definition, false)?;
694✔
342
                secondary_indexe_databases.push(db);
694✔
343
            }
×
344
        }
152✔
345

×
346
        Ok(Self {
299✔
347
            main_environment,
299✔
348
            secondary_indexes: secondary_indexe_databases,
299✔
349
            schema_db,
299✔
350
            cache_options: options,
299✔
351
            name,
299✔
352
        })
299✔
353
    }
299✔
354

×
355
    fn insert_schema(
152✔
356
        &mut self,
152✔
357
        txn: &mut LmdbExclusiveTransaction,
152✔
358
        schema: Schema,
152✔
359
        secondary_indexes: Vec<IndexDefinition>,
152✔
360
    ) -> Result<(), CacheError> {
152✔
361
        for (index, index_definition) in secondary_indexes.iter().enumerate() {
717✔
362
            let db = new_secondary_index_database_from_txn(txn, index, index_definition, true)?;
717✔
363
            self.secondary_indexes.push(db);
717✔
364
        }
365

×
366
        self.schema_db
152✔
367
            .insert(txn.txn_mut(), schema, secondary_indexes)?;
152✔
368
        Ok(())
152✔
369
    }
152✔
370
}
×
371

×
372
/// Methods for testing.
×
373
#[cfg(test)]
374
mod tests {
×
375
    use super::*;
×
376

×
377
    impl LmdbRwCache {
378
        pub fn get_txn_and_secondary_indexes(
2✔
379
            &self,
2✔
380
        ) -> (&SharedTransaction, &[LmdbMultimap<Vec<u8>, u64>]) {
2✔
381
            (&self.txn, &self.common.secondary_indexes)
2✔
382
        }
2✔
383
    }
384
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc