• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4283045331

pending completion
4283045331

push

github

GitHub
feat: Support timestamp diff (#1074)

58 of 58 new or added lines in 2 files covered. (100.0%)

27146 of 37535 relevant lines covered (72.32%)

33460.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.86
/dozer-cache/src/cache/lmdb/cache_manager.rs
1
use std::path::PathBuf;
2

3
use dozer_storage::{
4
    errors::StorageError,
5
    lmdb::{Database, DatabaseFlags},
6
    lmdb_storage::{LmdbEnvironmentManager, LmdbExclusiveTransaction, SharedTransaction},
7
};
8
use dozer_types::types::{IndexDefinition, Schema};
9
use tempdir::TempDir;
10

11
use crate::{
12
    cache::{CacheManager, RoCache, RwCache},
13
    errors::CacheError,
14
};
15

16
use super::cache::{CacheCommonOptions, CacheWriteOptions, LmdbRoCache, LmdbRwCache};
17

18
/// Settings used by [`LmdbCacheManager`] for itself and for every cache it
/// creates or opens.
#[derive(Debug, Clone)]
pub struct CacheManagerOptions {
    /// Total number of readers allowed.
    pub max_readers: u32,
    /// Maximum number of databases.
    ///
    /// NOTE(review): the field name suggests a size while the original comment
    /// describes a count — confirm which one the storage layer expects.
    pub max_db_size: u32,

    /// The chunk size when calculating intersection of index queries.
    pub intersection_chunk_size: usize,

    /// Total size allocated for data in a memory mapped file.
    /// This size is allocated at initialization.
    pub max_size: usize,

    /// Directory in which the databases are created. When `None`, a temporary
    /// directory is used instead.
    pub path: Option<PathBuf>,
}
35

36
impl Default for CacheManagerOptions {
37
    fn default() -> Self {
133✔
38
        let cache_common_options = CacheCommonOptions::default();
133✔
39
        let cache_write_options = CacheWriteOptions::default();
133✔
40
        Self {
133✔
41
            max_readers: cache_common_options.max_readers,
133✔
42
            max_db_size: cache_common_options.max_db_size,
133✔
43
            intersection_chunk_size: cache_common_options.intersection_chunk_size,
133✔
44
            max_size: cache_write_options.max_size,
133✔
45
            path: None,
133✔
46
        }
133✔
47
    }
133✔
48
}
49

50
#[derive(Debug)]
×
51
pub struct LmdbCacheManager {
52
    options: CacheManagerOptions,
53
    base_path: PathBuf,
54
    alias_db: Database,
55
    txn: SharedTransaction,
56
    _temp_dir: Option<TempDir>,
57
}
58

59
impl LmdbCacheManager {
60
    pub fn new(options: CacheManagerOptions) -> Result<Self, CacheError> {
145✔
61
        let (temp_dir, base_path) = match &options.path {
145✔
62
            Some(path) => {
18✔
63
                std::fs::create_dir_all(path).map_err(|e| CacheError::Internal(Box::new(e)))?;
18✔
64
                (None, path.clone())
18✔
65
            }
66
            None => {
67
                let temp_dir = TempDir::new("dozer").expect("Unable to create temp dir");
127✔
68
                let base_path = temp_dir.path().to_path_buf();
127✔
69
                (Some(temp_dir), base_path)
127✔
70
            }
71
        };
72

73
        let mut env = LmdbEnvironmentManager::create(
145✔
74
            &base_path,
145✔
75
            LMDB_CACHE_MANAGER_ALIAS_ENV_NAME,
145✔
76
            Default::default(),
145✔
77
        )?;
145✔
78
        let alias_db = env.create_database(None, Some(DatabaseFlags::empty()))?;
145✔
79
        let txn = env.create_txn()?;
145✔
80

81
        Ok(Self {
145✔
82
            options,
145✔
83
            base_path,
145✔
84
            alias_db,
145✔
85
            txn,
145✔
86
            _temp_dir: temp_dir,
145✔
87
        })
145✔
88
    }
145✔
89
}
90

91
impl CacheManager for LmdbCacheManager {
92
    fn open_rw_cache(&self, name: &str) -> Result<Option<Box<dyn RwCache>>, CacheError> {
10✔
93
        let txn = self.txn.read();
10✔
94
        let real_name = self.resolve_alias(name, &txn)?.unwrap_or(name);
10✔
95
        if LmdbEnvironmentManager::exists(&self.base_path, real_name) {
10✔
96
            let cache = LmdbRwCache::open(
4✔
97
                self.cache_common_options(real_name.to_string()),
4✔
98
                self.cache_write_options(),
4✔
99
            )?;
4✔
100
            Ok(Some(Box::new(cache)))
4✔
101
        } else {
102
            Ok(None)
6✔
103
        }
104
    }
10✔
105

106
    fn open_ro_cache(&self, name: &str) -> Result<Option<Box<dyn RoCache>>, CacheError> {
136✔
107
        let txn = self.txn.read();
136✔
108
        let real_name = self.resolve_alias(name, &txn)?.unwrap_or(name);
136✔
109
        if LmdbEnvironmentManager::exists(&self.base_path, real_name) {
136✔
110
            let cache = LmdbRoCache::new(self.cache_common_options(real_name.to_string()))?;
136✔
111
            Ok(Some(Box::new(cache)))
136✔
112
        } else {
113
            Ok(None)
×
114
        }
115
    }
136✔
116

117
    fn create_cache(
134✔
118
        &self,
134✔
119
        schemas: Vec<(String, Schema, Vec<IndexDefinition>)>,
134✔
120
    ) -> Result<Box<dyn RwCache>, CacheError> {
134✔
121
        let name = self.generate_unique_name();
134✔
122
        let cache = LmdbRwCache::create(
134✔
123
            schemas,
134✔
124
            self.cache_common_options(name),
134✔
125
            self.cache_write_options(),
134✔
126
        )?;
134✔
127
        Ok(Box::new(cache))
134✔
128
    }
134✔
129

130
    fn create_alias(&self, name: &str, alias: &str) -> Result<(), CacheError> {
123✔
131
        let mut txn = self.txn.write();
123✔
132
        txn.put(self.alias_db, alias.as_bytes(), name.as_bytes())?;
123✔
133
        txn.commit_and_renew()?;
123✔
134
        Ok(())
123✔
135
    }
123✔
136
}
137

138
/// Name of the LMDB environment, created under the manager's base path, that
/// holds the alias database.
const LMDB_CACHE_MANAGER_ALIAS_ENV_NAME: &str = "__DOZER_CACHE_MANAGER_ALIAS__";
139

140
impl LmdbCacheManager {
141
    fn cache_common_options(&self, name: String) -> CacheCommonOptions {
274✔
142
        CacheCommonOptions {
274✔
143
            max_db_size: self.options.max_db_size,
274✔
144
            max_readers: self.options.max_readers,
274✔
145
            intersection_chunk_size: self.options.intersection_chunk_size,
274✔
146
            path: Some((self.base_path.clone(), name)),
274✔
147
        }
274✔
148
    }
274✔
149

150
    fn cache_write_options(&self) -> CacheWriteOptions {
138✔
151
        CacheWriteOptions {
138✔
152
            max_size: self.options.max_size,
138✔
153
        }
138✔
154
    }
138✔
155

156
    fn generate_unique_name(&self) -> String {
134✔
157
        uuid::Uuid::new_v4().to_string()
134✔
158
    }
134✔
159

160
    fn resolve_alias<'a>(
146✔
161
        &self,
146✔
162
        alias: &str,
146✔
163
        txn: &'a LmdbExclusiveTransaction,
146✔
164
    ) -> Result<Option<&'a str>, StorageError> {
146✔
165
        txn.get(self.alias_db, alias.as_bytes()).map(|bytes| {
146✔
166
            bytes.map(|bytes| {
146✔
167
                std::str::from_utf8(bytes).expect("Real names should always be utf8 string")
126✔
168
            })
146✔
169
        })
146✔
170
    }
146✔
171
}
172

173
#[cfg(test)]
mod tests {
    use super::*;

    /// Exercises opening caches by real name, by alias, by an alias equal to
    /// the real name, and by an alias that collides with another real name.
    #[test]
    fn test_lmdb_cache_manager() {
        let cache_manager = LmdbCacheManager::new(Default::default()).unwrap();
        let real_name = cache_manager
            .create_cache(vec![])
            .unwrap()
            .name()
            .to_string();
        // Test open with real name.
        assert_eq!(
            cache_manager
                .open_rw_cache(&real_name)
                .unwrap()
                .unwrap()
                .name(),
            real_name
        );
        assert_eq!(
            cache_manager
                .open_ro_cache(&real_name)
                .unwrap()
                .unwrap()
                .name(),
            real_name
        );
        // Test open with alias.
        let alias = "alias";
        cache_manager.create_alias(&real_name, alias).unwrap();
        assert_eq!(
            cache_manager.open_rw_cache(alias).unwrap().unwrap().name(),
            real_name
        );
        assert_eq!(
            cache_manager.open_ro_cache(alias).unwrap().unwrap().name(),
            real_name
        );
        // Test duplicate alias and real name.
        cache_manager.create_alias(&real_name, &real_name).unwrap();
        assert_eq!(
            cache_manager
                .open_rw_cache(&real_name)
                .unwrap()
                .unwrap()
                .name(),
            real_name
        );
        assert_eq!(
            cache_manager
                .open_ro_cache(&real_name)
                .unwrap()
                .unwrap()
                .name(),
            real_name
        );
        // If name is both alias and real name, alias shadows real name.
        // `real_name2` is a real cache *and* an alias for `real_name`, so it is
        // `real_name2` that must be opened to observe the shadowing.
        let real_name2 = cache_manager
            .create_cache(vec![])
            .unwrap()
            .name()
            .to_string();
        cache_manager.create_alias(&real_name, &real_name2).unwrap();
        assert_eq!(
            cache_manager
                .open_rw_cache(&real_name2)
                .unwrap()
                .unwrap()
                .name(),
            real_name
        );
        assert_eq!(
            cache_manager
                .open_ro_cache(&real_name2)
                .unwrap()
                .unwrap()
                .name(),
            real_name
        );
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc