• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4284179721

pending completion
4284179721

push

github

GitHub
fix: select * wildcard (#1080)

27683 of 39180 relevant lines covered (70.66%)

52493.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.18
/dozer-cache/src/cache/lmdb/cache_manager.rs
1
use std::path::PathBuf;
2

3
use dozer_storage::{
4
    errors::StorageError,
5
    lmdb::{Database, DatabaseFlags},
6
    lmdb_storage::{LmdbEnvironmentManager, LmdbExclusiveTransaction, SharedTransaction},
7
};
8
use dozer_types::types::{IndexDefinition, Schema};
9
use tempdir::TempDir;
10

11
use crate::{
12
    cache::{CacheManager, RoCache, RwCache},
13
    errors::CacheError,
14
};
15

16
use super::cache::{CacheCommonOptions, CacheWriteOptions, LmdbRoCache, LmdbRwCache};
17

18
/// Settings used by the cache manager when it creates or opens caches.
#[derive(Debug, Clone)]
pub struct CacheManagerOptions {
    /// Total number of readers allowed.
    pub max_readers: u32,

    /// Maximum number of databases.
    pub max_db_size: u32,

    /// The chunk size when calculating intersection of index queries.
    pub intersection_chunk_size: usize,

    /// Total size allocated for data in a memory mapped file.
    ///
    /// This size is allocated at initialization.
    pub max_size: usize,

    /// Directory in which the databases are created.
    ///
    /// When `None`, a temporary directory is used instead.
    pub path: Option<PathBuf>,
}
35

36
impl Default for CacheManagerOptions {
37
    fn default() -> Self {
133✔
38
        let cache_common_options = CacheCommonOptions::default();
133✔
39
        let cache_write_options = CacheWriteOptions::default();
133✔
40
        Self {
133✔
41
            max_readers: cache_common_options.max_readers,
133✔
42
            max_db_size: cache_common_options.max_db_size,
133✔
43
            intersection_chunk_size: cache_common_options.intersection_chunk_size,
133✔
44
            max_size: cache_write_options.max_size,
133✔
45
            path: None,
133✔
46
        }
133✔
47
    }
133✔
48
}
49

50
#[derive(Debug)]
×
51
pub struct LmdbCacheManager {
52
    options: CacheManagerOptions,
53
    base_path: PathBuf,
54
    alias_db: Database,
55
    txn: SharedTransaction,
56
    _temp_dir: Option<TempDir>,
57
}
58

59
impl LmdbCacheManager {
60
    pub fn new(options: CacheManagerOptions) -> Result<Self, CacheError> {
145✔
61
        let (temp_dir, base_path) = match &options.path {
145✔
62
            Some(path) => {
18✔
63
                std::fs::create_dir_all(path).map_err(|e| CacheError::Internal(Box::new(e)))?;
18✔
64
                (None, path.clone())
18✔
65
            }
66
            None => {
67
                let temp_dir = TempDir::new("dozer").expect("Unable to create temp dir");
127✔
68
                let base_path = temp_dir.path().to_path_buf();
127✔
69
                (Some(temp_dir), base_path)
127✔
70
            }
71
        };
72

73
        let mut env = LmdbEnvironmentManager::create(
145✔
74
            &base_path,
145✔
75
            LMDB_CACHE_MANAGER_ALIAS_ENV_NAME,
145✔
76
            Default::default(),
145✔
77
        )?;
145✔
78
        let alias_db = env.create_database(None, Some(DatabaseFlags::empty()))?;
145✔
79
        let txn = env.create_txn()?;
145✔
80

81
        Ok(Self {
145✔
82
            options,
145✔
83
            base_path,
145✔
84
            alias_db,
145✔
85
            txn,
145✔
86
            _temp_dir: temp_dir,
145✔
87
        })
145✔
88
    }
145✔
89
}
90

91
impl CacheManager for LmdbCacheManager {
92
    fn open_rw_cache(&self, name: &str) -> Result<Option<Box<dyn RwCache>>, CacheError> {
16✔
93
        let mut txn = self.txn.write();
16✔
94
        let real_name = self.resolve_alias(name, &txn)?.unwrap_or(name);
16✔
95
        let cache: Option<Box<dyn RwCache>> =
16✔
96
            if LmdbEnvironmentManager::exists(&self.base_path, real_name) {
16✔
97
                let cache = LmdbRwCache::open(
4✔
98
                    self.cache_common_options(real_name.to_string()),
4✔
99
                    self.cache_write_options(),
4✔
100
                )?;
4✔
101
                Some(Box::new(cache))
4✔
102
            } else {
×
103
                None
12✔
104
            };
×
105
        txn.commit_and_renew()?;
16✔
106
        Ok(cache)
16✔
107
    }
16✔
108

×
109
    fn open_ro_cache(&self, name: &str) -> Result<Option<Box<dyn RoCache>>, CacheError> {
136✔
110
        let mut txn = self.txn.write();
136✔
111
        let real_name = self.resolve_alias(name, &txn)?.unwrap_or(name);
136✔
112
        let cache: Option<Box<dyn RoCache>> =
136✔
113
            if LmdbEnvironmentManager::exists(&self.base_path, real_name) {
136✔
114
                let cache = LmdbRoCache::new(self.cache_common_options(real_name.to_string()))?;
136✔
115
                Some(Box::new(cache))
136✔
116
            } else {
117
                None
×
118
            };
×
119
        txn.commit_and_renew()?;
136✔
120
        Ok(cache)
136✔
121
    }
136✔
122

×
123
    fn create_cache(
134✔
124
        &self,
134✔
125
        schemas: Vec<(String, Schema, Vec<IndexDefinition>)>,
134✔
126
    ) -> Result<Box<dyn RwCache>, CacheError> {
134✔
127
        let name = self.generate_unique_name();
134✔
128
        let cache = LmdbRwCache::create(
134✔
129
            schemas,
134✔
130
            self.cache_common_options(name),
134✔
131
            self.cache_write_options(),
134✔
132
        )?;
134✔
133
        Ok(Box::new(cache))
134✔
134
    }
134✔
135

×
136
    fn create_alias(&self, name: &str, alias: &str) -> Result<(), CacheError> {
129✔
137
        let mut txn = self.txn.write();
129✔
138
        txn.put(self.alias_db, alias.as_bytes(), name.as_bytes())?;
129✔
139
        txn.commit_and_renew()?;
129✔
140
        Ok(())
129✔
141
    }
129✔
142
}
×
143

×
144
/// Name of the LMDB environment, created under the manager's base path, whose
/// database stores the alias -> real cache name mapping.
const LMDB_CACHE_MANAGER_ALIAS_ENV_NAME: &str = "__DOZER_CACHE_MANAGER_ALIAS__";
×
145

×
146
impl LmdbCacheManager {
×
147
    fn cache_common_options(&self, name: String) -> CacheCommonOptions {
274✔
148
        CacheCommonOptions {
274✔
149
            max_db_size: self.options.max_db_size,
274✔
150
            max_readers: self.options.max_readers,
274✔
151
            intersection_chunk_size: self.options.intersection_chunk_size,
274✔
152
            path: Some((self.base_path.clone(), name)),
274✔
153
        }
274✔
154
    }
274✔
155

156
    fn cache_write_options(&self) -> CacheWriteOptions {
138✔
157
        CacheWriteOptions {
138✔
158
            max_size: self.options.max_size,
138✔
159
        }
138✔
160
    }
138✔
161

×
162
    fn generate_unique_name(&self) -> String {
134✔
163
        uuid::Uuid::new_v4().to_string()
134✔
164
    }
134✔
165

×
166
    fn resolve_alias<'a>(
152✔
167
        &self,
152✔
168
        alias: &str,
152✔
169
        txn: &'a LmdbExclusiveTransaction,
152✔
170
    ) -> Result<Option<&'a str>, StorageError> {
152✔
171
        txn.get(self.alias_db, alias.as_bytes()).map(|bytes| {
152✔
172
            bytes.map(|bytes| {
152✔
173
                std::str::from_utf8(bytes).expect("Real names should always be utf8 string")
126✔
174
            })
152✔
175
        })
152✔
176
    }
152✔
177
}
×
178

×
179
#[cfg(test)]
mod tests {
    use super::*;

    /// Exercises opening caches by real name, by alias, and when a name is
    /// both an alias and a real cache name.
    #[test]
    fn test_lmdb_cache_manager() {
        let cache_manager = LmdbCacheManager::new(Default::default()).unwrap();
        let real_name = cache_manager
            .create_cache(vec![])
            .unwrap()
            .name()
            .to_string();

        // Test open with real name.
        assert_eq!(
            cache_manager
                .open_rw_cache(&real_name)
                .unwrap()
                .unwrap()
                .name(),
            real_name
        );
        assert_eq!(
            cache_manager
                .open_ro_cache(&real_name)
                .unwrap()
                .unwrap()
                .name(),
            real_name
        );

        // Test open with alias.
        let alias = "alias";
        cache_manager.create_alias(&real_name, alias).unwrap();
        assert_eq!(
            cache_manager.open_rw_cache(alias).unwrap().unwrap().name(),
            real_name
        );
        assert_eq!(
            cache_manager.open_ro_cache(alias).unwrap().unwrap().name(),
            real_name
        );

        // Test duplicate alias and real name.
        cache_manager.create_alias(&real_name, &real_name).unwrap();
        assert_eq!(
            cache_manager
                .open_rw_cache(&real_name)
                .unwrap()
                .unwrap()
                .name(),
            real_name
        );
        assert_eq!(
            cache_manager
                .open_ro_cache(&real_name)
                .unwrap()
                .unwrap()
                .name(),
            real_name
        );

        // If name is both alias and real name, alias shadows real name.
        // `real_name2` is a real cache AND an alias of `real_name`, so opening
        // `real_name2` must yield the cache named `real_name`.
        let real_name2 = cache_manager
            .create_cache(vec![])
            .unwrap()
            .name()
            .to_string();
        cache_manager.create_alias(&real_name, &real_name2).unwrap();
        assert_eq!(
            cache_manager
                .open_rw_cache(&real_name2)
                .unwrap()
                .unwrap()
                .name(),
            real_name
        );
        assert_eq!(
            cache_manager
                .open_ro_cache(&real_name2)
                .unwrap()
                .unwrap()
                .name(),
            real_name
        );
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc