• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4382580286

pending completion
4382580286

push

github

GitHub
feat: Separate cache operation log environment and index environments (#1199)

1370 of 1370 new or added lines in 33 files covered. (100.0%)

28671 of 41023 relevant lines covered (69.89%)

51121.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.5
/dozer-cache/src/cache/lmdb/cache/main_environment/mod.rs
1
use dozer_storage::{
2
    errors::StorageError,
3
    lmdb::RoTransaction,
4
    lmdb_storage::{LmdbEnvironmentManager, SharedTransaction},
5
    BeginTransaction, LmdbOption, ReadTransaction,
6
};
7
use dozer_types::{
8
    borrow::IntoOwned,
9
    types::{Field, FieldType, Record, Schema, SchemaWithIndex},
10
};
11

12
use crate::{
13
    cache::{index, lmdb::utils::init_env, RecordWithId},
14
    errors::CacheError,
15
};
16

17
use super::{CacheCommonOptions, CacheWriteOptions};
18

19
mod operation_log;
20

21
pub use operation_log::{Operation, OperationLog};
22

23
pub trait MainEnvironment: BeginTransaction {
24
    fn common(&self) -> &MainEnvironmentCommon;
25

26
    fn schema(&self) -> &SchemaWithIndex;
27

28
    fn name(&self) -> &str {
166✔
29
        &self.common().name
166✔
30
    }
166✔
31

32
    fn operation_log(&self) -> OperationLog {
3,403,641✔
33
        self.common().operation_log
3,403,641✔
34
    }
3,403,641✔
35

36
    fn intersection_chunk_size(&self) -> usize {
2✔
37
        self.common().intersection_chunk_size
2✔
38
    }
2✔
39

40
    fn count(&self) -> Result<usize, CacheError> {
136✔
41
        let txn = self.begin_txn()?;
136✔
42
        self.operation_log()
136✔
43
            .count_present_records(&txn, self.schema().0.is_append_only())
136✔
44
            .map_err(Into::into)
136✔
45
    }
136✔
46

47
    fn get(&self, key: &[u8]) -> Result<RecordWithId, CacheError> {
30✔
48
        let txn = self.begin_txn()?;
30✔
49
        self.operation_log()
30✔
50
            .get_record(&txn, key)?
30✔
51
            .ok_or(CacheError::PrimaryKeyNotFound)
30✔
52
    }
30✔
53
}
54

55
#[derive(Debug)]
×
56
pub struct MainEnvironmentCommon {
57
    /// The environment name.
58
    name: String,
59
    /// The operation log.
60
    operation_log: OperationLog,
61
    intersection_chunk_size: usize,
62
}
63

64
#[derive(Debug)]
×
65
pub struct RwMainEnvironment {
66
    txn: SharedTransaction,
67
    common: MainEnvironmentCommon,
68
    schema: SchemaWithIndex,
69
}
70

71
impl BeginTransaction for RwMainEnvironment {
72
    type Transaction<'a> = ReadTransaction<'a>;
73

74
    fn begin_txn(&self) -> Result<Self::Transaction<'_>, StorageError> {
11,416✔
75
        self.txn.begin_txn()
11,416✔
76
    }
11,416✔
77
}
78

79
impl MainEnvironment for RwMainEnvironment {
80
    fn common(&self) -> &MainEnvironmentCommon {
80,777✔
81
        &self.common
80,777✔
82
    }
80,777✔
83

84
    fn schema(&self) -> &SchemaWithIndex {
350✔
85
        &self.schema
350✔
86
    }
350✔
87
}
88

89
impl RwMainEnvironment {
90
    pub fn open(
4✔
91
        common_options: &CacheCommonOptions,
4✔
92
        write_options: CacheWriteOptions,
4✔
93
    ) -> Result<Self, CacheError> {
4✔
94
        let (env, common, schema) = open_env_with_schema(common_options, Some(write_options))?;
4✔
95

96
        Ok(Self {
97
            txn: env.create_txn()?,
4✔
98
            common,
4✔
99
            schema,
4✔
100
        })
101
    }
4✔
102

103
    pub fn create(
152✔
104
        schema: &SchemaWithIndex,
152✔
105
        common_options: &CacheCommonOptions,
152✔
106
        write_options: CacheWriteOptions,
152✔
107
    ) -> Result<Self, CacheError> {
152✔
108
        let (env, common, schema_option, old_schema) =
152✔
109
            open_env(common_options, Some(write_options))?;
152✔
110
        let txn = env.create_txn()?;
152✔
111

112
        let schema = if let Some(old_schema) = old_schema {
152✔
113
            if &old_schema != schema {
×
114
                return Err(CacheError::SchemaMismatch {
×
115
                    name: common.name,
×
116
                    given: Box::new(schema.clone()),
×
117
                    stored: Box::new(old_schema),
×
118
                });
×
119
            }
×
120
            old_schema
×
121
        } else {
122
            let mut txn = txn.write();
152✔
123
            schema_option.store(txn.txn_mut(), schema)?;
152✔
124
            txn.commit_and_renew()?;
152✔
125
            schema.clone()
152✔
126
        };
127

128
        Ok(Self {
152✔
129
            txn,
152✔
130
            common,
152✔
131
            schema,
152✔
132
        })
152✔
133
    }
152✔
134

135
    /// Inserts the record into the cache and sets the record version. Returns the record id.
136
    ///
137
    /// Every time a record with the same primary key is inserted, its version number gets increased by 1.
138
    pub fn insert(&self, record: &mut Record) -> Result<u64, CacheError> {
11,330✔
139
        debug_check_schema_record_consistency(&self.schema.0, record);
11,330✔
140

141
        let primary_key = if self.schema.0.is_append_only() {
11,330✔
142
            None
31✔
143
        } else {
144
            Some(index::get_primary_key(
11,299✔
145
                &self.schema.0.primary_index,
11,299✔
146
                &record.values,
11,299✔
147
            ))
11,299✔
148
        };
149

150
        let mut txn = self.txn.write();
11,330✔
151
        let txn = txn.txn_mut();
11,330✔
152
        self.common
11,330✔
153
            .operation_log
11,330✔
154
            .insert(txn, record, primary_key.as_deref())?
11,330✔
155
            .ok_or(CacheError::PrimaryKeyExists)
11,330✔
156
    }
11,330✔
157

158
    /// Deletes the record and returns the record version.
159
    pub fn delete(&self, primary_key: &[u8]) -> Result<u32, CacheError> {
13✔
160
        let mut txn = self.txn.write();
13✔
161
        let txn = txn.txn_mut();
13✔
162
        self.common
13✔
163
            .operation_log
13✔
164
            .delete(txn, primary_key)?
13✔
165
            .ok_or(CacheError::PrimaryKeyNotFound)
13✔
166
    }
13✔
167

168
    pub fn commit(&self) -> Result<(), CacheError> {
139✔
169
        self.txn.write().commit_and_renew().map_err(Into::into)
139✔
170
    }
139✔
171
}
172

173
#[derive(Debug)]
×
174
pub struct RoMainEnvironment {
175
    env: LmdbEnvironmentManager,
176
    common: MainEnvironmentCommon,
177
    schema: SchemaWithIndex,
178
}
179

180
impl BeginTransaction for RoMainEnvironment {
181
    type Transaction<'a> = RoTransaction<'a>;
182

183
    fn begin_txn(&self) -> Result<Self::Transaction<'_>, StorageError> {
4,427✔
184
        self.env.begin_txn()
4,427✔
185
    }
4,427✔
186
}
187

188
impl MainEnvironment for RoMainEnvironment {
189
    fn common(&self) -> &MainEnvironmentCommon {
3,322,966✔
190
        &self.common
3,322,966✔
191
    }
3,322,966✔
192

193
    fn schema(&self) -> &SchemaWithIndex {
9,223✔
194
        &self.schema
9,223✔
195
    }
9,223✔
196
}
197

198
impl RoMainEnvironment {
199
    pub fn new(common_options: &CacheCommonOptions) -> Result<Self, CacheError> {
143✔
200
        let (env, common, schema) = open_env_with_schema(common_options, None)?;
143✔
201
        Ok(Self {
143✔
202
            env,
143✔
203
            common,
143✔
204
            schema,
143✔
205
        })
143✔
206
    }
143✔
207
}
208

209
fn open_env(
299✔
210
    common_options: &CacheCommonOptions,
299✔
211
    write_options: Option<CacheWriteOptions>,
299✔
212
) -> Result<
299✔
213
    (
299✔
214
        LmdbEnvironmentManager,
299✔
215
        MainEnvironmentCommon,
299✔
216
        LmdbOption<SchemaWithIndex>,
299✔
217
        Option<SchemaWithIndex>,
299✔
218
    ),
299✔
219
    CacheError,
299✔
220
> {
299✔
221
    let (mut env, name) = init_env(common_options, write_options)?;
299✔
222

223
    let create_if_not_exist = write_options.is_some();
299✔
224
    let operation_log = OperationLog::new(&mut env, create_if_not_exist)?;
299✔
225
    let schema_option = LmdbOption::new(&mut env, Some("schema"), create_if_not_exist)?;
299✔
226

227
    let schema = schema_option
299✔
228
        .load(&env.begin_txn()?)?
299✔
229
        .map(IntoOwned::into_owned);
299✔
230

299✔
231
    Ok((
299✔
232
        env,
299✔
233
        MainEnvironmentCommon {
299✔
234
            name,
299✔
235
            operation_log,
299✔
236
            intersection_chunk_size: common_options.intersection_chunk_size,
299✔
237
        },
299✔
238
        schema_option,
299✔
239
        schema,
299✔
240
    ))
299✔
241
}
299✔
242

243
fn open_env_with_schema(
147✔
244
    common_options: &CacheCommonOptions,
147✔
245
    write_options: Option<CacheWriteOptions>,
147✔
246
) -> Result<
147✔
247
    (
147✔
248
        LmdbEnvironmentManager,
147✔
249
        MainEnvironmentCommon,
147✔
250
        SchemaWithIndex,
147✔
251
    ),
147✔
252
    CacheError,
147✔
253
> {
147✔
254
    let (env, common, _, schema) = open_env(common_options, write_options)?;
147✔
255
    let schema = schema.ok_or(CacheError::SchemaNotFound)?;
147✔
256
    Ok((env, common, schema))
147✔
257
}
147✔
258

259
fn debug_check_schema_record_consistency(schema: &Schema, record: &Record) {
11,324✔
260
    debug_assert_eq!(schema.identifier, record.schema_id);
11,324✔
261
    debug_assert_eq!(schema.fields.len(), record.values.len());
11,384✔
262
    for (field, value) in schema.fields.iter().zip(record.values.iter()) {
106,581✔
263
        if field.nullable && value == &Field::Null {
106,581✔
264
            continue;
17,751✔
265
        }
89,256✔
266
        match field.typ {
89,256✔
267
            FieldType::UInt => {
268
                debug_assert!(value.as_uint().is_some())
41,322✔
269
            }
270
            FieldType::Int => {
271
                debug_assert!(value.as_int().is_some())
78✔
272
            }
273
            FieldType::Float => {
274
                debug_assert!(value.as_float().is_some())
12,024✔
275
            }
276
            FieldType::Boolean => debug_assert!(value.as_boolean().is_some()),
×
277
            FieldType::String => debug_assert!(value.as_string().is_some()),
29,428✔
278
            FieldType::Text => debug_assert!(value.as_text().is_some()),
2✔
279
            FieldType::Binary => debug_assert!(value.as_binary().is_some()),
×
280
            FieldType::Decimal => debug_assert!(value.as_decimal().is_some()),
×
281
            FieldType::Timestamp => debug_assert!(value.as_timestamp().is_some()),
6,000✔
282
            FieldType::Date => debug_assert!(value.as_date().is_some()),
×
283
            FieldType::Bson => debug_assert!(value.as_bson().is_some()),
×
284
            FieldType::Point => debug_assert!(value.as_point().is_some()),
×
285
        }
286
    }
287
}
11,954✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc