• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4116183752

pending completion
4116183752

push

github

GitHub
refactor: Make `LmdbRoCache` and `LmdbRwCache` `Send` and `Sync` (#821)

790 of 790 new or added lines in 44 files covered. (100.0%)

23005 of 33842 relevant lines covered (67.98%)

56312.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.37
/dozer-cache/src/cache/lmdb/cache/record_database.rs
1
use dozer_storage::{
2
    lmdb::{Database, DatabaseFlags, RoCursor, RwTransaction, Transaction, WriteFlags},
3
    lmdb_storage::LmdbEnvironmentManager,
4
};
5
use dozer_types::{bincode, types::Record};
6

7
use crate::{
8
    cache::lmdb::query::helper,
9
    errors::{CacheError, QueryError},
10
};
11

12
#[derive(Debug, Clone, Copy)]
×
13
pub struct RecordDatabase(Database);
14

15
impl RecordDatabase {
16
    pub fn new(
98✔
17
        env: &mut LmdbEnvironmentManager,
98✔
18
        create_if_not_exist: bool,
98✔
19
    ) -> Result<Self, CacheError> {
98✔
20
        let flags = if create_if_not_exist {
98✔
21
            Some(DatabaseFlags::INTEGER_KEY)
96✔
22
        } else {
23
            None
2✔
24
        };
25
        let db = env.create_database(Some("records"), flags)?;
98✔
26
        Ok(Self(db))
98✔
27
    }
98✔
28

29
    pub fn insert(
7,993✔
30
        &self,
7,993✔
31
        txn: &mut RwTransaction,
7,993✔
32
        id: [u8; 8],
7,993✔
33
        record: &Record,
7,993✔
34
    ) -> Result<(), CacheError> {
7,993✔
35
        let encoded: Vec<u8> =
7,993✔
36
            bincode::serialize(&record).map_err(CacheError::map_serialization_error)?;
7,993✔
37

38
        txn.put(self.0, &id, &encoded.as_slice(), WriteFlags::NO_OVERWRITE)
7,993✔
39
            .map_err(|e| CacheError::Query(QueryError::InsertValue(e)))
7,993✔
40
    }
7,993✔
41

42
    pub fn get<T: Transaction>(&self, txn: &T, id: [u8; 8]) -> Result<Record, CacheError> {
490,000✔
43
        helper::get(txn, self.0, &id)
490,000✔
44
    }
490,000✔
45

46
    pub fn delete(&self, txn: &mut RwTransaction, id: [u8; 8]) -> Result<(), CacheError> {
12✔
47
        txn.del(self.0, &id, None)
12✔
48
            .map_err(|e| CacheError::Query(QueryError::DeleteValue(e)))
12✔
49
    }
12✔
50

51
    pub fn count(&self, txn: &impl Transaction) -> Result<usize, CacheError> {
23✔
52
        helper::lmdb_stat(txn, self.0)
23✔
53
            .map(|stat| stat.ms_entries)
23✔
54
            .map_err(|e| CacheError::Internal(Box::new(e)))
23✔
55
    }
23✔
56

57
    pub fn open_ro_cursor<'txn, T: Transaction>(
22✔
58
        &self,
22✔
59
        txn: &'txn T,
22✔
60
    ) -> Result<RoCursor<'txn>, CacheError> {
22✔
61
        txn.open_ro_cursor(self.0)
22✔
62
            .map_err(|e| CacheError::Internal(Box::new(e)))
22✔
63
    }
22✔
64
}
65

66
#[cfg(test)]
mod tests {
    use crate::cache::{lmdb::utils::init_env, CacheOptions};

    use super::*;

    /// Exercises insert / get / count / delete through two handles on the same database:
    /// one opened with `create_if_not_exist = true`, one with `false`.
    #[test]
    fn test_record_database() {
        let mut env = init_env(&CacheOptions::default()).unwrap();
        let writer = RecordDatabase::new(&mut env, true).unwrap();
        let reader = RecordDatabase::new(&mut env, false).unwrap();
        let txn = env.create_txn().unwrap();
        let mut txn = txn.write();

        // A freshly created database is empty for both handles.
        assert_eq!(writer.count(txn.txn()).unwrap(), 0);
        assert_eq!(reader.count(txn.txn()).unwrap(), 0);
        txn.commit_and_renew().unwrap();

        let id = 1u64;
        let record = Record::new(None, vec![], None);

        // Insert through the writer handle.
        writer
            .insert(txn.txn_mut(), id.to_be_bytes(), &record)
            .unwrap();
        txn.commit_and_renew().unwrap();

        // Both handles observe the inserted record.
        assert_eq!(writer.count(txn.txn()).unwrap(), 1);
        assert_eq!(reader.count(txn.txn()).unwrap(), 1);
        assert_eq!(writer.get(txn.txn(), id.to_be_bytes()).unwrap(), record);
        assert_eq!(reader.get(txn.txn(), id.to_be_bytes()).unwrap(), record);
        txn.commit_and_renew().unwrap();

        // Delete through the writer handle.
        writer.delete(txn.txn_mut(), id.to_be_bytes()).unwrap();
        txn.commit_and_renew().unwrap();

        // The record is gone for both handles and lookups now fail.
        assert_eq!(writer.count(txn.txn()).unwrap(), 0);
        assert_eq!(reader.count(txn.txn()).unwrap(), 0);
        assert!(writer.get(txn.txn(), id.to_be_bytes()).is_err());
        assert!(reader.get(txn.txn(), id.to_be_bytes()).is_err());
        txn.commit_and_renew().unwrap();
    }
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc