• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4370280743

pending completion
4370280743

push

github

GitHub
Bump async-trait from 0.1.65 to 0.1.66 (#1179)

27808 of 38702 relevant lines covered (71.85%)

25323.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.86
/dozer-cache/src/cache/lmdb/cache_manager.rs
1
use std::path::PathBuf;
2

3
use dozer_storage::{
4
    errors::StorageError,
5
    lmdb::{Database, DatabaseFlags},
6
    lmdb_storage::{LmdbEnvironmentManager, LmdbExclusiveTransaction, SharedTransaction},
7
};
8
use dozer_types::types::{IndexDefinition, Schema};
9
use tempdir::TempDir;
10

11
use crate::{
12
    cache::{CacheManager, RoCache, RwCache},
13
    errors::CacheError,
14
};
15

16
use super::cache::{CacheCommonOptions, CacheWriteOptions, LmdbRoCache, LmdbRwCache};
17

18
/// Options used to construct an `LmdbCacheManager`.
///
/// The manager copies these values into the `CacheCommonOptions` and
/// `CacheWriteOptions` of every cache it creates or opens.
#[derive(Debug, Clone)]
×
19
pub struct CacheManagerOptions {
20
    /// Total number of readers allowed.
21
    pub max_readers: u32,
22
    /// Maximum number of databases.
    // NOTE(review): the field name suggests a size while the comment describes a count;
    // confirm against how `CacheCommonOptions::max_db_size` is consumed.
23
    pub max_db_size: u32,
24

25
    /// The chunk size when calculating intersection of index queries.
26
    pub intersection_chunk_size: usize,
27

28
    /// Total size allocated for data in a memory mapped file.
    /// This size is allocated at initialization.
29
30
    pub max_size: usize,
31

32
    /// Path under which the databases are created. When `None`, a temporary directory is used.
33
    pub path: Option<PathBuf>,
34
}
35

36
impl Default for CacheManagerOptions {
37
    /// Builds the defaults from `CacheCommonOptions::default()` and
    /// `CacheWriteOptions::default()`, with no explicit `path`.
    fn default() -> Self {
133✔
38
        let cache_common_options = CacheCommonOptions::default();
133✔
39
        let cache_write_options = CacheWriteOptions::default();
133✔
40
        Self {
133✔
41
            max_readers: cache_common_options.max_readers,
133✔
42
            max_db_size: cache_common_options.max_db_size,
133✔
43
            intersection_chunk_size: cache_common_options.intersection_chunk_size,
133✔
44
            max_size: cache_write_options.max_size,
133✔
45
            // No path by default.
            path: None,
133✔
46
        }
133✔
47
    }
133✔
48
}
49

50
/// LMDB-backed cache manager: creates and opens caches under a base directory
/// and keeps an alias -> real name mapping in a dedicated alias database.
#[derive(Debug)]
×
51
pub struct LmdbCacheManager {
52
    /// Options supplied at construction; copied into per-cache options.
    options: CacheManagerOptions,
53
    /// Directory under which the alias environment and all caches live.
    base_path: PathBuf,
54
    /// Database whose keys are alias bytes and whose values are real cache name bytes.
    alias_db: Database,
55
    /// Shared transaction on the alias environment.
    txn: SharedTransaction,
56
    /// Holds the temporary directory when no explicit path was given.
    // NOTE(review): presumably kept so the directory outlives the manager's use of it — confirm.
    _temp_dir: Option<TempDir>,
57
}
58

59
impl LmdbCacheManager {
60
    /// Creates a cache manager rooted at `options.path`, or at a fresh temporary
    /// directory when no path is given.
    ///
    /// The alias environment, its database and a shared transaction are created
    /// under the chosen base path.
    ///
    /// # Errors
    ///
    /// Returns an error if the base directory (explicit or temporary) cannot be
    /// created, or if creating the alias environment, database or transaction fails.
    pub fn new(options: CacheManagerOptions) -> Result<Self, CacheError> {
145✔
61
        let (temp_dir, base_path) = match &options.path {
145✔
62
            Some(path) => {
18✔
63
                std::fs::create_dir_all(path)?;
18✔
64
                (None, path.clone())
18✔
65
            }
66
            None => {
67
                // Propagate the I/O error instead of panicking, matching the explicit-path branch.
                let temp_dir = TempDir::new("dozer")?;
127✔
68
                let base_path = temp_dir.path().to_path_buf();
127✔
69
                (Some(temp_dir), base_path)
127✔
70
            }
71
        };
72

73
        let mut env = LmdbEnvironmentManager::create(
145✔
74
            &base_path,
145✔
75
            LMDB_CACHE_MANAGER_ALIAS_ENV_NAME,
145✔
76
            Default::default(),
145✔
77
        )?;
145✔
78
        let alias_db = env.create_database(None, Some(DatabaseFlags::empty()))?;
145✔
79
        let txn = env.create_txn()?;
145✔
80

81
        Ok(Self {
145✔
82
            options,
145✔
83
            base_path,
145✔
84
            alias_db,
145✔
85
            txn,
145✔
86
            _temp_dir: temp_dir,
145✔
87
        })
145✔
88
    }
145✔
89
}
90

91
impl CacheManager for LmdbCacheManager {
92
    /// Opens the cache identified by `name` for reading and writing.
    ///
    /// `name` is first looked up in the alias database; if no entry exists it is
    /// used as the real name itself. Returns `Ok(None)` when no environment with
    /// the resolved name exists under the base path.
    fn open_rw_cache(&self, name: &str) -> Result<Option<Box<dyn RwCache>>, CacheError> {
16✔
93
        let mut txn = self.txn.write();
16✔
94
        // Commit and renew the shared transaction so the alias lookup below sees the latest changes.
16✔
95
        txn.commit_and_renew()?;
16✔
96
        // An alias entry takes precedence; otherwise `name` is treated as the real name.
        let real_name = self.resolve_alias(name, &txn)?.unwrap_or(name);
16✔
97
        let cache: Option<Box<dyn RwCache>> =
16✔
98
            if LmdbEnvironmentManager::exists(&self.base_path, real_name) {
16✔
99
                let cache = LmdbRwCache::open(
4✔
100
                    self.cache_common_options(real_name.to_string()),
4✔
101
                    self.cache_write_options(),
4✔
102
                )?;
4✔
103
                Some(Box::new(cache))
4✔
104
            } else {
105
                None
12✔
106
            };
107
        Ok(cache)
16✔
108
    }
16✔
109

110
    /// Opens the cache identified by `name` read-only.
    ///
    /// Name resolution is the same as in `open_rw_cache`: an alias entry wins,
    /// otherwise `name` is used as the real name. Returns `Ok(None)` when no
    /// environment with the resolved name exists under the base path.
    fn open_ro_cache(&self, name: &str) -> Result<Option<Box<dyn RoCache>>, CacheError> {
136✔
111
        let mut txn = self.txn.write();
136✔
112
        // Commit and renew the shared transaction so the alias lookup below sees the latest changes.
136✔
113
        txn.commit_and_renew()?;
136✔
114
        let real_name = self.resolve_alias(name, &txn)?.unwrap_or(name);
136✔
115
        let cache: Option<Box<dyn RoCache>> =
136✔
116
            if LmdbEnvironmentManager::exists(&self.base_path, real_name) {
136✔
117
                let cache = LmdbRoCache::new(self.cache_common_options(real_name.to_string()))?;
136✔
118
                Some(Box::new(cache))
136✔
119
            } else {
120
                None
×
121
            };
122
        Ok(cache)
136✔
123
    }
136✔
124

125
    /// Creates a new read-write cache for `schema` and `indexes` under a freshly
    /// generated unique name.
    fn create_cache(
134✔
126
        &self,
134✔
127
        schema: Schema,
134✔
128
        indexes: Vec<IndexDefinition>,
134✔
129
    ) -> Result<Box<dyn RwCache>, CacheError> {
134✔
130
        let name = self.generate_unique_name();
134✔
131
        let cache = LmdbRwCache::create(
134✔
132
            schema,
134✔
133
            indexes,
134✔
134
            self.cache_common_options(name),
134✔
135
            self.cache_write_options(),
134✔
136
        )?;
134✔
137
        Ok(Box::new(cache))
134✔
138
    }
134✔
139

×
140
    /// Stores `alias` -> `name` in the alias database and commits the change.
    fn create_alias(&self, name: &str, alias: &str) -> Result<(), CacheError> {
129✔
141
        let mut txn = self.txn.write();
129✔
142
        // Key is the alias, value is the real cache name.
        txn.put(self.alias_db, alias.as_bytes(), name.as_bytes())?;
129✔
143
        txn.commit_and_renew()?;
129✔
144
        Ok(())
129✔
145
    }
129✔
146
}
147

148
/// Name of the LMDB environment, created under the manager's base path, that holds the alias database.
const LMDB_CACHE_MANAGER_ALIAS_ENV_NAME: &str = "__DOZER_CACHE_MANAGER_ALIAS__";
149

×
150
impl LmdbCacheManager {
×
151
    /// Builds the common options for the cache called `name`, located under the base path.
    fn cache_common_options(&self, name: String) -> CacheCommonOptions {
274✔
152
        CacheCommonOptions {
274✔
153
            max_db_size: self.options.max_db_size,
274✔
154
            max_readers: self.options.max_readers,
274✔
155
            intersection_chunk_size: self.options.intersection_chunk_size,
274✔
156
            path: Some((self.base_path.clone(), name)),
274✔
157
        }
274✔
158
    }
274✔
159

×
160
    /// Builds the write options from the manager's configured `max_size`.
    fn cache_write_options(&self) -> CacheWriteOptions {
138✔
161
        CacheWriteOptions {
138✔
162
            max_size: self.options.max_size,
138✔
163
        }
138✔
164
    }
138✔
165

×
166
    /// Returns a new random (version 4) UUID as a string, used as the name of a new cache.
    fn generate_unique_name(&self) -> String {
134✔
167
        uuid::Uuid::new_v4().to_string()
134✔
168
    }
134✔
169

×
170
    /// Looks up `alias` in the alias database using `txn`.
    ///
    /// Returns `Ok(Some(real_name))` when an entry exists and `Ok(None)` otherwise.
    /// The returned string borrows from `txn`.
    ///
    /// # Panics
    ///
    /// Panics if the stored value is not valid UTF-8. `create_alias` only writes
    /// the bytes of a `&str`, so this indicates a corrupted or foreign entry.
    fn resolve_alias<'a>(
152✔
171
        &self,
152✔
172
        alias: &str,
152✔
173
        txn: &'a LmdbExclusiveTransaction,
152✔
174
    ) -> Result<Option<&'a str>, StorageError> {
152✔
175
        txn.get(self.alias_db, alias.as_bytes()).map(|bytes| {
152✔
176
            bytes.map(|bytes| {
152✔
177
                std::str::from_utf8(bytes).expect("Real names should always be utf8 string")
126✔
178
            })
152✔
179
        })
152✔
180
    }
152✔
181
}
182

183
#[cfg(test)]
184
mod tests {
185
    use super::*;
×
186

×
187
    /// Exercises opening caches by real name, by alias, by an alias equal to the
    /// real name, and by a name that is both a real cache and an alias.
    #[test]
1✔
188
    fn test_lmdb_cache_manager() {
1✔
189
        let cache_manager = LmdbCacheManager::new(Default::default()).unwrap();
1✔
190
        let real_name = cache_manager
1✔
191
            .create_cache(Schema::empty(), vec![])
1✔
192
            .unwrap()
1✔
193
            .name()
1✔
194
            .to_string();
1✔
195
        // Test open with real name.
1✔
196
        assert_eq!(
1✔
197
            cache_manager
1✔
198
                .open_rw_cache(&real_name)
1✔
199
                .unwrap()
1✔
200
                .unwrap()
1✔
201
                .name(),
1✔
202
            real_name
1✔
203
        );
1✔
204
        assert_eq!(
1✔
205
            cache_manager
1✔
206
                .open_ro_cache(&real_name)
1✔
207
                .unwrap()
1✔
208
                .unwrap()
1✔
209
                .name(),
1✔
210
            real_name
1✔
211
        );
1✔
212
        // Test open with alias.
×
213
        let alias = "alias";
1✔
214
        cache_manager.create_alias(&real_name, alias).unwrap();
1✔
215
        assert_eq!(
1✔
216
            cache_manager.open_rw_cache(alias).unwrap().unwrap().name(),
1✔
217
            real_name
1✔
218
        );
1✔
219
        assert_eq!(
1✔
220
            cache_manager.open_ro_cache(alias).unwrap().unwrap().name(),
1✔
221
            real_name
1✔
222
        );
1✔
223
        // Test duplicate alias and real name.
×
224
        cache_manager.create_alias(&real_name, &real_name).unwrap();
1✔
225
        assert_eq!(
1✔
226
            cache_manager
1✔
227
                .open_rw_cache(&real_name)
1✔
228
                .unwrap()
1✔
229
                .unwrap()
1✔
230
                .name(),
1✔
231
            real_name
1✔
232
        );
1✔
233
        assert_eq!(
1✔
234
            cache_manager
1✔
235
                .open_ro_cache(&real_name)
1✔
236
                .unwrap()
1✔
237
                .unwrap()
1✔
238
                .name(),
1✔
239
            real_name
1✔
240
        );
1✔
241
        // If name is both alias and real name, alias shadows real name:
        // `real_name2` is a real cache and also an alias for `real_name`,
        // so opening `real_name2` must yield the `real_name` cache.
×
242
        let real_name2 = cache_manager
1✔
243
            .create_cache(Schema::empty(), vec![])
1✔
244
            .unwrap()
1✔
245
            .name()
1✔
246
            .to_string();
1✔
247
        cache_manager.create_alias(&real_name, &real_name2).unwrap();
1✔
248
        assert_eq!(
1✔
249
            cache_manager
1✔
250
                .open_rw_cache(&real_name2)
1✔
251
                .unwrap()
1✔
252
                .unwrap()
1✔
253
                .name(),
1✔
254
            real_name
1✔
255
        );
1✔
256
        assert_eq!(
1✔
257
            cache_manager
1✔
258
                .open_ro_cache(&real_name2)
1✔
259
                .unwrap()
1✔
260
                .unwrap()
1✔
261
                .name(),
1✔
262
            real_name
1✔
263
        );
1✔
264
    }
1✔
265
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc