• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4382580286

pending completion
4382580286

push

github

GitHub
feat: Separate cache operation log environment and index environments (#1199)

1370 of 1370 new or added lines in 33 files covered. (100.0%)

28671 of 41023 relevant lines covered (69.89%)

51121.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.58
/dozer-cache/src/cache/lmdb/cache_manager.rs
1
use std::path::PathBuf;
2

3
use dozer_storage::{
4
    errors::StorageError,
5
    lmdb::{Database, DatabaseFlags},
6
    lmdb_storage::{
7
        CreateDatabase, LmdbEnvironmentManager, LmdbExclusiveTransaction, SharedTransaction,
8
    },
9
};
10
use dozer_types::types::{IndexDefinition, Schema};
11
use tempdir::TempDir;
12

13
use crate::{
14
    cache::{CacheManager, RoCache, RwCache},
15
    errors::CacheError,
16
};
17

18
use super::cache::{CacheCommonOptions, CacheWriteOptions, LmdbRoCache, LmdbRwCache};
×
19

20
/// Settings used by the cache manager for every cache it creates or opens.
#[derive(Debug, Clone)]
pub struct CacheManagerOptions {
    /// Total number of readers allowed.
    pub max_readers: u32,

    /// Maximum number of databases.
    pub max_db_size: u32,

    /// The chunk size when calculating intersection of index queries.
    pub intersection_chunk_size: usize,

    /// Total size allocated for data in a memory mapped file.
    ///
    /// This size is allocated at initialization.
    pub max_size: usize,

    /// Path where the db will be created. When `None`, a temporary directory is used instead.
    pub path: Option<PathBuf>,
}
37

×
38
impl Default for CacheManagerOptions {
×
39
    fn default() -> Self {
145✔
40
        let cache_common_options = CacheCommonOptions::default();
145✔
41
        let cache_write_options = CacheWriteOptions::default();
145✔
42
        Self {
145✔
43
            max_readers: cache_common_options.max_readers,
145✔
44
            max_db_size: cache_common_options.max_db_size,
145✔
45
            intersection_chunk_size: cache_common_options.intersection_chunk_size,
145✔
46
            max_size: cache_write_options.max_size,
145✔
47
            path: None,
145✔
48
        }
145✔
49
    }
145✔
50
}
×
51

52
#[derive(Debug)]
×
53
pub struct LmdbCacheManager {
54
    options: CacheManagerOptions,
55
    base_path: PathBuf,
56
    alias_db: Database,
57
    txn: SharedTransaction,
58
    _temp_dir: Option<TempDir>,
59
}
60

×
61
impl LmdbCacheManager {
×
62
    pub fn new(options: CacheManagerOptions) -> Result<Self, CacheError> {
157✔
63
        let (temp_dir, base_path) = match &options.path {
157✔
64
            Some(path) => {
36✔
65
                std::fs::create_dir_all(path).map_err(|e| CacheError::Io(path.clone(), e))?;
36✔
66
                (None, path.clone())
36✔
67
            }
×
68
            None => {
×
69
                let temp_dir = TempDir::new("dozer").expect("Unable to create temp dir");
121✔
70
                let base_path = temp_dir.path().to_path_buf();
121✔
71
                (Some(temp_dir), base_path)
121✔
72
            }
73
        };
×
74

×
75
        let mut env = LmdbEnvironmentManager::create(
157✔
76
            &base_path,
157✔
77
            LMDB_CACHE_MANAGER_ALIAS_ENV_NAME,
157✔
78
            Default::default(),
157✔
79
        )?;
157✔
80
        let alias_db = env.create_database(None, Some(DatabaseFlags::empty()))?;
157✔
81
        let txn = env.create_txn()?;
163✔
82

×
83
        Ok(Self {
163✔
84
            options,
163✔
85
            base_path,
163✔
86
            alias_db,
163✔
87
            txn,
163✔
88
            _temp_dir: temp_dir,
163✔
89
        })
163✔
90
    }
163✔
91
}
92

×
93
impl CacheManager for LmdbCacheManager {
×
94
    fn open_rw_cache(&self, name: &str) -> Result<Option<Box<dyn RwCache>>, CacheError> {
22✔
95
        let mut txn = self.txn.write();
22✔
96
        // Open a new transaction to make sure we get the latest changes.
22✔
97
        txn.commit_and_renew()?;
22✔
98
        let real_name = self.resolve_alias(name, &txn)?.unwrap_or(name);
22✔
99
        let cache: Option<Box<dyn RwCache>> =
22✔
100
            if LmdbEnvironmentManager::exists(&self.base_path, real_name) {
22✔
101
                let cache = LmdbRwCache::open(
4✔
102
                    &self.cache_common_options(real_name.to_string()),
4✔
103
                    self.cache_write_options(),
4✔
104
                )?;
4✔
105
                Some(Box::new(cache))
4✔
106
            } else {
107
                None
18✔
108
            };
×
109
        Ok(cache)
22✔
110
    }
22✔
111

×
112
    fn open_ro_cache(&self, name: &str) -> Result<Option<Box<dyn RoCache>>, CacheError> {
142✔
113
        let mut txn = self.txn.write();
142✔
114
        // Open a new transaction to make sure we get the latest changes.
142✔
115
        txn.commit_and_renew()?;
142✔
116
        let real_name = self.resolve_alias(name, &txn)?.unwrap_or(name);
142✔
117
        let cache: Option<Box<dyn RoCache>> =
142✔
118
            if LmdbEnvironmentManager::exists(&self.base_path, real_name) {
142✔
119
                let cache = LmdbRoCache::new(&self.cache_common_options(real_name.to_string()))?;
142✔
120
                Some(Box::new(cache))
142✔
121
            } else {
122
                None
×
123
            };
×
124
        Ok(cache)
142✔
125
    }
142✔
126

×
127
    fn create_cache(
140✔
128
        &self,
140✔
129
        schema: Schema,
140✔
130
        indexes: Vec<IndexDefinition>,
140✔
131
    ) -> Result<Box<dyn RwCache>, CacheError> {
140✔
132
        let name = self.generate_unique_name();
140✔
133
        let cache = LmdbRwCache::create(
140✔
134
            &(schema, indexes),
140✔
135
            &self.cache_common_options(name),
140✔
136
            self.cache_write_options(),
140✔
137
        )?;
140✔
138
        Ok(Box::new(cache))
140✔
139
    }
140✔
140

×
141
    fn create_alias(&self, name: &str, alias: &str) -> Result<(), CacheError> {
135✔
142
        let mut txn = self.txn.write();
135✔
143
        txn.put(self.alias_db, alias.as_bytes(), name.as_bytes())?;
135✔
144
        txn.commit_and_renew()?;
135✔
145
        Ok(())
135✔
146
    }
135✔
147
}
148

149
/// Name of the LMDB environment in which the cache manager keeps its alias database.
const LMDB_CACHE_MANAGER_ALIAS_ENV_NAME: &str = "__DOZER_CACHE_MANAGER_ALIAS__";
×
150

×
151
impl LmdbCacheManager {
×
152
    fn cache_common_options(&self, name: String) -> CacheCommonOptions {
286✔
153
        CacheCommonOptions {
286✔
154
            max_db_size: self.options.max_db_size,
286✔
155
            max_readers: self.options.max_readers,
286✔
156
            intersection_chunk_size: self.options.intersection_chunk_size,
286✔
157
            path: Some((self.base_path.clone(), name)),
286✔
158
        }
286✔
159
    }
286✔
160

×
161
    fn cache_write_options(&self) -> CacheWriteOptions {
144✔
162
        CacheWriteOptions {
144✔
163
            max_size: self.options.max_size,
144✔
164
        }
144✔
165
    }
144✔
166

×
167
    fn generate_unique_name(&self) -> String {
140✔
168
        uuid::Uuid::new_v4().to_string()
140✔
169
    }
140✔
170

×
171
    fn resolve_alias<'a>(
164✔
172
        &self,
164✔
173
        alias: &str,
164✔
174
        txn: &'a LmdbExclusiveTransaction,
164✔
175
    ) -> Result<Option<&'a str>, StorageError> {
164✔
176
        txn.get(self.alias_db, alias.as_bytes()).map(|bytes| {
164✔
177
            bytes.map(|bytes| {
164✔
178
                std::str::from_utf8(bytes).expect("Real names should always be utf8 string")
132✔
179
            })
164✔
180
        })
164✔
181
    }
164✔
182
}
183

184
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that opening `name` through both the read-write and the read-only
    /// path yields a cache whose name is `expected`.
    fn assert_opens_as(cache_manager: &LmdbCacheManager, name: &str, expected: &str) {
        assert_eq!(
            cache_manager.open_rw_cache(name).unwrap().unwrap().name(),
            expected
        );
        assert_eq!(
            cache_manager.open_ro_cache(name).unwrap().unwrap().name(),
            expected
        );
    }

    #[test]
    fn test_lmdb_cache_manager() {
        let cache_manager = LmdbCacheManager::new(Default::default()).unwrap();
        let real_name = cache_manager
            .create_cache(Schema::empty(), vec![])
            .unwrap()
            .name()
            .to_string();

        // Test open with real name.
        assert_opens_as(&cache_manager, &real_name, &real_name);

        // Test open with alias.
        let alias = "alias";
        cache_manager.create_alias(&real_name, alias).unwrap();
        assert_opens_as(&cache_manager, alias, &real_name);

        // Test duplicate alias and real name.
        cache_manager.create_alias(&real_name, &real_name).unwrap();
        assert_opens_as(&cache_manager, &real_name, &real_name);

        // If name is both alias and real name, alias shadows real name.
        let real_name2 = cache_manager
            .create_cache(Schema::empty(), vec![])
            .unwrap()
            .name()
            .to_string();
        cache_manager.create_alias(&real_name, &real_name2).unwrap();
        // `real_name2` is now both a real cache name and an alias for `real_name`;
        // opening it must follow the alias.
        assert_opens_as(&cache_manager, &real_name2, &real_name);
        // The original cache is still reachable under its own name.
        assert_opens_as(&cache_manager, &real_name, &real_name);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc