• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4382580286

pending completion
4382580286

push

github

GitHub
feat: Separate cache operation log environment and index environments (#1199)

1370 of 1370 new or added lines in 33 files covered. (100.0%)

28671 of 41023 relevant lines covered (69.89%)

51121.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.16
/dozer-storage/src/lmdb_database/raw_iterator.rs
1
use std::ops::Bound;
2

3
use lmdb::Cursor;
4
use lmdb_sys::{
5
    MDB_FIRST, MDB_LAST, MDB_NEXT, MDB_NEXT_NODUP, MDB_PREV, MDB_PREV_NODUP, MDB_SET_RANGE,
6
};
7

8
use crate::errors::StorageError;
9

10
/// A borrowed `(key, value)` pair of byte slices, both tied to the `'txn` lifetime.
type KeyValuePair<'txn> = (&'txn [u8], &'txn [u8]);
11

12
enum IteratorState<'txn> {
13
    First {
14
        item: Option<KeyValuePair<'txn>>,
15
        ascending: bool,
16
    },
17
    NotFirst {
18
        ascending: bool,
19
    },
20
}
21

22
pub struct RawIterator<'txn, C: Cursor<'txn>> {
23
    cursor: C,
24
    state: IteratorState<'txn>,
25
}
26

27
impl<'txn, C: Cursor<'txn>> Iterator for RawIterator<'txn, C> {
28
    type Item = Result<(&'txn [u8], &'txn [u8]), lmdb::Error>;
29

30
    fn next(&mut self) -> Option<Self::Item> {
4,759,171✔
31
        match &self.state {
4,759,171✔
32
            IteratorState::First { item, ascending } => {
8,642✔
33
                let item = *item;
8,642✔
34
                self.state = IteratorState::NotFirst {
8,642✔
35
                    ascending: *ascending,
8,642✔
36
                };
8,642✔
37
                item.map(Ok)
8,642✔
38
            }
39
            IteratorState::NotFirst { ascending } => if *ascending {
4,750,529✔
40
                cursor_get(&self.cursor, MDB_NEXT)
2,956,431✔
41
            } else {
42
                cursor_get(&self.cursor, MDB_PREV)
1,794,098✔
43
            }
44
            .transpose(),
4,750,529✔
45
        }
46
    }
4,759,171✔
47
}
48

49
impl<'txn, C: Cursor<'txn>> RawIterator<'txn, C> {
50
    pub fn new(
8,642✔
51
        cursor: C,
8,642✔
52
        starting_key: Bound<&[u8]>,
8,642✔
53
        ascending: bool,
8,642✔
54
    ) -> Result<Self, StorageError> {
8,642✔
55
        let item = match (starting_key, ascending) {
8,642✔
56
            (Bound::Included(starting_key), true) => {
2,522✔
57
                cursor_get_greater_than_or_equal_to(&cursor, starting_key)
2,522✔
58
            }
59
            (Bound::Excluded(starting_key), true) => cursor_get_greater_than(&cursor, starting_key),
1,499✔
60
            (Bound::Unbounded, true) => cursor_get(&cursor, MDB_FIRST),
1,253✔
61
            (Bound::Included(starting_key), false) => {
769✔
62
                cursor_get_less_than_or_equal_to(&cursor, starting_key)
769✔
63
            }
64
            (Bound::Excluded(starting_key), false) => cursor_get_less_than(&cursor, starting_key),
2,307✔
65
            (Bound::Unbounded, false) => cursor_get(&cursor, MDB_LAST),
292✔
66
        }?;
×
67
        Ok(RawIterator {
8,642✔
68
            cursor,
8,642✔
69
            state: IteratorState::First { item, ascending },
8,642✔
70
        })
8,642✔
71
    }
8,642✔
72
}
73

74
fn cursor_get<'txn, C: Cursor<'txn>>(
75
    cursor: &C,
76
    op: u32,
77
) -> Result<Option<KeyValuePair<'txn>>, lmdb::Error> {
78
    match cursor.get(None, None, op) {
4,755,166✔
79
        Ok((key, value)) => Ok(Some((
4,751,031✔
80
            key.expect("Key should always be `Some` unless `op` is `MDB_SET`"),
4,751,031✔
81
            value,
4,751,031✔
82
        ))),
4,751,031✔
83
        Err(lmdb::Error::NotFound) => Ok(None),
4,135✔
84
        Err(e) => Err(e),
×
85
    }
86
}
4,755,166✔
87

88
fn cursor_get_greater_than_or_equal_to<'txn, 'key, C: Cursor<'txn>>(
89
    cursor: &C,
90
    key: &'key [u8],
91
) -> Result<Option<KeyValuePair<'txn>>, lmdb::Error> {
92
    match cursor.get(Some(key), None, MDB_SET_RANGE) {
7,117✔
93
        Ok((key, value)) => Ok(Some((
5,187✔
94
            key.expect("Key should always be `Some` unless `op` is `MDB_SET`"),
5,187✔
95
            value,
5,187✔
96
        ))),
5,187✔
97
        Err(lmdb::Error::NotFound) => Ok(None),
1,930✔
98
        Err(e) => Err(e),
×
99
    }
100
}
7,117✔
101

102
fn cursor_get_greater_than<'txn, 'key, C: Cursor<'txn>>(
103
    cursor: &C,
104
    key: &'key [u8],
105
) -> Result<Option<KeyValuePair<'txn>>, lmdb::Error> {
106
    match cursor_get_greater_than_or_equal_to(cursor, key)? {
1,504✔
107
        Some((hit_key, value)) => {
1,502✔
108
            if hit_key == key {
1,502✔
109
                // Hit equal key, get next.
110
                cursor_get(cursor, MDB_NEXT_NODUP)
8✔
111
            } else {
112
                // Hit greater key, return it.
113
                Ok(Some((hit_key, value)))
1,494✔
114
            }
115
        }
116
        None => Ok(None),
2✔
117
    }
118
}
1,504✔
119

120
fn cursor_get_less_than_or_equal_to<'txn, 'key, C: Cursor<'txn>>(
121
    cursor: &C,
122
    key: &'key [u8],
123
) -> Result<Option<KeyValuePair<'txn>>, lmdb::Error> {
124
    match cursor_get_greater_than_or_equal_to(cursor, key)? {
774✔
125
        Some((hit_key, value)) => {
4✔
126
            if hit_key == key {
4✔
127
                // Hit equal key, return it.
128
                Ok(Some((hit_key, value)))
2✔
129
            } else {
130
                // Hit greater key, get previous.
131
                cursor_get(cursor, MDB_PREV_NODUP)
2✔
132
            }
133
        }
134
        // All key less than given key, get last.
135
        None => cursor_get(cursor, MDB_LAST),
770✔
136
    }
137
}
774✔
138

139
fn cursor_get_less_than<'txn, 'key, C: Cursor<'txn>>(
140
    cursor: &C,
141
    key: &'key [u8],
142
) -> Result<Option<KeyValuePair<'txn>>, lmdb::Error> {
143
    match cursor_get_greater_than_or_equal_to(cursor, key)? {
2,312✔
144
        Some(_) => {
145
            // Hit greater or equal key, get previous.
146
            cursor_get(cursor, MDB_PREV_NODUP)
1,158✔
147
        }
148
        // All key less than given key, get last.
149
        None => cursor_get(cursor, MDB_LAST),
1,154✔
150
    }
151
}
2,312✔
152

153
#[cfg(test)]
mod tests {
    use lmdb::{Database, DatabaseFlags, Transaction, WriteFlags};
    use tempdir::TempDir;

    use crate::lmdb_storage::{
        CreateDatabase, LmdbEnvironmentManager, LmdbEnvironmentOptions, SharedTransaction,
    };

    use super::*;

    /// Creates a fresh environment in a temporary directory with one
    /// `DUP_SORT` database and a transaction on it. The `TempDir` is returned
    /// so the caller keeps the directory alive for the duration of the test.
    fn test_database() -> (TempDir, SharedTransaction, Database) {
        let name = "test_database";
        let temp_dir = TempDir::new(name).unwrap();
        let mut env = LmdbEnvironmentManager::create(
            temp_dir.path(),
            name,
            LmdbEnvironmentOptions::default(),
        )
        .unwrap();
        let db = env
            .create_database(None, Some(DatabaseFlags::DUP_SORT))
            .unwrap();
        let txn = env.create_txn().unwrap();

        (temp_dir, txn, db)
    }

    /// Writes the keys `"1"`, `"3"` and `"5"` (with empty values) and commits.
    fn insert_test_data(txn: &SharedTransaction, db: Database) {
        let mut writer = txn.write();
        for key in [b"1", b"3", b"5"] {
            writer
                .txn_mut()
                .put(db, key, &[], WriteFlags::empty())
                .unwrap();
        }
        writer.commit_and_renew().unwrap();
    }

    /// Unwraps a cursor lookup and keeps only the key of the hit, if any.
    fn key_of<'txn>(
        lookup: Result<Option<KeyValuePair<'txn>>, lmdb::Error>,
    ) -> Option<&'txn [u8]> {
        lookup.unwrap().map(|(key, _)| key)
    }

    /// Runs a `RawIterator` to completion and returns the keys it yielded.
    fn collect_keys<'txn, C: Cursor<'txn>>(
        cursor: C,
        starting_key: Bound<&[u8]>,
        ascending: bool,
    ) -> Vec<&'txn [u8]> {
        RawIterator::new(cursor, starting_key, ascending)
            .unwrap()
            .map(|result| result.map(|(key, _)| key))
            .collect::<Result<Vec<_>, _>>()
            .unwrap()
    }

    #[test]
    fn test_cursor_get_greater_than_or_equal_to() {
        let (_temp_dir, txn, db) = test_database();

        // Empty database.
        {
            let txn = txn.read();
            let cursor = txn.txn().open_ro_cursor(db).unwrap();
            assert_eq!(
                key_of(cursor_get_greater_than_or_equal_to(&cursor, b"0")),
                None
            );
        }

        // Non-empty database.
        insert_test_data(&txn, db);
        let txn = txn.read();
        let cursor = txn.txn().open_ro_cursor(db).unwrap();
        // Before db start.
        assert_eq!(
            key_of(cursor_get_greater_than_or_equal_to(&cursor, b"0")),
            Some(&b"1"[..])
        );
        // Equal.
        assert_eq!(
            key_of(cursor_get_greater_than_or_equal_to(&cursor, b"3")),
            Some(&b"3"[..])
        );
        // Greater than.
        assert_eq!(
            key_of(cursor_get_greater_than_or_equal_to(&cursor, b"4")),
            Some(&b"5"[..])
        );
        // Past db end.
        assert_eq!(
            key_of(cursor_get_greater_than_or_equal_to(&cursor, b"6")),
            None
        );
    }

    #[test]
    fn test_cursor_greater_than() {
        let (_temp_dir, txn, db) = test_database();

        // Empty database.
        {
            let txn = txn.read();
            let cursor = txn.txn().open_ro_cursor(db).unwrap();
            assert_eq!(key_of(cursor_get_greater_than(&cursor, b"0")), None);
        }

        // Non-empty database.
        insert_test_data(&txn, db);
        let txn = txn.read();
        let cursor = txn.txn().open_ro_cursor(db).unwrap();
        // Before db start.
        assert_eq!(
            key_of(cursor_get_greater_than(&cursor, b"0")),
            Some(&b"1"[..])
        );
        // Equal element should be skipped.
        assert_eq!(
            key_of(cursor_get_greater_than(&cursor, b"3")),
            Some(&b"5"[..])
        );
        // Greater than.
        assert_eq!(
            key_of(cursor_get_greater_than(&cursor, b"4")),
            Some(&b"5"[..])
        );
        // Past db end.
        assert_eq!(key_of(cursor_get_greater_than(&cursor, b"6")), None);
    }

    #[test]
    fn test_cursor_get_less_than_or_equal_to() {
        let (_temp_dir, txn, db) = test_database();

        // Empty database.
        {
            let txn = txn.read();
            let cursor = txn.txn().open_ro_cursor(db).unwrap();
            assert_eq!(
                key_of(cursor_get_less_than_or_equal_to(&cursor, b"6")),
                None
            );
        }

        // Non-empty database.
        insert_test_data(&txn, db);
        let txn = txn.read();
        let cursor = txn.txn().open_ro_cursor(db).unwrap();
        // Before db start.
        assert_eq!(
            key_of(cursor_get_less_than_or_equal_to(&cursor, b"0")),
            None
        );
        // Equal.
        assert_eq!(
            key_of(cursor_get_less_than_or_equal_to(&cursor, b"3")),
            Some(&b"3"[..])
        );
        // Less than.
        assert_eq!(
            key_of(cursor_get_less_than_or_equal_to(&cursor, b"4")),
            Some(&b"3"[..])
        );
        // Past db end.
        assert_eq!(
            key_of(cursor_get_less_than_or_equal_to(&cursor, b"6")),
            Some(&b"5"[..])
        );
    }

    #[test]
    fn test_cursor_less_than() {
        let (_temp_dir, txn, db) = test_database();

        // Empty database.
        {
            let txn = txn.read();
            let cursor = txn.txn().open_ro_cursor(db).unwrap();
            assert_eq!(key_of(cursor_get_less_than(&cursor, b"6")), None);
        }

        // Non-empty database.
        insert_test_data(&txn, db);
        let txn = txn.read();
        let cursor = txn.txn().open_ro_cursor(db).unwrap();
        // Before db start.
        assert_eq!(key_of(cursor_get_less_than(&cursor, b"0")), None);
        // Equal element should be skipped.
        assert_eq!(
            key_of(cursor_get_less_than(&cursor, b"3")),
            Some(&b"1"[..])
        );
        // Less than.
        assert_eq!(
            key_of(cursor_get_less_than(&cursor, b"4")),
            Some(&b"3"[..])
        );
        // Past db end.
        assert_eq!(
            key_of(cursor_get_less_than(&cursor, b"6")),
            Some(&b"5"[..])
        );
    }

    #[test]
    fn test_raw_iterator() {
        let (_temp_dir, txn, db) = test_database();

        // Empty database: nothing is yielded in either direction.
        {
            let txn = txn.read();
            for ascending in [true, false] {
                let cursor = txn.txn().open_ro_cursor(db).unwrap();
                assert!(collect_keys(cursor, Bound::Unbounded, ascending).is_empty());
            }
        }

        // Non-empty database.
        insert_test_data(&txn, db);
        let txn = txn.read();
        // Included ascending
        {
            let cursor = txn.txn().open_ro_cursor(db).unwrap();
            let keys = collect_keys(cursor, Bound::Included(b"3"), true);
            assert_eq!(keys, vec![b"3", b"5"]);
        }
        // Excluded ascending
        {
            let cursor = txn.txn().open_ro_cursor(db).unwrap();
            let keys = collect_keys(cursor, Bound::Excluded(b"3"), true);
            assert_eq!(keys, vec![b"5"]);
        }
        // Unbounded ascending
        {
            let cursor = txn.txn().open_ro_cursor(db).unwrap();
            let keys = collect_keys(cursor, Bound::Unbounded, true);
            assert_eq!(keys, vec![b"1", b"3", b"5"]);
        }
        // Included descending
        {
            let cursor = txn.txn().open_ro_cursor(db).unwrap();
            let keys = collect_keys(cursor, Bound::Included(b"3"), false);
            assert_eq!(keys, vec![b"3", b"1"]);
        }
        // Excluded descending
        {
            let cursor = txn.txn().open_ro_cursor(db).unwrap();
            let keys = collect_keys(cursor, Bound::Excluded(b"3"), false);
            assert_eq!(keys, vec![b"1"]);
        }
        // Unbounded descending
        {
            let cursor = txn.txn().open_ro_cursor(db).unwrap();
            let keys = collect_keys(cursor, Bound::Unbounded, false);
            assert_eq!(keys, vec![b"5", b"3", b"1"]);
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc