• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4382580286

pending completion
4382580286

push

github

GitHub
feat: Separate cache operation log environment and index environments (#1199)

1370 of 1370 new or added lines in 33 files covered. (100.0%)

28671 of 41023 relevant lines covered (69.89%)

51121.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.29
/dozer-cache/src/cache/lmdb/cache/main_environment/operation_log/mod.rs
1
use dozer_storage::{
2
    errors::StorageError,
3
    lmdb::{RoCursor, RwTransaction, Transaction},
4
    lmdb_storage::CreateDatabase,
5
    KeyIterator, LmdbCounter, LmdbMap, LmdbSet,
6
};
7
use dozer_types::{
8
    borrow::{Borrow, Cow, IntoOwned},
9
    serde::{Deserialize, Serialize},
10
    types::Record,
11
};
12

13
use crate::cache::RecordWithId;
14

15
#[derive(Debug, Clone, PartialEq, Deserialize)]
3,622,251✔
16
#[serde(crate = "dozer_types::serde")]
17
pub enum Operation {
18
    Delete {
19
        /// The operation id of an `Insert` operation, which must exist.
20
        operation_id: u64,
21
    },
22
    Insert {
23
        record_id: u64,
24
        record: Record,
25
    },
26
}
27

28
#[derive(Debug, Clone, Copy, Serialize)]
22,812✔
29
#[serde(crate = "dozer_types::serde")]
30
pub enum OperationBorrow<'a> {
31
    Delete {
32
        /// The operation id of an `Insert` operation, which must exist.
33
        operation_id: u64,
34
    },
35
    Insert {
36
        record_id: u64,
37
        record: &'a Record,
38
    },
39
}
40

41
#[derive(Debug, Clone, Copy)]
×
42
pub struct OperationLog {
43
    /// Record primary key -> RecordMetadata, empty if schema has no primary key.
44
    /// Length always increases.
45
    primary_key_to_metadata: LmdbMap<Vec<u8>, RecordMetadata>,
46
    /// Operation ids of latest `Insert`s. Used to filter out deleted records in query. Empty if schema has no primary key.
47
    present_operation_ids: LmdbSet<u64>,
48
    /// The next operation id. Monotonically increasing.
49
    next_operation_id: LmdbCounter,
50
    /// Operation_id -> operation.
51
    operation_id_to_operation: LmdbMap<u64, Operation>,
52
}
53

54
impl OperationLog {
55
    pub fn new<C: CreateDatabase>(
301✔
56
        c: &mut C,
301✔
57
        create_if_not_exist: bool,
301✔
58
    ) -> Result<Self, StorageError> {
301✔
59
        let primary_key_to_metadata =
301✔
60
            LmdbMap::new(c, Some("primary_key_to_metadata"), create_if_not_exist)?;
301✔
61
        let present_operation_ids =
301✔
62
            LmdbSet::new(c, Some("present_operation_ids"), create_if_not_exist)?;
301✔
63
        let next_operation_id =
301✔
64
            LmdbCounter::new(c, Some("next_operation_id"), create_if_not_exist)?;
301✔
65
        let operation_id_to_operation =
301✔
66
            LmdbMap::new(c, Some("operation_id_to_operation"), create_if_not_exist)?;
301✔
67
        Ok(Self {
301✔
68
            primary_key_to_metadata,
301✔
69
            present_operation_ids,
301✔
70
            next_operation_id,
301✔
71
            operation_id_to_operation,
301✔
72
        })
301✔
73
    }
301✔
74

75
    pub fn count_present_records<T: Transaction>(
149✔
76
        &self,
149✔
77
        txn: &T,
149✔
78
        schema_is_append_only: bool,
149✔
79
    ) -> Result<usize, StorageError> {
149✔
80
        if schema_is_append_only {
149✔
81
            self.operation_id_to_operation.count(txn)
22✔
82
        } else {
83
            self.present_operation_ids.count(txn)
127✔
84
        }
85
        .map_err(Into::into)
149✔
86
    }
149✔
87

88
    pub fn get_record<T: Transaction>(
33✔
89
        &self,
33✔
90
        txn: &T,
33✔
91
        key: &[u8],
33✔
92
    ) -> Result<Option<RecordWithId>, StorageError> {
33✔
93
        let Some(metadata) = self.primary_key_to_metadata.get(txn, key)? else {
33✔
94
            return Ok(None);
×
95
        };
96
        let Some(insert_operation_id) = metadata.borrow().insert_operation_id else {
33✔
97
            return Ok(None);
8✔
98
        };
99
        self.get_record_by_operation_id_unchecked(txn, insert_operation_id)
25✔
100
            .map(Some)
25✔
101
    }
33✔
102

103
    /// Reads the current value of the operation id counter, i.e. the id the next operation
    /// will receive.
    pub fn next_operation_id<T: Transaction>(&self, txn: &T) -> Result<u64, StorageError> {
        let loaded = self.next_operation_id.load(txn);
        loaded.map_err(|error| error.into())
    }
80,304✔
106

107
    pub fn present_operation_ids<'txn, T: Transaction>(
173✔
108
        &self,
173✔
109
        txn: &'txn T,
173✔
110
        schema_is_append_only: bool,
173✔
111
    ) -> Result<KeyIterator<'txn, RoCursor<'txn>, u64>, StorageError> {
173✔
112
        // If schema is append only, then all operation ids are latest `Insert`s.
173✔
113
        if schema_is_append_only {
173✔
114
            self.operation_id_to_operation.keys(txn)
23✔
115
        } else {
116
            self.present_operation_ids.iter(txn)
150✔
117
        }
118
    }
173✔
119

120
    pub fn contains_operation_id<T: Transaction>(
2,196,248✔
121
        &self,
2,196,248✔
122
        txn: &T,
2,196,248✔
123
        schema_is_append_only: bool,
2,196,248✔
124
        operation_id: u64,
2,196,248✔
125
    ) -> Result<bool, StorageError> {
2,196,248✔
126
        // If schema is append only, then all operation ids are latest `Insert`s.
2,196,248✔
127
        if schema_is_append_only {
2,196,248✔
128
            Ok(true)
42✔
129
        } else {
130
            self.present_operation_ids.contains(txn, &operation_id)
2,196,206✔
131
        }
132
        .map_err(Into::into)
2,196,248✔
133
    }
2,196,248✔
134

135
    pub fn get_record_by_operation_id_unchecked<T: Transaction>(
1,126,658✔
136
        &self,
1,126,658✔
137
        txn: &T,
1,126,658✔
138
        operation_id: u64,
1,126,658✔
139
    ) -> Result<RecordWithId, StorageError> {
1,126,658✔
140
        let Some(Cow::Owned(Operation::Insert {
141
            record_id,
1,126,658✔
142
            record,
1,126,658✔
143
        })) = self.operation_id_to_operation.get(txn, &operation_id)? else {
1,126,658✔
144
            panic!(
×
145
                "Inconsistent state: primary_key_to_metadata or present_operation_ids contains an insert operation id that is not an Insert operation"
×
146
            );
×
147
        };
148
        Ok(RecordWithId::new(record_id, record))
1,126,658✔
149
    }
1,126,658✔
150

151
    pub fn get_operation<T: Transaction>(
80,677✔
152
        &self,
80,677✔
153
        txn: &T,
80,677✔
154
        operation_id: u64,
80,677✔
155
    ) -> Result<Operation, StorageError> {
80,677✔
156
        Ok(self
80,677✔
157
            .operation_id_to_operation
80,677✔
158
            .get(txn, &operation_id)?
80,677✔
159
            .unwrap_or_else(|| panic!("Operation id {} out of range", operation_id))
80,677✔
160
            .into_owned())
80,677✔
161
    }
80,677✔
162

163
    /// Inserts the record and sets the record version. Returns the record id.
164
    ///
165
    /// If the record's primary key collides with an existing record, returns `None`.
166
    ///
167
    /// Every time a record with the same primary key is inserted, its version number gets increased by 1.
168
    pub fn insert(
11,337✔
169
        &self,
11,337✔
170
        txn: &mut RwTransaction,
11,337✔
171
        record: &mut Record,
11,337✔
172
        primary_key: Option<&[u8]>,
11,337✔
173
    ) -> Result<Option<u64>, StorageError> {
11,337✔
174
        // Calculate operation id and record id.
175
        let (operation_id, record_id) = if let Some(primary_key) = primary_key {
11,607✔
176
            // Get or generate record id from `primary_key_to_metadata`.
177
            let (record_id, record_version) =
11,331✔
178
                match self.primary_key_to_metadata.get(txn, primary_key)? {
11,296✔
179
                    // Primary key is never inserted before. Generate new id from `primary_key_to_metadata`.
180
                    None => (
181
                        self.primary_key_to_metadata.count(txn)? as u64,
11,329✔
182
                        INITIAL_RECORD_VERSION,
183
                    ),
184
                    Some(metadata) => {
3✔
185
                        let metadata = metadata.borrow();
3✔
186
                        if metadata.insert_operation_id.is_some() {
3✔
187
                            // Primary key collision.
188
                            return Ok(None);
1✔
189
                        } else {
190
                            // This primary key was deleted. Use the record id from its first insertion.
191
                            (metadata.id, metadata.version + 1)
2✔
192
                        }
193
                    }
194
                };
195
            // Generation operation id.
196
            let operation_id = self.next_operation_id.fetch_add(txn, 1)?;
11,331✔
197
            // Update `primary_key_to_metadata` and `present_operation_ids`.
198
            self.primary_key_to_metadata.insert_overwrite(
11,331✔
199
                txn,
11,331✔
200
                primary_key,
11,331✔
201
                &RecordMetadata {
11,331✔
202
                    id: record_id,
11,331✔
203
                    version: record_version,
11,331✔
204
                    insert_operation_id: Some(operation_id),
11,331✔
205
                },
11,331✔
206
            )?;
11,331✔
207
            if !self.present_operation_ids.insert(txn, &operation_id)? {
11,331✔
208
                panic!("Inconsistent state: operation id already exists");
×
209
            }
11,565✔
210
            // Update record version.
11,565✔
211
            record.version = Some(record_version);
11,565✔
212
            (operation_id, record_id)
11,565✔
213
        } else {
214
            record.version = Some(INITIAL_RECORD_VERSION);
41✔
215
            // Generation operation id.
216
            let operation_id = self.next_operation_id.fetch_add(txn, 1)?;
41✔
217
            // If the record has no primary key, record id is operation id.
218
            (operation_id, operation_id)
41✔
219
        };
220
        // Record operation. The operation id must not exist.
221
        if !self.operation_id_to_operation.insert(
11,606✔
222
            txn,
11,606✔
223
            &operation_id,
11,606✔
224
            OperationBorrow::Insert { record_id, record },
11,606✔
225
        )? {
11,606✔
226
            panic!("Inconsistent state: operation id already exists");
×
227
        }
11,372✔
228
        Ok(Some(record_id))
11,372✔
229
    }
11,373✔
230

231
    /// Deletes the record and returns the record version. Returns `None` if the record does not exist.
232
    pub fn delete(
16✔
233
        &self,
16✔
234
        txn: &mut RwTransaction,
16✔
235
        primary_key: &[u8],
16✔
236
    ) -> Result<Option<u32>, StorageError> {
16✔
237
        // Find operation id by primary key.
238
        let Some(metadata) = self.primary_key_to_metadata.get(txn, primary_key)? else {
16✔
239
            return Ok(None);
1✔
240
        };
241
        let metadata = metadata.into_owned();
15✔
242
        let Some(insert_operation_id) = metadata.insert_operation_id else {
15✔
243
            return Ok(None);
1✔
244
        };
245
        // Remove deleted operation id.
246
        self.primary_key_to_metadata.insert_overwrite(
14✔
247
            txn,
14✔
248
            primary_key,
14✔
249
            &RecordMetadata {
14✔
250
                id: metadata.id,
14✔
251
                version: metadata.version,
14✔
252
                insert_operation_id: None,
14✔
253
            },
14✔
254
        )?;
14✔
255
        // The operation id must be present.
256
        if !self
14✔
257
            .present_operation_ids
14✔
258
            .remove(txn, &insert_operation_id)?
14✔
259
        {
260
            panic!("Inconsistent state: insert operation id not found")
×
261
        }
14✔
262
        // Generate new operation id.
263
        let operation_id = self.next_operation_id.fetch_add(txn, 1)?;
14✔
264
        // Record delete operation. The operation id must not exist.
265
        if !self.operation_id_to_operation.insert(
14✔
266
            txn,
14✔
267
            &operation_id,
14✔
268
            OperationBorrow::Delete {
14✔
269
                operation_id: insert_operation_id,
14✔
270
            },
14✔
271
        )? {
14✔
272
            panic!("Inconsistent state: operation id already exists");
×
273
        }
14✔
274
        Ok(Some(metadata.version))
14✔
275
    }
16✔
276
}
277

278
/// Version given to a record when its primary key is inserted for the first time, and to every
/// record inserted without a primary key.
const INITIAL_RECORD_VERSION: u32 = 1_u32;
279

280
/// Per-primary-key bookkeeping stored in `primary_key_to_metadata`.
#[derive(Debug, Clone, Copy, PartialEq)]
struct RecordMetadata {
    /// Id of the record. Stays the same across `insert`s and `delete`s of the same primary key.
    id: u64,
    /// Most recent version of the record; kept even while the record is deleted.
    version: u32,
    /// Operation id of the latest `Insert` for this record, or `None` if the record is deleted.
    insert_operation_id: Option<u64>,
}
289

290
mod lmdb_val_impl;
291

292
#[cfg(test)]
293
mod tests {
294
    use crate::cache::lmdb::utils::init_env;
295

296
    use super::*;
297

298
    // Inserts ten records without primary keys and checks, after every insert, that ids,
    // counts, the present-id iterator and the stored operation all line up.
    #[test]
    fn test_operation_log_append_only() {
        let mut env = init_env(&Default::default(), Some(Default::default()))
            .unwrap()
            .0;
        let log = OperationLog::new(&mut env, true).unwrap();
        let mut txn = env.begin_rw_txn().unwrap();
        let append_only = true;

        let mut records = vec![Record::new(None, vec![], None); 10];
        for (index, record) in records.iter_mut().enumerate() {
            // Without a primary key the record id equals the operation id, which counts up
            // from 0.
            let record_id = log.insert(&mut txn, record, None).unwrap().unwrap();
            assert_eq!(record_id, index as u64);
            assert_eq!(record.version, Some(INITIAL_RECORD_VERSION));
            assert_eq!(
                log.count_present_records(&txn, append_only).unwrap(),
                index + 1
            );
            assert_eq!(log.next_operation_id(&txn).unwrap(), index as u64 + 1);
            assert_eq!(
                log.present_operation_ids(&txn, append_only)
                    .unwrap()
                    .map(|result| result.map(IntoOwned::into_owned))
                    .collect::<Result<Vec<_>, _>>()
                    .unwrap(),
                (0..=index as u64).collect::<Vec<_>>()
            );
            // Boolean results are asserted directly instead of being compared with `true`.
            assert!(log
                .contains_operation_id(&txn, append_only, index as _)
                .unwrap());
            assert_eq!(
                log.get_record_by_operation_id_unchecked(&txn, index as _)
                    .unwrap(),
                RecordWithId::new(record_id, record.clone())
            );
            assert_eq!(
                log.get_operation(&txn, index as _).unwrap(),
                Operation::Insert {
                    record_id,
                    record: record.clone(),
                }
            );
        }
    }
1✔
344

345
    // Walks one primary key through insert -> duplicate insert -> delete -> repeated delete ->
    // re-insert, checking the log's observable state after each step.
    #[test]
    fn test_operation_log_with_primary_key() {
        let mut env = init_env(&Default::default(), Some(Default::default()))
            .unwrap()
            .0;
        let log = OperationLog::new(&mut env, true).unwrap();
        let mut txn = env.begin_rw_txn().unwrap();
        let append_only = false;

        // Insert a record.
        let mut record = Record::new(None, vec![], None);
        let primary_key = b"primary_key";
        let record_id = log
            .insert(&mut txn, &mut record, Some(primary_key))
            .unwrap()
            .unwrap();
        assert_eq!(record_id, 0);
        assert_eq!(record.version, Some(INITIAL_RECORD_VERSION));
        assert_eq!(log.count_present_records(&txn, append_only).unwrap(), 1);
        assert_eq!(
            log.get_record(&txn, primary_key).unwrap().unwrap(),
            RecordWithId::new(record_id, record.clone())
        );
        assert_eq!(log.next_operation_id(&txn).unwrap(), 1);
        assert_eq!(
            log.present_operation_ids(&txn, append_only)
                .unwrap()
                .map(|result| result.map(IntoOwned::into_owned))
                .collect::<Result<Vec<_>, _>>()
                .unwrap(),
            vec![0]
        );
        assert!(log.contains_operation_id(&txn, append_only, 0).unwrap());
        assert_eq!(
            log.get_record_by_operation_id_unchecked(&txn, 0).unwrap(),
            RecordWithId::new(record_id, record.clone())
        );
        assert_eq!(
            log.get_operation(&txn, 0).unwrap(),
            Operation::Insert {
                record_id,
                record: record.clone(),
            }
        );

        // Insert again with the same primary key should fail.
        assert_eq!(
            log.insert(&mut txn, &mut record, Some(primary_key))
                .unwrap(),
            None
        );

        // Delete the record.
        let version = log.delete(&mut txn, primary_key).unwrap().unwrap();
        assert_eq!(version, INITIAL_RECORD_VERSION);
        assert_eq!(log.count_present_records(&txn, append_only).unwrap(), 0);
        assert_eq!(log.get_record(&txn, primary_key).unwrap(), None);
        assert_eq!(log.next_operation_id(&txn).unwrap(), 2);
        assert_eq!(
            log.present_operation_ids(&txn, append_only)
                .unwrap()
                .map(|result| result.map(IntoOwned::into_owned))
                .collect::<Result<Vec<_>, _>>()
                .unwrap(),
            Vec::<u64>::new(),
        );
        assert!(!log.contains_operation_id(&txn, append_only, 0).unwrap());
        assert_eq!(
            log.get_operation(&txn, 1).unwrap(),
            Operation::Delete { operation_id: 0 }
        );

        // Delete a non-existing record should fail.
        assert_eq!(
            log.delete(&mut txn, b"non_existing_primary_key").unwrap(),
            None
        );

        // Delete a deleted record should fail.
        assert_eq!(log.delete(&mut txn, primary_key).unwrap(), None);

        // Insert with that primary key again: same record id, version bumped by one.
        let record_id = log
            .insert(&mut txn, &mut record, Some(primary_key))
            .unwrap()
            .unwrap();
        assert_eq!(record_id, 0);
        assert_eq!(record.version, Some(INITIAL_RECORD_VERSION + 1));
        assert_eq!(log.count_present_records(&txn, append_only).unwrap(), 1);
        assert_eq!(
            log.get_record(&txn, primary_key).unwrap().unwrap(),
            RecordWithId::new(record_id, record.clone())
        );
        assert_eq!(log.next_operation_id(&txn).unwrap(), 3);
        assert_eq!(
            log.present_operation_ids(&txn, append_only)
                .unwrap()
                .map(|result| result.map(IntoOwned::into_owned))
                .collect::<Result<Vec<_>, _>>()
                .unwrap(),
            vec![2]
        );
        assert!(log.contains_operation_id(&txn, append_only, 2).unwrap());
        assert_eq!(
            log.get_record_by_operation_id_unchecked(&txn, 2).unwrap(),
            RecordWithId::new(record_id, record.clone())
        );
        assert_eq!(
            log.get_operation(&txn, 2).unwrap(),
            Operation::Insert {
                record_id,
                record: record.clone(),
            }
        );
    }
1✔
469
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc