• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4393407942

pending completion
4393407942

push

github

GitHub
fix: Lmdb environments should use the same map size (#1216)

55 of 55 new or added lines in 7 files covered. (100.0%)

28459 of 38371 relevant lines covered (74.17%)

85320.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.44
/dozer-cache/src/cache/lmdb/cache/main_environment/mod.rs
1
use std::path::{Path, PathBuf};
2

3
use dozer_storage::{
4
    errors::StorageError,
5
    lmdb::RoTransaction,
6
    lmdb_storage::{LmdbEnvironmentManager, SharedTransaction},
7
    BeginTransaction, LmdbOption, ReadTransaction,
8
};
9
use dozer_types::{
10
    borrow::IntoOwned,
11
    types::{Field, FieldType, Record, Schema, SchemaWithIndex},
12
};
13
use tempdir::TempDir;
14

15
use crate::{
16
    cache::{index, lmdb::utils::init_env, RecordWithId},
17
    errors::CacheError,
18
};
19

20
mod operation_log;
21

22
pub use operation_log::{Operation, OperationLog};
23

24
use super::CacheOptions;
25

26
pub trait MainEnvironment: BeginTransaction {
27
    fn common(&self) -> &MainEnvironmentCommon;
28

×
29
    fn schema(&self) -> &SchemaWithIndex;
×
30

×
31
    fn base_path(&self) -> &Path {
150✔
32
        &self.common().base_path
150✔
33
    }
150✔
34

×
35
    fn name(&self) -> &str {
310✔
36
        &self.common().name
310✔
37
    }
310✔
38

×
39
    fn operation_log(&self) -> OperationLog {
10,467,340✔
40
        self.common().operation_log
10,467,340✔
41
    }
10,467,340✔
42

×
43
    fn intersection_chunk_size(&self) -> usize {
2✔
44
        self.common().intersection_chunk_size
2✔
45
    }
2✔
46

47
    fn count(&self) -> Result<usize, CacheError> {
136✔
48
        let txn = self.begin_txn()?;
136✔
49
        self.operation_log()
136✔
50
            .count_present_records(&txn, self.schema().0.is_append_only())
136✔
51
            .map_err(Into::into)
136✔
52
    }
136✔
53

54
    fn get(&self, key: &[u8]) -> Result<RecordWithId, CacheError> {
30✔
55
        let txn = self.begin_txn()?;
30✔
56
        self.operation_log()
30✔
57
            .get_record(&txn, key)?
30✔
58
            .ok_or(CacheError::PrimaryKeyNotFound)
30✔
59
    }
30✔
60
}
61

62
#[derive(Debug)]
×
63
pub struct MainEnvironmentCommon {
64
    /// The environment base path.
×
65
    base_path: PathBuf,
66
    /// The environment name.
67
    name: String,
68
    /// The operation log.
69
    operation_log: OperationLog,
70
    intersection_chunk_size: usize,
71
}
72

73
#[derive(Debug)]
×
74
pub struct RwMainEnvironment {
×
75
    txn: SharedTransaction,
×
76
    common: MainEnvironmentCommon,
×
77
    _temp_dir: Option<TempDir>,
78
    schema: SchemaWithIndex,
79
}
80

×
81
impl BeginTransaction for RwMainEnvironment {
×
82
    type Transaction<'a> = ReadTransaction<'a>;
×
83

84
    fn begin_txn(&self) -> Result<Self::Transaction<'_>, StorageError> {
72✔
85
        self.txn.begin_txn()
72✔
86
    }
72✔
87
}
88

89
impl MainEnvironment for RwMainEnvironment {
90
    fn common(&self) -> &MainEnvironmentCommon {
678✔
91
        &self.common
678✔
92
    }
678✔
93

×
94
    fn schema(&self) -> &SchemaWithIndex {
194✔
95
        &self.schema
194✔
96
    }
194✔
97
}
×
98

×
99
impl RwMainEnvironment {
×
100
    pub fn new(
150✔
101
        schema: Option<&SchemaWithIndex>,
150✔
102
        options: &CacheOptions,
150✔
103
    ) -> Result<Self, CacheError> {
150✔
104
        let (env, common, schema_option, old_schema, temp_dir) = open_env(options, true)?;
150✔
105
        let txn = env.create_txn()?;
150✔
106

×
107
        let schema = match (schema, old_schema) {
156✔
108
            (Some(schema), Some(old_schema)) => {
×
109
                if &old_schema != schema {
×
110
                    return Err(CacheError::SchemaMismatch {
×
111
                        name: common.name,
×
112
                        given: Box::new(schema.clone()),
×
113
                        stored: Box::new(old_schema),
×
114
                    });
×
115
                }
×
116
                old_schema
×
117
            }
×
118
            (Some(schema), None) => {
152✔
119
                let mut txn = txn.write();
152✔
120
                schema_option.store(txn.txn_mut(), schema)?;
152✔
121
                txn.commit_and_renew()?;
152✔
122
                schema.clone()
152✔
123
            }
×
124
            (None, Some(schema)) => schema,
4✔
125
            (None, None) => return Err(CacheError::SchemaNotFound),
×
126
        };
127

128
        Ok(Self {
156✔
129
            txn,
156✔
130
            common,
156✔
131
            schema,
156✔
132
            _temp_dir: temp_dir,
156✔
133
        })
156✔
134
    }
156✔
135

136
    /// Inserts the record into the cache and sets the record version. Returns the record id.
137
    ///
138
    /// Every time a record with the same primary key is inserted, its version number gets increased by 1.
×
139
    pub fn insert(&self, record: &mut Record) -> Result<u64, CacheError> {
12,002✔
140
        debug_check_schema_record_consistency(&self.schema.0, record);
12,002✔
141

×
142
        let primary_key = if self.schema.0.is_append_only() {
12,002✔
143
            None
31✔
144
        } else {
×
145
            Some(index::get_primary_key(
11,971✔
146
                &self.schema.0.primary_index,
11,971✔
147
                &record.values,
11,971✔
148
            ))
11,971✔
149
        };
150

×
151
        let mut txn = self.txn.write();
12,002✔
152
        let txn = txn.txn_mut();
12,002✔
153
        self.common
12,002✔
154
            .operation_log
12,002✔
155
            .insert(txn, record, primary_key.as_deref())?
12,002✔
156
            .ok_or(CacheError::PrimaryKeyExists)
12,002✔
157
    }
12,002✔
158

159
    /// Deletes the record and returns the record version.
×
160
    pub fn delete(&self, primary_key: &[u8]) -> Result<u32, CacheError> {
13✔
161
        let mut txn = self.txn.write();
13✔
162
        let txn = txn.txn_mut();
13✔
163
        self.common
13✔
164
            .operation_log
13✔
165
            .delete(txn, primary_key)?
13✔
166
            .ok_or(CacheError::PrimaryKeyNotFound)
13✔
167
    }
13✔
168

×
169
    pub fn commit(&self) -> Result<(), CacheError> {
162✔
170
        self.txn.write().commit_and_renew().map_err(Into::into)
162✔
171
    }
162✔
172
}
173

×
174
#[derive(Debug)]
×
175
pub struct RoMainEnvironment {
176
    env: LmdbEnvironmentManager,
177
    common: MainEnvironmentCommon,
178
    schema: SchemaWithIndex,
179
}
180

181
impl BeginTransaction for RoMainEnvironment {
182
    type Transaction<'a> = RoTransaction<'a>;
183

×
184
    fn begin_txn(&self) -> Result<Self::Transaction<'_>, StorageError> {
7,512,134✔
185
        self.env.begin_txn()
7,512,134✔
186
    }
7,512,134✔
187
}
188

189
impl MainEnvironment for RoMainEnvironment {
×
190
    fn common(&self) -> &MainEnvironmentCommon {
10,599,931✔
191
        &self.common
10,599,931✔
192
    }
10,599,931✔
193

×
194
    fn schema(&self) -> &SchemaWithIndex {
9,378✔
195
        &self.schema
9,378✔
196
    }
9,378✔
197
}
198

199
impl RoMainEnvironment {
×
200
    pub fn new(options: &CacheOptions) -> Result<Self, CacheError> {
293✔
201
        let (env, common, _, schema, _) = open_env(options, false)?;
293✔
202
        let schema = schema.ok_or(CacheError::SchemaNotFound)?;
299✔
203
        Ok(Self {
299✔
204
            env,
299✔
205
            common,
299✔
206
            schema,
299✔
207
        })
299✔
208
    }
299✔
209
}
×
210

×
211
#[allow(clippy::type_complexity)]
×
212
fn open_env(
449✔
213
    options: &CacheOptions,
449✔
214
    create_if_not_exist: bool,
449✔
215
) -> Result<
449✔
216
    (
449✔
217
        LmdbEnvironmentManager,
449✔
218
        MainEnvironmentCommon,
449✔
219
        LmdbOption<SchemaWithIndex>,
449✔
220
        Option<SchemaWithIndex>,
449✔
221
        Option<TempDir>,
449✔
222
    ),
449✔
223
    CacheError,
449✔
224
> {
449✔
225
    let (mut env, (base_path, name), temp_dir) = init_env(options, create_if_not_exist)?;
449✔
226

227
    let operation_log = OperationLog::new(&mut env, create_if_not_exist)?;
449✔
228
    let schema_option = LmdbOption::new(&mut env, Some("schema"), create_if_not_exist)?;
449✔
229

×
230
    let schema = schema_option
454✔
231
        .load(&env.begin_txn()?)?
449✔
232
        .map(IntoOwned::into_owned);
454✔
233

454✔
234
    Ok((
454✔
235
        env,
454✔
236
        MainEnvironmentCommon {
454✔
237
            base_path,
454✔
238
            name,
454✔
239
            operation_log,
454✔
240
            intersection_chunk_size: options.intersection_chunk_size,
454✔
241
        },
454✔
242
        schema_option,
454✔
243
        schema,
454✔
244
        temp_dir,
454✔
245
    ))
454✔
246
}
454✔
247

×
248
fn debug_check_schema_record_consistency(schema: &Schema, record: &Record) {
12,002✔
249
    debug_assert_eq!(schema.identifier, record.schema_id);
12,002✔
250
    debug_assert_eq!(schema.fields.len(), record.values.len());
12,002✔
251
    for (field, value) in schema.fields.iter().zip(record.values.iter()) {
107,847✔
252
        if field.nullable && value == &Field::Null {
107,847✔
253
            continue;
17,865✔
254
        }
89,982✔
255
        match field.typ {
89,982✔
256
            FieldType::UInt => {
×
257
                debug_assert!(value.as_uint().is_some())
41,856✔
258
            }
259
            FieldType::Int => {
×
260
                debug_assert!(value.as_int().is_some())
78✔
261
            }
×
262
            FieldType::Float => {
×
263
                debug_assert!(value.as_float().is_some())
12,024✔
264
            }
×
265
            FieldType::Boolean => debug_assert!(value.as_boolean().is_some()),
×
266
            FieldType::String => debug_assert!(value.as_string().is_some()),
30,022✔
267
            FieldType::Text => debug_assert!(value.as_text().is_some()),
2✔
268
            FieldType::Binary => debug_assert!(value.as_binary().is_some()),
×
269
            FieldType::Decimal => debug_assert!(value.as_decimal().is_some()),
×
270
            FieldType::Timestamp => debug_assert!(value.as_timestamp().is_some()),
6,000✔
271
            FieldType::Date => debug_assert!(value.as_date().is_some()),
×
272
            FieldType::Bson => debug_assert!(value.as_bson().is_some()),
×
273
            FieldType::Point => debug_assert!(value.as_point().is_some()),
×
274
        }
×
275
    }
276
}
12,002✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc