• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4382580286

pending completion
4382580286

push

github

GitHub
feat: Separate cache operation log environment and index environments (#1199)

1370 of 1370 new or added lines in 33 files covered. (100.0%)

28671 of 41023 relevant lines covered (69.89%)

51121.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

41.25
/dozer-cache/src/cache/lmdb/cache/query/handler.rs
1
use super::intersection::intersection;
2
use crate::cache::expression::Skip;
3
use crate::cache::lmdb::cache::main_environment::MainEnvironment;
4
use crate::cache::lmdb::cache::query::secondary::build_index_scan;
5
use crate::cache::lmdb::cache::LmdbCache;
6
use crate::cache::RecordWithId;
7
use crate::cache::{
8
    expression::QueryExpression,
9
    plan::{IndexScan, Plan, QueryPlanner},
10
};
11
use crate::errors::{CacheError, PlanError};
12
use dozer_storage::errors::StorageError;
13
use dozer_storage::lmdb::Transaction;
14
use dozer_storage::BeginTransaction;
15
use dozer_types::borrow::IntoOwned;
16
use itertools::Either;
17

18
pub struct LmdbQueryHandler<'a, C: LmdbCache> {
19
    cache: &'a C,
20
    query: &'a QueryExpression,
21
}
22

23
impl<'a, C: LmdbCache> LmdbQueryHandler<'a, C> {
24
    pub fn new(cache: &'a C, query: &'a QueryExpression) -> Self {
4,473✔
25
        Self { cache, query }
4,473✔
26
    }
4,473✔
27

×
28
    pub fn count(&self) -> Result<usize, CacheError> {
×
29
        match self.plan()? {
2,229✔
30
            Plan::IndexScans(index_scans) => {
2,085✔
31
                let secondary_txns = self.create_secondary_txns(&index_scans)?;
2,085✔
32
                let ids = self.combine_secondary_queries(&index_scans, &secondary_txns)?;
2,085✔
33
                self.count_secondary_queries(ids)
2,085✔
34
            }
×
35
            Plan::SeqScan(_) => Ok(match self.query.skip {
142✔
36
                Skip::Skip(skip) => self
136✔
37
                    .cache
136✔
38
                    .main_env()
136✔
39
                    .count()?
136✔
40
                    .saturating_sub(skip)
136✔
41
                    .min(self.query.limit.unwrap_or(usize::MAX)),
136✔
42
                Skip::After(_) => self.all_ids(&self.cache.main_env().begin_txn()?)?.count(),
6✔
43
            }),
×
44
            Plan::ReturnEmpty => Ok(0),
1✔
45
        }
×
46
    }
2,229✔
47

×
48
    pub fn query(&self) -> Result<Vec<RecordWithId>, CacheError> {
×
49
        match self.plan()? {
2,244✔
50
            Plan::IndexScans(index_scans) => {
2,088✔
51
                let secondary_txns = self.create_secondary_txns(&index_scans)?;
2,088✔
52
                let main_txn = self.cache.main_env().begin_txn()?;
2,088✔
53
                #[allow(clippy::let_and_return)] // Must do let binding unless won't compile
×
54
                let result = self.collect_records(
2,088✔
55
                    &main_txn,
2,088✔
56
                    self.combine_secondary_queries(&index_scans, &secondary_txns)?,
2,088✔
57
                );
×
58
                result
2,088✔
59
            }
×
60
            Plan::SeqScan(_seq_scan) => {
154✔
61
                let main_txn = self.cache.main_env().begin_txn()?;
154✔
62
                #[allow(clippy::let_and_return)] // Must do let binding unless won't compile
×
63
                let result = self.collect_records(&main_txn, self.all_ids(&main_txn)?);
154✔
64
                result
154✔
65
            }
×
66
            Plan::ReturnEmpty => Ok(vec![]),
1✔
67
        }
×
68
    }
2,244✔
69

×
70
    fn plan(&self) -> Result<Plan, PlanError> {
4,473✔
71
        let (schema, secondary_indexes) = self.cache.main_env().schema();
4,473✔
72
        let planner = QueryPlanner::new(
4,473✔
73
            schema,
4,473✔
74
            secondary_indexes,
4,473✔
75
            self.query.filter.as_ref(),
4,473✔
76
            &self.query.order_by,
4,473✔
77
        );
4,473✔
78
        planner.plan()
4,473✔
79
    }
4,473✔
80

×
81
    fn all_ids<'txn, T: Transaction>(
160✔
82
        &self,
160✔
83
        main_txn: &'txn T,
160✔
84
    ) -> Result<impl Iterator<Item = Result<u64, CacheError>> + 'txn, CacheError> {
160✔
85
        let schema_is_append_only = self.cache.main_env().schema().0.is_append_only();
160✔
86
        let all_ids = self
160✔
87
            .cache
160✔
88
            .main_env()
160✔
89
            .operation_log()
160✔
90
            .present_operation_ids(main_txn, schema_is_append_only)?
160✔
91
            .map(|result| {
58,222✔
92
                result
58,222✔
93
                    .map(|id| id.into_owned())
58,222✔
94
                    .map_err(CacheError::Storage)
58,222✔
95
            });
58,222✔
96
        Ok(skip(all_ids, self.query.skip).take(self.query.limit.unwrap_or(usize::MAX)))
160✔
97
    }
160✔
98

×
99
    fn create_secondary_txns(
4,173✔
100
        &self,
4,173✔
101
        index_scans: &[IndexScan],
4,173✔
102
    ) -> Result<Vec<<C::SecondaryEnvironment as BeginTransaction>::Transaction<'_>>, StorageError>
4,173✔
103
    {
4,173✔
104
        index_scans
4,173✔
105
            .iter()
4,173✔
106
            .map(|index_scan| self.cache.secondary_env(index_scan.index_id).begin_txn())
4,175✔
107
            .collect()
4,173✔
108
    }
4,173✔
109

×
110
    fn combine_secondary_queries<'txn, T: Transaction>(
4,173✔
111
        &self,
4,173✔
112
        index_scans: &[IndexScan],
4,173✔
113
        secondary_txns: &'txn [T],
4,173✔
114
    ) -> Result<impl Iterator<Item = Result<u64, CacheError>> + 'txn, CacheError> {
4,173✔
115
        debug_assert!(
×
116
            !index_scans.is_empty(),
4,173✔
117
            "Planner should not generate empty index scan"
×
118
        );
×
119
        let combined = if index_scans.len() == 1 {
4,173✔
120
            // The fast path, without intersection calculation.
×
121
            Either::Left(build_index_scan(
4,171✔
122
                &secondary_txns[0],
4,171✔
123
                self.cache.secondary_env(index_scans[0].index_id),
4,171✔
124
                &index_scans[0].kind,
4,171✔
125
            )?)
4,171✔
126
        } else {
×
127
            // Intersection of multiple index scans.
×
128
            let iterators = index_scans
2✔
129
                .iter()
2✔
130
                .zip(secondary_txns)
2✔
131
                .map(|(index_scan, secondary_txn)| {
4✔
132
                    build_index_scan(
4✔
133
                        secondary_txn,
4✔
134
                        self.cache.secondary_env(index_scan.index_id),
4✔
135
                        &index_scan.kind,
4✔
136
                    )
4✔
137
                })
4✔
138
                .collect::<Result<Vec<_>, CacheError>>()?;
2✔
139
            Either::Right(intersection(
2✔
140
                iterators,
2✔
141
                self.cache.main_env().intersection_chunk_size(),
2✔
142
            ))
2✔
143
        };
×
144
        Ok(skip(combined, self.query.skip).take(self.query.limit.unwrap_or(usize::MAX)))
4,173✔
145
    }
4,173✔
146

×
147
    fn filter_secondary_queries<'txn, T: Transaction>(
4,327✔
148
        &'txn self,
4,327✔
149
        main_txn: &'txn T,
4,327✔
150
        ids: impl Iterator<Item = Result<u64, CacheError>> + 'txn,
4,327✔
151
    ) -> impl Iterator<Item = Result<u64, CacheError>> + 'txn {
4,327✔
152
        let schema_is_append_only = self.cache.main_env().schema().0.is_append_only();
4,327✔
153
        ids.filter_map(move |id| match id {
2,196,235✔
154
            Ok(id) => match self.cache.main_env().operation_log().contains_operation_id(
2,196,235✔
155
                main_txn,
2,196,235✔
156
                schema_is_append_only,
2,196,235✔
157
                id,
2,196,235✔
158
            ) {
2,196,235✔
159
                Ok(true) => Some(Ok(id)),
2,196,235✔
160
                Ok(false) => None,
×
161
                Err(err) => Some(Err(err.into())),
×
162
            },
×
163
            Err(err) => Some(Err(err)),
×
164
        })
2,196,235✔
165
    }
4,327✔
166

×
167
    fn count_secondary_queries(
2,085✔
168
        &self,
2,085✔
169
        ids: impl Iterator<Item = Result<u64, CacheError>>,
2,085✔
170
    ) -> Result<usize, CacheError> {
2,085✔
171
        let main_txn = self.cache.main_env().begin_txn()?;
2,085✔
172

×
173
        let mut result = 0;
2,085✔
174
        for maybe_id in self.filter_secondary_queries(&main_txn, ids) {
1,069,614✔
175
            maybe_id?;
1,069,614✔
176
            result += 1;
1,069,614✔
177
        }
×
178
        Ok(result)
2,085✔
179
    }
2,085✔
180

×
181
    fn collect_records<'txn, T: Transaction>(
2,242✔
182
        &'txn self,
2,242✔
183
        main_txn: &'txn T,
2,242✔
184
        ids: impl Iterator<Item = Result<u64, CacheError>> + 'txn,
2,242✔
185
    ) -> Result<Vec<RecordWithId>, CacheError> {
2,242✔
186
        self.filter_secondary_queries(main_txn, ids)
2,242✔
187
            .map(|id| {
1,126,621✔
188
                id.and_then(|id| {
1,126,621✔
189
                    self.cache
1,126,621✔
190
                        .main_env()
1,126,621✔
191
                        .operation_log()
1,126,621✔
192
                        .get_record_by_operation_id_unchecked(main_txn, id)
1,126,621✔
193
                        .map_err(Into::into)
1,126,621✔
194
                })
1,126,621✔
195
            })
1,126,621✔
196
            .collect()
2,242✔
197
    }
2,242✔
198
}
×
199

×
200
fn skip(
4,333✔
201
    iter: impl Iterator<Item = Result<u64, CacheError>>,
4,333✔
202
    skip: Skip,
4,333✔
203
) -> impl Iterator<Item = Result<u64, CacheError>> {
4,333✔
204
    match skip {
4,333✔
205
        Skip::Skip(n) => Either::Left(iter.skip(n)),
3,781✔
206
        Skip::After(after) => Either::Right(skip_after(iter, after)),
552✔
207
    }
×
208
}
4,333✔
209

210
struct SkipAfter<T: Iterator<Item = Result<u64, CacheError>>> {
×
211
    inner: T,
×
212
    after: Option<u64>,
×
213
}
×
214

×
215
impl<T> Iterator for SkipAfter<T>
where
    T: Iterator<Item = Result<u64, CacheError>>,
{
    type Item = Result<u64, CacheError>;

    /// Until the anchor is seen, `Ok` ids are discarded and errors are passed through.
    /// After that, items come straight from the inner iterator.
    fn next(&mut self) -> Option<Self::Item> {
        while let Some(anchor) = self.after {
            match self.inner.next()? {
                Ok(id) if id == anchor => self.after = None,
                Ok(_) => {}
                Err(err) => return Some(Err(err)),
            }
        }
        self.inner.next()
    }
}
×
236

×
237
fn skip_after<T: Iterator<Item = Result<u64, CacheError>>>(iter: T, after: u64) -> SkipAfter<T> {
552✔
238
    SkipAfter {
552✔
239
        inner: iter,
552✔
240
        after: Some(after),
552✔
241
    }
552✔
242
}
552✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc