• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4393407942

pending completion
4393407942

push

github

GitHub
fix: Lmdb environments should use the same map size (#1216)

55 of 55 new or added lines in 7 files covered. (100.0%)

28459 of 38371 relevant lines covered (74.17%)

85320.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.02
/dozer-cache/src/cache/lmdb/cache/mod.rs
1
use std::path::PathBuf;
2
use std::{fmt::Debug, sync::Arc};
3

4
use dozer_types::types::{Record, SchemaWithIndex};
5

6
use super::{
7
    super::{RoCache, RwCache},
8
    indexing::IndexingThreadPool,
9
};
10
use crate::cache::expression::QueryExpression;
11
use crate::cache::RecordWithId;
12
use crate::errors::CacheError;
13

14
mod main_environment;
15
mod query;
16
mod secondary_environment;
17

18
pub use main_environment::{MainEnvironment, RoMainEnvironment, RwMainEnvironment};
19
use query::LmdbQueryHandler;
20
pub use secondary_environment::{
21
    RoSecondaryEnvironment, RwSecondaryEnvironment, SecondaryEnvironment,
22
};
23

×
24
#[derive(Clone, Debug)]
×
25
pub struct CacheOptions {
26
    // Total number of readers allowed
27
    pub max_readers: u32,
28
    // Max no of dbs
29
    pub max_db_size: u32,
30
    // Total size allocated for data in a memory mapped file.
31
    // This size is allocated at initialization.
32
    pub max_size: usize,
33

34
    /// The chunk size when calculating intersection of index queries.
35
    pub intersection_chunk_size: usize,
36

37
    /// Provide a path where db will be created. If nothing is provided, will default to a temp location.
38
    /// Db path will be `PathBuf.join(String)`.
39
    pub path: Option<(PathBuf, String)>,
×
40
}
×
41

×
42
impl Default for CacheOptions {
×
43
    fn default() -> Self {
175✔
44
        Self {
175✔
45
            max_readers: 1000,
175✔
46
            max_db_size: 1000,
175✔
47
            max_size: 1024 * 1024 * 1024,
175✔
48
            intersection_chunk_size: 100,
175✔
49
            path: None,
175✔
50
        }
175✔
51
    }
175✔
52
}
53

54
#[derive(Debug)]
×
55
pub struct LmdbRoCache {
56
    main_env: RoMainEnvironment,
×
57
    secondary_envs: Vec<RoSecondaryEnvironment>,
×
58
}
×
59

×
60
impl LmdbRoCache {
×
61
    pub fn new(options: &CacheOptions) -> Result<Self, CacheError> {
143✔
62
        let main_env = RoMainEnvironment::new(options)?;
143✔
63
        let secondary_envs = (0..main_env.schema().1.len())
143✔
64
            .map(|index| RoSecondaryEnvironment::new(secondary_environment_name(index), options))
695✔
65
            .collect::<Result<_, _>>()?;
143✔
66
        Ok(Self {
143✔
67
            main_env,
143✔
68
            secondary_envs,
143✔
69
        })
143✔
70
    }
143✔
71
}
72

73
#[derive(Debug)]
×
74
pub struct LmdbRwCache {
75
    main_env: RwMainEnvironment,
76
    secondary_envs: Vec<RoSecondaryEnvironment>,
×
77
}
×
78

×
79
impl LmdbRwCache {
×
80
    pub fn new(
156✔
81
        schema: Option<&SchemaWithIndex>,
156✔
82
        options: &CacheOptions,
156✔
83
        indexing_thread_pool: &mut IndexingThreadPool,
156✔
84
    ) -> Result<Self, CacheError> {
156✔
85
        let rw_main_env = RwMainEnvironment::new(schema, options)?;
156✔
86

87
        let options = CacheOptions {
156✔
88
            path: Some((
156✔
89
                rw_main_env.base_path().to_path_buf(),
156✔
90
                rw_main_env.name().to_string(),
156✔
91
            )),
156✔
92
            ..*options
156✔
93
        };
156✔
94
        let ro_main_env = Arc::new(RoMainEnvironment::new(&options)?);
156✔
95

×
96
        let mut ro_secondary_envs = vec![];
156✔
97
        for (index, index_definition) in ro_main_env.schema().1.iter().enumerate() {
716✔
98
            let name = secondary_environment_name(index);
716✔
99
            let rw_secondary_env =
716✔
100
                RwSecondaryEnvironment::new(Some(index_definition), name.clone(), &options)?;
716✔
101
            indexing_thread_pool.add_indexing_task(ro_main_env.clone(), Arc::new(rw_secondary_env));
716✔
102

×
103
            let ro_secondary_env = RoSecondaryEnvironment::new(name, &options)?;
716✔
104
            ro_secondary_envs.push(ro_secondary_env);
716✔
105
        }
×
106

×
107
        Ok(Self {
156✔
108
            main_env: rw_main_env,
156✔
109
            secondary_envs: ro_secondary_envs,
156✔
110
        })
156✔
111
    }
156✔
112
}
×
113

×
114
impl<C: LmdbCache> RoCache for C {
×
115
    fn name(&self) -> &str {
166✔
116
        self.main_env().name()
166✔
117
    }
166✔
118

×
119
    fn get(&self, key: &[u8]) -> Result<RecordWithId, CacheError> {
30✔
120
        self.main_env().get(key)
30✔
121
    }
30✔
122

×
123
    fn count(&self, query: &QueryExpression) -> Result<usize, CacheError> {
2,229✔
124
        LmdbQueryHandler::new(self, query).count()
2,229✔
125
    }
2,229✔
126

×
127
    fn query(&self, query: &QueryExpression) -> Result<Vec<RecordWithId>, CacheError> {
2,244✔
128
        LmdbQueryHandler::new(self, query).query()
2,244✔
129
    }
2,244✔
130

×
131
    fn get_schema(&self) -> &SchemaWithIndex {
178✔
132
        self.main_env().schema()
178✔
133
    }
178✔
134
}
×
135

136
impl RwCache for LmdbRwCache {
137
    fn insert(&self, record: &mut Record) -> Result<u64, CacheError> {
12,002✔
138
        let span = dozer_types::tracing::span!(dozer_types::tracing::Level::TRACE, "insert_cache");
24,004✔
139
        let _enter = span.enter();
12,002✔
140
        let record_id = self.main_env.insert(record)?;
12,002✔
141
        Ok(record_id)
12,002✔
142
    }
12,002✔
143

×
144
    fn delete(&self, key: &[u8]) -> Result<u32, CacheError> {
13✔
145
        let version = self.main_env.delete(key)?;
13✔
146
        Ok(version)
13✔
147
    }
13✔
148

×
149
    fn update(&self, key: &[u8], record: &mut Record) -> Result<u32, CacheError> {
7✔
150
        let version = self.delete(key)?;
7✔
151
        self.insert(record)?;
7✔
152
        Ok(version)
7✔
153
    }
7✔
154

×
155
    fn commit(&self) -> Result<(), CacheError> {
×
156
        self.main_env.commit()?;
162✔
157
        Ok(())
162✔
158
    }
162✔
159
}
160

×
161
pub trait LmdbCache: Send + Sync + Debug {
×
162
    type MainEnvironment: MainEnvironment;
×
163

×
164
    fn main_env(&self) -> &Self::MainEnvironment;
165

×
166
    type SecondaryEnvironment: SecondaryEnvironment;
×
167

168
    fn secondary_env(&self, index: usize) -> &Self::SecondaryEnvironment;
169
}
170

×
171
impl LmdbCache for LmdbRoCache {
×
172
    type MainEnvironment = RoMainEnvironment;
173
    fn main_env(&self) -> &Self::MainEnvironment {
3,336,217✔
174
        &self.main_env
3,336,217✔
175
    }
3,336,217✔
176

×
177
    type SecondaryEnvironment = RoSecondaryEnvironment;
×
178
    fn secondary_env(&self, index: usize) -> &Self::SecondaryEnvironment {
8,258✔
179
        &self.secondary_envs[index]
8,258✔
180
    }
8,258✔
181
}
182

×
183
impl LmdbCache for LmdbRwCache {
×
184
    type MainEnvironment = RwMainEnvironment;
×
185
    fn main_env(&self) -> &Self::MainEnvironment {
604✔
186
        &self.main_env
604✔
187
    }
604✔
188

189
    type SecondaryEnvironment = RoSecondaryEnvironment;
190
    fn secondary_env(&self, index: usize) -> &Self::SecondaryEnvironment {
102✔
191
        &self.secondary_envs[index]
102✔
192
    }
102✔
193
}
194

×
195
fn secondary_environment_name(index: usize) -> String {
1,410✔
196
    format!("{index}")
1,410✔
197
}
1,410✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc