• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4393407942

pending completion
4393407942

push

github

GitHub
fix: Lmdb environments should use the same map size (#1216)

55 of 55 new or added lines in 7 files covered. (100.0%)

28459 of 38371 relevant lines covered (74.17%)

85320.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.06
/dozer-cache/src/cache/lmdb/cache_manager.rs
1
use std::path::PathBuf;
2

3
use dozer_storage::{
4
    errors::StorageError,
5
    lmdb::{Database, DatabaseFlags},
6
    lmdb_storage::{
7
        CreateDatabase, LmdbEnvironmentManager, LmdbExclusiveTransaction, SharedTransaction,
8
    },
9
};
10
use dozer_types::{
11
    parking_lot::Mutex,
12
    types::{IndexDefinition, Schema},
13
};
14
use tempdir::TempDir;
15

16
use crate::{
17
    cache::{CacheManager, RoCache, RwCache},
18
    errors::CacheError,
19
};
20

×
21
use super::{
22
    cache::{CacheOptions, LmdbRoCache, LmdbRwCache},
23
    indexing::IndexingThreadPool,
24
};
25

26
/// Options used to construct an `LmdbCacheManager`.
///
/// The first four fields are forwarded unchanged to every cache the manager opens or creates;
/// `path` and `num_indexing_threads` configure the manager itself.
#[derive(Debug, Clone)]
pub struct CacheManagerOptions {
    /// Total number of readers allowed.
    pub max_readers: u32,

    /// Maximum number of databases.
    pub max_db_size: u32,

    /// Total size allocated for data in a memory mapped file.
    /// This size is allocated at initialization.
    pub max_size: usize,

    /// The chunk size when calculating intersection of index queries.
    pub intersection_chunk_size: usize,

    /// Provide a path where db will be created. If nothing is provided, will default to a temp directory.
    pub path: Option<PathBuf>,

    /// Number of threads in the indexing thread pool.
    pub num_indexing_threads: usize,
}
×
46

×
47
impl Default for CacheManagerOptions {
×
48
    fn default() -> Self {
157✔
49
        let cache_options = CacheOptions::default();
157✔
50
        Self {
157✔
51
            max_readers: cache_options.max_readers,
157✔
52
            max_db_size: cache_options.max_db_size,
157✔
53
            intersection_chunk_size: cache_options.intersection_chunk_size,
157✔
54
            max_size: cache_options.max_size,
157✔
55
            path: None,
157✔
56
            num_indexing_threads: 4,
157✔
57
        }
157✔
58
    }
157✔
59
}
60

61
#[derive(Debug)]
×
62
pub struct LmdbCacheManager {
×
63
    options: CacheManagerOptions,
×
64
    base_path: PathBuf,
×
65
    alias_db: Database,
×
66
    txn: SharedTransaction,
×
67
    indexing_thread_pool: Mutex<IndexingThreadPool>,
68
    _temp_dir: Option<TempDir>,
69
}
×
70

×
71
impl LmdbCacheManager {
×
72
    pub fn new(options: CacheManagerOptions) -> Result<Self, CacheError> {
157✔
73
        let (temp_dir, base_path) = match &options.path {
157✔
74
            Some(path) => {
36✔
75
                std::fs::create_dir_all(path).map_err(|e| CacheError::Io(path.clone(), e))?;
36✔
76
                (None, path.clone())
36✔
77
            }
×
78
            None => {
×
79
                let temp_dir = TempDir::new("dozer").expect("Unable to create temp dir");
121✔
80
                let base_path = temp_dir.path().to_path_buf();
121✔
81
                (Some(temp_dir), base_path)
121✔
82
            }
83
        };
×
84

×
85
        let mut env = LmdbEnvironmentManager::create(
157✔
86
            &base_path,
157✔
87
            LMDB_CACHE_MANAGER_ALIAS_ENV_NAME,
157✔
88
            Default::default(),
157✔
89
        )?;
157✔
90
        let alias_db = env.create_database(None, Some(DatabaseFlags::empty()))?;
157✔
91
        let txn = env.create_txn()?;
157✔
92

93
        let indexing_thread_pool =
157✔
94
            Mutex::new(IndexingThreadPool::new(options.num_indexing_threads));
157✔
95

157✔
96
        Ok(Self {
157✔
97
            options,
157✔
98
            base_path,
157✔
99
            alias_db,
157✔
100
            txn,
157✔
101
            indexing_thread_pool,
157✔
102
            _temp_dir: temp_dir,
157✔
103
        })
157✔
104
    }
157✔
105

×
106
    /// Blocks current thread until all secondary indexes are up to date with the last cache commit.
107
    ///
×
108
    /// If any cache commits during this call in another thread, those commits may or may not be indexed when this function returns.
109
    pub fn wait_until_indexing_catchup(&self) {
120✔
110
        self.indexing_thread_pool.lock().wait_until_catchup();
120✔
111
    }
120✔
112
}
×
113

×
114
impl CacheManager for LmdbCacheManager {
×
115
    fn open_rw_cache(&self, name: &str) -> Result<Option<Box<dyn RwCache>>, CacheError> {
22✔
116
        let mut txn = self.txn.write();
22✔
117
        // Open a new transaction to make sure we get the latest changes.
22✔
118
        txn.commit_and_renew()?;
22✔
119
        let real_name = self.resolve_alias(name, &txn)?.unwrap_or(name);
22✔
120
        let cache: Option<Box<dyn RwCache>> =
22✔
121
            if LmdbEnvironmentManager::exists(&self.base_path, real_name) {
22✔
122
                let cache = LmdbRwCache::new(
4✔
123
                    None,
4✔
124
                    &self.cache_options(real_name.to_string()),
4✔
125
                    &mut self.indexing_thread_pool.lock(),
4✔
126
                )?;
4✔
127
                Some(Box::new(cache))
4✔
128
            } else {
×
129
                None
18✔
130
            };
×
131
        Ok(cache)
22✔
132
    }
22✔
133

×
134
    fn open_ro_cache(&self, name: &str) -> Result<Option<Box<dyn RoCache>>, CacheError> {
142✔
135
        let mut txn = self.txn.write();
142✔
136
        // Open a new transaction to make sure we get the latest changes.
142✔
137
        txn.commit_and_renew()?;
142✔
138
        let real_name = self.resolve_alias(name, &txn)?.unwrap_or(name);
142✔
139
        let cache: Option<Box<dyn RoCache>> =
142✔
140
            if LmdbEnvironmentManager::exists(&self.base_path, real_name) {
142✔
141
                let cache = LmdbRoCache::new(&self.cache_options(real_name.to_string()))?;
142✔
142
                Some(Box::new(cache))
142✔
143
            } else {
×
144
                None
×
145
            };
×
146
        Ok(cache)
142✔
147
    }
142✔
148

149
    fn create_cache(
140✔
150
        &self,
140✔
151
        schema: Schema,
140✔
152
        indexes: Vec<IndexDefinition>,
140✔
153
    ) -> Result<Box<dyn RwCache>, CacheError> {
140✔
154
        let name = self.generate_unique_name();
140✔
155
        let cache = LmdbRwCache::new(
140✔
156
            Some(&(schema, indexes)),
140✔
157
            &self.cache_options(name),
140✔
158
            &mut self.indexing_thread_pool.lock(),
140✔
159
        )?;
140✔
160
        Ok(Box::new(cache))
140✔
161
    }
140✔
162

×
163
    fn create_alias(&self, name: &str, alias: &str) -> Result<(), CacheError> {
135✔
164
        let mut txn = self.txn.write();
135✔
165
        txn.put(self.alias_db, alias.as_bytes(), name.as_bytes())?;
135✔
166
        txn.commit_and_renew()?;
135✔
167
        Ok(())
135✔
168
    }
135✔
169
}
×
170

171
/// Name of the LMDB environment, created under the manager's base path, that stores the alias database.
const LMDB_CACHE_MANAGER_ALIAS_ENV_NAME: &str = "__DOZER_CACHE_MANAGER_ALIAS__";
×
172

×
173
impl LmdbCacheManager {
×
174
    fn cache_options(&self, name: String) -> CacheOptions {
286✔
175
        CacheOptions {
286✔
176
            max_db_size: self.options.max_db_size,
286✔
177
            max_readers: self.options.max_readers,
286✔
178
            max_size: self.options.max_size,
286✔
179
            intersection_chunk_size: self.options.intersection_chunk_size,
286✔
180
            path: Some((self.base_path.clone(), name)),
286✔
181
        }
286✔
182
    }
286✔
183

184
    fn generate_unique_name(&self) -> String {
140✔
185
        uuid::Uuid::new_v4().to_string()
140✔
186
    }
140✔
187

188
    fn resolve_alias<'a>(
164✔
189
        &self,
164✔
190
        alias: &str,
164✔
191
        txn: &'a LmdbExclusiveTransaction,
164✔
192
    ) -> Result<Option<&'a str>, StorageError> {
164✔
193
        txn.get(self.alias_db, alias.as_bytes()).map(|bytes| {
164✔
194
            bytes.map(|bytes| {
164✔
195
                std::str::from_utf8(bytes).expect("Real names should always be utf8 string")
132✔
196
            })
164✔
197
        })
164✔
198
    }
164✔
199
}
×
200

×
201
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens `name` read-write and then read-only, asserting that both resolve to the
    /// cache whose name is `expected`.
    fn assert_opens_as(cache_manager: &LmdbCacheManager, name: &str, expected: &str) {
        assert_eq!(
            cache_manager.open_rw_cache(name).unwrap().unwrap().name(),
            expected
        );
        assert_eq!(
            cache_manager.open_ro_cache(name).unwrap().unwrap().name(),
            expected
        );
    }

    /// Exercises name resolution: real names, aliases, and names that are both.
    #[test]
    fn test_lmdb_cache_manager() {
        let cache_manager = LmdbCacheManager::new(Default::default()).unwrap();
        let real_name = cache_manager
            .create_cache(Schema::empty(), vec![])
            .unwrap()
            .name()
            .to_string();

        // Test open with real name.
        assert_opens_as(&cache_manager, &real_name, &real_name);

        // Test open with alias.
        let alias = "alias";
        cache_manager.create_alias(&real_name, alias).unwrap();
        assert_opens_as(&cache_manager, alias, &real_name);

        // Test duplicate alias and real name.
        cache_manager.create_alias(&real_name, &real_name).unwrap();
        assert_opens_as(&cache_manager, &real_name, &real_name);

        // If name is both alias and real name, alias shadows real name.
        let real_name2 = cache_manager
            .create_cache(Schema::empty(), vec![])
            .unwrap()
            .name()
            .to_string();
        cache_manager.create_alias(&real_name, &real_name2).unwrap();
        // `real_name` still resolves to itself.
        assert_opens_as(&cache_manager, &real_name, &real_name);
        // `real_name2` names an existing cache and is also an alias for `real_name`;
        // the alias must win, so opening it yields the first cache.
        assert_opens_as(&cache_manager, &real_name2, &real_name);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc