• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4382580286

pending completion
4382580286

push

github

GitHub
feat: Separate cache operation log environment and index environments (#1199)

1370 of 1370 new or added lines in 33 files covered. (100.0%)

28671 of 41023 relevant lines covered (69.89%)

51121.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

30.54
/dozer-cache/src/cache/lmdb/cache/mod.rs
1
use std::fmt::Debug;
2
use std::path::PathBuf;
3

4
use dozer_storage::BeginTransaction;
5

6
use dozer_types::types::{Record, SchemaWithIndex};
7

8
use super::super::{RoCache, RwCache};
9
use crate::cache::expression::QueryExpression;
10
use crate::cache::RecordWithId;
11
use crate::errors::CacheError;
12

13
mod helper;
14
mod main_environment;
15
mod query;
16
mod secondary_environment;
17

18
use main_environment::{MainEnvironment, RoMainEnvironment, RwMainEnvironment};
19
use query::LmdbQueryHandler;
20
pub use secondary_environment::SecondaryEnvironment;
21
use secondary_environment::{RoSecondaryEnvironment, RwSecondaryEnvironment};
22

23
/// Options shared by the read-only and read-write LMDB caches.
#[derive(Clone, Debug)]
pub struct CacheCommonOptions {
    /// Total number of readers allowed.
    pub max_readers: u32,
    /// Maximum number of databases.
    ///
    /// Note that, despite the `_size` suffix, this is a count and not a byte size.
    pub max_db_size: u32,

    /// The chunk size when calculating intersection of index queries.
    pub intersection_chunk_size: usize,

    /// Provide a path where db will be created. If nothing is provided, will default to a temp location.
    /// Db path will be `PathBuf.join(String)`.
    pub path: Option<(PathBuf, String)>,
}

impl Default for CacheCommonOptions {
    /// Returns options allowing 1000 readers and 1000 databases, an intersection chunk size
    /// of 100, and no explicit path.
    fn default() -> Self {
        Self {
            max_readers: 1000,
            max_db_size: 1000,
            intersection_chunk_size: 100,
            path: None,
        }
    }
}
48

49
#[derive(Debug)]
×
50
pub struct LmdbRoCache {
51
    main_env: RoMainEnvironment,
52
    secondary_envs: Vec<RoSecondaryEnvironment>,
53
}
×
54

×
55
impl LmdbRoCache {
×
56
    pub fn new(options: &CacheCommonOptions) -> Result<Self, CacheError> {
143✔
57
        let main_env = RoMainEnvironment::new(options)?;
143✔
58
        let secondary_envs = (0..main_env.schema().1.len())
143✔
59
            .map(|index| RoSecondaryEnvironment::new(secondary_environment_name(index), options))
695✔
60
            .collect::<Result<_, _>>()?;
143✔
61
        Ok(Self {
143✔
62
            main_env,
143✔
63
            secondary_envs,
143✔
64
        })
143✔
65
    }
143✔
66
}
67

68
/// Options that only apply to the read-write LMDB cache.
#[derive(Debug, Clone, Copy)]
pub struct CacheWriteOptions {
    /// Total size allocated for data in a memory mapped file.
    /// This size is allocated at initialization.
    pub max_size: usize,
}

impl Default for CacheWriteOptions {
    /// Returns options with `max_size` set to 1024^4 bytes (1 TiB).
    ///
    /// NOTE(review): this constant does not fit in a 32-bit `usize`, so the default can only
    /// be built on 64-bit targets.
    fn default() -> Self {
        Self {
            max_size: 1024 * 1024 * 1024 * 1024,
        }
    }
}
×
82

×
83
#[derive(Debug)]
×
84
pub struct LmdbRwCache {
85
    main_env: RwMainEnvironment,
×
86
    secondary_envs: Vec<RwSecondaryEnvironment>,
87
}
88

×
89
impl LmdbRwCache {
×
90
    pub fn open(
4✔
91
        common_options: &CacheCommonOptions,
4✔
92
        write_options: CacheWriteOptions,
4✔
93
    ) -> Result<Self, CacheError> {
4✔
94
        let main_env = RwMainEnvironment::open(common_options, write_options)?;
4✔
95
        let secondary_envs = (0..main_env.schema().1.len())
4✔
96
            .map(|index| {
4✔
97
                RwSecondaryEnvironment::open(
×
98
                    secondary_environment_name(index),
×
99
                    common_options,
×
100
                    write_options,
×
101
                )
×
102
            })
4✔
103
            .collect::<Result<_, _>>()?;
4✔
104
        Ok(Self {
4✔
105
            main_env,
4✔
106
            secondary_envs,
4✔
107
        })
4✔
108
    }
4✔
109

×
110
    pub fn create(
152✔
111
        schema: &SchemaWithIndex,
152✔
112
        common_options: &CacheCommonOptions,
152✔
113
        write_options: CacheWriteOptions,
152✔
114
    ) -> Result<Self, CacheError> {
152✔
115
        let main_env = RwMainEnvironment::create(schema, common_options, write_options)?;
152✔
116
        let secondary_envs = main_env
152✔
117
            .schema()
152✔
118
            .1
152✔
119
            .iter()
152✔
120
            .enumerate()
152✔
121
            .map(|(index, index_definition)| {
717✔
122
                RwSecondaryEnvironment::create(
717✔
123
                    index_definition,
717✔
124
                    secondary_environment_name(index),
717✔
125
                    common_options,
717✔
126
                    write_options,
717✔
127
                )
717✔
128
            })
717✔
129
            .collect::<Result<_, _>>()?;
152✔
130
        Ok(Self {
152✔
131
            main_env,
152✔
132
            secondary_envs,
152✔
133
        })
152✔
134
    }
152✔
135
}
×
136

×
137
impl<C: LmdbCache> RoCache for C {
×
138
    fn name(&self) -> &str {
166✔
139
        self.main_env().name()
166✔
140
    }
166✔
141

×
142
    fn get(&self, key: &[u8]) -> Result<RecordWithId, CacheError> {
30✔
143
        self.main_env().get(key)
30✔
144
    }
30✔
145

×
146
    fn count(&self, query: &QueryExpression) -> Result<usize, CacheError> {
2,229✔
147
        LmdbQueryHandler::new(self, query).count()
2,229✔
148
    }
2,229✔
149

×
150
    fn query(&self, query: &QueryExpression) -> Result<Vec<RecordWithId>, CacheError> {
2,244✔
151
        LmdbQueryHandler::new(self, query).query()
2,244✔
152
    }
2,244✔
153

×
154
    fn get_schema(&self) -> &SchemaWithIndex {
178✔
155
        self.main_env().schema()
178✔
156
    }
178✔
157
}
×
158

×
159
impl RwCache for LmdbRwCache {
×
160
    fn insert(&self, record: &mut Record) -> Result<u64, CacheError> {
11,360✔
161
        let span = dozer_types::tracing::span!(dozer_types::tracing::Level::TRACE, "insert_cache");
22,720✔
162
        let _enter = span.enter();
11,762✔
163
        let record_id = self.main_env.insert(record)?;
11,762✔
164

×
165
        let span = dozer_types::tracing::span!(
11,480✔
166
            dozer_types::tracing::Level::TRACE,
22,666✔
167
            "build_indexes",
×
168
            record_id = record_id,
×
169
        );
×
170
        let _enter = span.enter();
11,480✔
171
        self.index()?;
11,480✔
172

×
173
        Ok(record_id)
11,744✔
174
    }
11,744✔
175

×
176
    fn delete(&self, key: &[u8]) -> Result<u32, CacheError> {
13✔
177
        let version = self.main_env.delete(key)?;
13✔
178
        self.index()?;
13✔
179
        Ok(version)
13✔
180
    }
13✔
181

×
182
    fn update(&self, key: &[u8], record: &mut Record) -> Result<u32, CacheError> {
7✔
183
        let version = self.delete(key)?;
7✔
184
        self.insert(record)?;
7✔
185
        self.index()?;
7✔
186
        Ok(version)
7✔
187
    }
7✔
188

×
189
    fn commit(&self) -> Result<(), CacheError> {
×
190
        self.main_env.commit()?;
139✔
191
        for secondary_env in &self.secondary_envs {
845✔
192
            secondary_env.commit()?;
700✔
193
        }
×
194
        Ok(())
145✔
195
    }
145✔
196
}
×
197

×
198
impl LmdbRwCache {
×
199
    fn index(&self) -> Result<(), CacheError> {
11,410✔
200
        let main_txn = self.main_env.begin_txn()?;
11,410✔
201
        for secondary_env in &self.secondary_envs {
91,803✔
202
            secondary_env.index(&main_txn, self.main_env.operation_log())?;
80,429✔
203
        }
×
204
        Ok(())
11,374✔
205
    }
11,374✔
206
}
×
207

×
208
pub trait LmdbCache: Send + Sync + Debug {
×
209
    type MainEnvironment: MainEnvironment;
×
210

×
211
    fn main_env(&self) -> &Self::MainEnvironment;
×
212

×
213
    type SecondaryEnvironment: SecondaryEnvironment;
×
214

×
215
    fn secondary_env(&self, index: usize) -> &Self::SecondaryEnvironment;
×
216
}
×
217

×
218
impl LmdbCache for LmdbRoCache {
×
219
    type MainEnvironment = RoMainEnvironment;
220
    fn main_env(&self) -> &Self::MainEnvironment {
3,336,217✔
221
        &self.main_env
3,336,217✔
222
    }
3,336,217✔
223

×
224
    type SecondaryEnvironment = RoSecondaryEnvironment;
×
225
    fn secondary_env(&self, index: usize) -> &Self::SecondaryEnvironment {
8,258✔
226
        &self.secondary_envs[index]
8,258✔
227
    }
8,258✔
228
}
×
229

×
230
impl LmdbCache for LmdbRwCache {
×
231
    type MainEnvironment = RwMainEnvironment;
232
    fn main_env(&self) -> &Self::MainEnvironment {
604✔
233
        &self.main_env
604✔
234
    }
604✔
235

×
236
    type SecondaryEnvironment = RwSecondaryEnvironment;
×
237
    fn secondary_env(&self, index: usize) -> &Self::SecondaryEnvironment {
102✔
238
        &self.secondary_envs[index]
102✔
239
    }
102✔
240
}
×
241

×
242
/// Returns the name of the secondary environment for the index at position `index`:
/// the decimal representation of `index`.
fn secondary_environment_name(index: usize) -> String {
    index.to_string()
}
1,411✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc