• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4116183752

pending completion
4116183752

push

github

GitHub
refactor: Make `LmdbRoCache` and `LmdbRwCache` `Send` and `Sync` (#821)

790 of 790 new or added lines in 44 files covered. (100.0%)

23005 of 33842 relevant lines covered (67.98%)

56312.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.12
/dozer-cache/src/cache/lmdb/query/iterator.rs
1
use std::marker::PhantomData;
2

3
use dozer_storage::lmdb::Cursor;
4
use dozer_storage::lmdb_sys::{
5
    MDB_FIRST, MDB_LAST, MDB_NEXT, MDB_NEXT_NODUP, MDB_PREV, MDB_PREV_NODUP, MDB_SET_RANGE,
6
};
7

8
use crate::cache::expression::SortDirection;
9

10
/// One end of a key range: the boundary key itself is either part of the
/// range (`Including`) or just outside of it (`Excluding`).
#[derive(Debug, Clone)]
pub enum KeyEndpoint {
    /// The boundary key belongs to the range.
    Including(Vec<u8>),
    /// The boundary key does not belong to the range.
    Excluding(Vec<u8>),
}

impl KeyEndpoint {
    /// Returns the raw bytes of the boundary key, regardless of whether the
    /// endpoint is inclusive or exclusive.
    pub fn key(&self) -> &[u8] {
        // Both variants carry the same payload, so a single or-pattern arm
        // covers them.
        match self {
            Self::Including(bytes) | Self::Excluding(bytes) => bytes.as_slice(),
        }
    }
}
24

25
enum CacheIteratorState {
26
    First {
27
        starting_key: Option<KeyEndpoint>,
28
        direction: SortDirection,
29
    },
30
    NotFirst {
31
        direction: SortDirection,
32
    },
33
}
34

35
pub struct CacheIterator<'txn, C: Cursor<'txn>> {
36
    cursor: C,
37
    state: CacheIteratorState,
38
    _marker: PhantomData<fn() -> &'txn ()>,
39
}
40

41
impl<'txn, C: Cursor<'txn>> Iterator for CacheIterator<'txn, C> {
42
    type Item = (&'txn [u8], &'txn [u8]);
43

44
    fn next(&mut self) -> Option<Self::Item> {
253,112✔
45
        let res = match &self.state {
253,112✔
46
            CacheIteratorState::First {
47
                starting_key,
538✔
48
                direction,
538✔
49
            } => {
50
                let res = match starting_key {
538✔
51
                    Some(starting_key) => match direction {
436✔
52
                        SortDirection::Ascending => {
53
                            match self
54
                                .cursor
55
                                .get(Some(starting_key.key()), None, MDB_SET_RANGE)
269✔
56
                            {
57
                                Ok((key, value)) => {
266✔
58
                                    if key == Some(starting_key.key())
266✔
59
                                        && matches!(starting_key, KeyEndpoint::Excluding(_))
139✔
60
                                    {
61
                                        self.cursor.get(None, None, MDB_NEXT_NODUP)
7✔
62
                                    } else {
63
                                        Ok((key, value))
259✔
64
                                    }
65
                                }
66
                                Err(dozer_storage::lmdb::Error::NotFound) => {
67
                                    return None;
3✔
68
                                }
69

70
                                Err(e) => Err(e),
×
71
                            }
72
                        }
73
                        SortDirection::Descending => {
74
                            match self
75
                                .cursor
76
                                .get(Some(starting_key.key()), None, MDB_SET_RANGE)
167✔
77
                            {
78
                                Ok((key, value)) => {
66✔
79
                                    if key == Some(starting_key.key())
66✔
80
                                        && matches!(starting_key, KeyEndpoint::Including(_))
44✔
81
                                    {
82
                                        Ok((key, value))
1✔
83
                                    } else {
84
                                        self.cursor.get(None, None, MDB_PREV_NODUP)
65✔
85
                                    }
86
                                }
87
                                Err(dozer_storage::lmdb::Error::NotFound) => {
×
88
                                    self.cursor.get(None, None, MDB_LAST)
101✔
89
                                }
90
                                Err(e) => Err(e),
×
91
                            }
92
                        }
×
93
                    },
×
94
                    None => match direction {
102✔
95
                        SortDirection::Ascending => self.cursor.get(None, None, MDB_FIRST),
83✔
96
                        SortDirection::Descending => self.cursor.get(None, None, MDB_LAST),
19✔
97
                    },
×
98
                };
×
99
                self.state = CacheIteratorState::NotFirst {
535✔
100
                    direction: *direction,
535✔
101
                };
535✔
102
                res
535✔
103
            }
×
104
            CacheIteratorState::NotFirst { direction } => match direction {
252,574✔
105
                SortDirection::Ascending => self.cursor.get(None, None, MDB_NEXT),
163,547✔
106
                SortDirection::Descending => self.cursor.get(None, None, MDB_PREV),
89,027✔
107
            },
108
        };
×
109

×
110
        match res {
253,109✔
111
            Ok((key, val)) => key.map(|key| (key, val)),
252,862✔
112
            Err(_e) => None,
247✔
113
        }
114
    }
253,112✔
115
}
×
116
impl<'txn, C: Cursor<'txn>> CacheIterator<'txn, C> {
×
117
    pub fn new(cursor: C, starting_key: Option<KeyEndpoint>, direction: SortDirection) -> Self {
538✔
118
        CacheIterator {
538✔
119
            cursor,
538✔
120
            state: CacheIteratorState::First {
538✔
121
                starting_key,
538✔
122
                direction,
538✔
123
            },
538✔
124
            _marker: PhantomData::default(),
538✔
125
        }
538✔
126
    }
538✔
127
}
128

129
#[cfg(test)]
mod tests {
    use dozer_storage::lmdb::{DatabaseFlags, Transaction, WriteFlags};

    use crate::cache::{
        expression::SortDirection,
        lmdb::{utils::init_env, CacheOptions},
    };

    use super::{CacheIterator, KeyEndpoint};

    /// Walks a nine-key database in both directions from a variety of
    /// starting points and compares the visited keys with the expected order.
    #[test]
    fn test_cache_iterator() {
        let options = CacheOptions::default();
        let mut env = init_env(&options).unwrap();
        let db = env
            .create_database(None, Some(DatabaseFlags::DUP_SORT))
            .unwrap();
        let txn = env.create_txn().unwrap();
        let mut txn = txn.write();

        // Populate the database with nine keys, each with an empty value.
        let txn = txn.txn_mut();
        for key in [
            b"aa", b"ab", b"ac", b"ba", b"bb", b"bc", b"ca", b"cb", b"cc",
        ] {
            txn.put(db, key, &[], WriteFlags::empty()).unwrap();
        }

        // Every stored key, in ascending and in descending order.
        let ascending: Vec<&'static [u8]> = vec![
            b"aa", b"ab", b"ac", b"ba", b"bb", b"bc", b"ca", b"cb", b"cc",
        ];
        let descending: Vec<&'static [u8]> = ascending.iter().rev().copied().collect();

        // Opens a fresh cursor, runs the iterator and compares the keys it yields.
        let check = |starting_key, direction, expected: Vec<&'static [u8]>| {
            let cursor = txn.open_ro_cursor(db).unwrap();
            let actual: Vec<_> = CacheIterator::new(cursor, starting_key, direction)
                .map(|(key, _)| key)
                .collect();
            assert_eq!(actual, expected);
        };

        // No starting key: ascending from the start, descending from the end.
        check(None, SortDirection::Ascending, ascending.clone());
        check(None, SortDirection::Descending, descending.clone());

        // Starting key before the first stored key.
        check(
            Some(KeyEndpoint::Excluding(b"a".to_vec())),
            SortDirection::Ascending,
            ascending.clone(),
        );
        check(
            Some(KeyEndpoint::Excluding(b"a".to_vec())),
            SortDirection::Descending,
            vec![],
        );

        // Ascending from an existing key, inclusive and exclusive.
        check(
            Some(KeyEndpoint::Including(b"ba".to_vec())),
            SortDirection::Ascending,
            vec![b"ba", b"bb", b"bc", b"ca", b"cb", b"cc"],
        );
        check(
            Some(KeyEndpoint::Excluding(b"ba".to_vec())),
            SortDirection::Ascending,
            vec![b"bb", b"bc", b"ca", b"cb", b"cc"],
        );

        // Ascending from a missing key that sorts before every stored key.
        check(
            Some(KeyEndpoint::Including(b"00".to_vec())),
            SortDirection::Ascending,
            ascending.clone(),
        );

        // Descending from an existing key, inclusive and exclusive.
        check(
            Some(KeyEndpoint::Including(b"bc".to_vec())),
            SortDirection::Descending,
            vec![b"bc", b"bb", b"ba", b"ac", b"ab", b"aa"],
        );
        check(
            Some(KeyEndpoint::Excluding(b"bc".to_vec())),
            SortDirection::Descending,
            vec![b"bb", b"ba", b"ac", b"ab", b"aa"],
        );

        // Ascending from a missing key that falls between stored keys.
        check(
            Some(KeyEndpoint::Including(b"ad".to_vec())),
            SortDirection::Ascending,
            vec![b"ba", b"bb", b"bc", b"ca", b"cb", b"cc"],
        );

        // Descending from a missing key that falls between stored keys.
        check(
            Some(KeyEndpoint::Including(b"bd".to_vec())),
            SortDirection::Descending,
            vec![b"bc", b"bb", b"ba", b"ac", b"ab", b"aa"],
        );

        // Starting key past the last stored key.
        check(
            Some(KeyEndpoint::Including(b"dd".to_vec())),
            SortDirection::Descending,
            descending.clone(),
        );
        check(
            Some(KeyEndpoint::Including(b"dd".to_vec())),
            SortDirection::Ascending,
            vec![],
        );
    }
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc