• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4113913291

pending completion
4113913291

Pull #821

github

GitHub
Merge a8cca3f0b into 8f74ec17e
Pull Request #821: refactor: Make `LmdbRoCache` and `LmdbRwCache` `Send` and `Sync`

869 of 869 new or added lines in 45 files covered. (100.0%)

23486 of 37503 relevant lines covered (62.62%)

36806.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.1
/dozer-cache/src/cache/lmdb/cache/record_database.rs
1
use dozer_storage::{
2
    lmdb::{Database, DatabaseFlags, RoCursor, RwTransaction, Transaction, WriteFlags},
3
    lmdb_storage::LmdbEnvironmentManager,
4
};
5
use dozer_types::{bincode, types::Record};
6

7
use crate::{
8
    cache::lmdb::query::helper,
9
    errors::{CacheError, QueryError},
10
};
11

12
#[derive(Debug, Clone, Copy)]
×
13
pub struct RecordDatabase(Database);
14

15
impl RecordDatabase {
16
    pub fn new(
98✔
17
        env: &mut LmdbEnvironmentManager,
98✔
18
        create_if_not_exist: bool,
98✔
19
    ) -> Result<Self, CacheError> {
98✔
20
        let flags = if create_if_not_exist {
98✔
21
            Some(DatabaseFlags::INTEGER_KEY)
96✔
22
        } else {
23
            None
2✔
24
        };
25
        let db = env.create_database(Some("records"), flags)?;
98✔
26
        Ok(Self(db))
98✔
27
    }
98✔
28

29
    pub fn insert(
7,993✔
30
        &self,
7,993✔
31
        txn: &mut RwTransaction,
7,993✔
32
        id: [u8; 8],
7,993✔
33
        record: &Record,
7,993✔
34
    ) -> Result<(), CacheError> {
7,993✔
35
        let encoded: Vec<u8> =
7,993✔
36
            bincode::serialize(&record).map_err(CacheError::map_serialization_error)?;
7,993✔
37

38
        txn.put(self.0, &id, &encoded.as_slice(), WriteFlags::NO_OVERWRITE)
7,993✔
39
            .map_err(|e| CacheError::Query(QueryError::InsertValue(e)))
7,993✔
40
    }
7,993✔
41

42
    pub fn get<T: Transaction>(&self, txn: &T, id: [u8; 8]) -> Result<Record, CacheError> {
490,000✔
43
        helper::get(txn, self.0, &id)
490,000✔
44
    }
490,000✔
45

46
    pub fn delete(&self, txn: &mut RwTransaction, id: [u8; 8]) -> Result<(), CacheError> {
12✔
47
        txn.del(self.0, &id, None)
12✔
48
            .map_err(|e| CacheError::Query(QueryError::DeleteValue(e)))
12✔
49
    }
12✔
50

51
    pub fn count(&self, txn: &impl Transaction) -> Result<usize, CacheError> {
23✔
52
        helper::lmdb_stat(txn, self.0)
23✔
53
            .map(|stat| stat.ms_entries)
23✔
54
            .map_err(|e| CacheError::Internal(Box::new(e)))
23✔
55
    }
23✔
56

57
    pub fn open_ro_cursor<'txn, T: Transaction>(
22✔
58
        &self,
22✔
59
        txn: &'txn T,
22✔
60
    ) -> Result<RoCursor<'txn>, CacheError> {
22✔
61
        txn.open_ro_cursor(self.0)
22✔
62
            .map_err(|e| CacheError::Internal(Box::new(e)))
22✔
63
    }
22✔
64
}
65

66
#[cfg(test)]
mod tests {
    use dozer_storage::lmdb_storage::LmdbTransaction;

    use crate::cache::{lmdb::utils::init_env, CacheOptions};

    use super::*;

    /// Exercises insert / get / count / delete through two handles onto the same
    /// database: one opened with `create_if_not_exist = true`, one with `false`.
    #[test]
    fn test_record_database() {
        let mut env = init_env(&CacheOptions::default()).unwrap();
        let writer = RecordDatabase::new(&mut env, true).unwrap();
        let reader = RecordDatabase::new(&mut env, false).unwrap();
        let shared_txn = env.create_txn().unwrap();
        let mut txn = shared_txn.write();

        // Both handles start out seeing an empty database.
        for db in [writer, reader] {
            assert_eq!(db.count(txn.txn()).unwrap(), 0);
        }
        txn.commit_and_renew().unwrap();

        let key = 1u64.to_be_bytes();
        let record = Record::new(None, vec![], None);

        writer.insert(txn.txn_mut(), key, &record).unwrap();
        txn.commit_and_renew().unwrap();

        // The inserted record is visible through both handles.
        for db in [writer, reader] {
            assert_eq!(db.count(txn.txn()).unwrap(), 1);
        }
        for db in [writer, reader] {
            assert_eq!(db.get(txn.txn(), key).unwrap(), record);
        }
        txn.commit_and_renew().unwrap();

        writer.delete(txn.txn_mut(), key).unwrap();
        txn.commit_and_renew().unwrap();

        // After the delete, the record is gone for both handles.
        for db in [writer, reader] {
            assert_eq!(db.count(txn.txn()).unwrap(), 0);
        }
        for db in [writer, reader] {
            assert!(db.get(txn.txn(), key).is_err());
        }
        txn.commit_and_renew().unwrap();
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc