• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4392484403

pending completion
4392484403

push

github

GitHub
feat: Asynchronous indexing (#1206)

270 of 270 new or added lines in 13 files covered. (100.0%)

28714 of 38777 relevant lines covered (74.05%)

89484.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.19
/dozer-cache/src/cache/lmdb/cache/mod.rs
1
use std::path::PathBuf;
2
use std::{fmt::Debug, sync::Arc};
3

4
use dozer_types::types::{Record, SchemaWithIndex};
5

6
use super::{
7
    super::{RoCache, RwCache},
8
    indexing::IndexingThreadPool,
9
};
10
use crate::cache::expression::QueryExpression;
11
use crate::cache::RecordWithId;
12
use crate::errors::CacheError;
13

14
mod main_environment;
15
mod query;
16
mod secondary_environment;
17

18
pub use main_environment::{MainEnvironment, RoMainEnvironment, RwMainEnvironment};
19
use query::LmdbQueryHandler;
20
pub use secondary_environment::{
21
    RoSecondaryEnvironment, RwSecondaryEnvironment, SecondaryEnvironment,
22
};
23

×
24
/// Options shared by the read-only and read-write LMDB caches.
#[derive(Clone, Debug)]
pub struct CacheCommonOptions {
    /// Total number of readers allowed.
    pub max_readers: u32,
    /// Maximum number of databases.
    // NOTE(review): the field name reads like a size, but the original comment
    // ("Max no of dbs") describes a count — confirm against the code that
    // consumes this option.
    pub max_db_size: u32,

    /// The chunk size when calculating intersection of index queries.
    pub intersection_chunk_size: usize,

    /// Provide a path where db will be created. If nothing is provided, will default to a temp location.
    /// Db path will be `PathBuf.join(String)`.
    pub path: Option<(PathBuf, String)>,
}

impl Default for CacheCommonOptions {
    /// Returns options with 1000 readers, 1000 databases, an intersection
    /// chunk size of 100 and no explicit path.
    fn default() -> Self {
        Self {
            max_readers: 1000,
            max_db_size: 1000,
            intersection_chunk_size: 100,
            path: None,
        }
    }
}
49

×
50
#[derive(Debug)]
×
51
pub struct LmdbRoCache {
52
    main_env: RoMainEnvironment,
53
    secondary_envs: Vec<RoSecondaryEnvironment>,
54
}
55

56
impl LmdbRoCache {
×
57
    pub fn new(options: &CacheCommonOptions) -> Result<Self, CacheError> {
143✔
58
        let main_env = RoMainEnvironment::new(options)?;
143✔
59
        let secondary_envs = (0..main_env.schema().1.len())
143✔
60
            .map(|index| RoSecondaryEnvironment::new(secondary_environment_name(index), options))
695✔
61
            .collect::<Result<_, _>>()?;
143✔
62
        Ok(Self {
143✔
63
            main_env,
143✔
64
            secondary_envs,
143✔
65
        })
143✔
66
    }
143✔
67
}
68

×
69
/// Options that only apply when a cache is opened for writing.
#[derive(Debug, Clone, Copy)]
pub struct CacheWriteOptions {
    /// Total size allocated for data in a memory mapped file.
    /// This size is allocated at initialization.
    pub max_size: usize,
}

impl Default for CacheWriteOptions {
    /// Returns options with `max_size` set to 1 GiB (`1024 * 1024 * 1024` bytes).
    fn default() -> Self {
        Self {
            max_size: 1024 * 1024 * 1024,
        }
    }
}
83

×
84
#[derive(Debug)]
×
85
pub struct LmdbRwCache {
86
    main_env: RwMainEnvironment,
87
    secondary_envs: Vec<RoSecondaryEnvironment>,
88
}
89

90
impl LmdbRwCache {
×
91
    pub fn new(
156✔
92
        schema: Option<&SchemaWithIndex>,
156✔
93
        common_options: &CacheCommonOptions,
156✔
94
        write_options: CacheWriteOptions,
156✔
95
        indexing_thread_pool: &mut IndexingThreadPool,
156✔
96
    ) -> Result<Self, CacheError> {
156✔
97
        let rw_main_env = RwMainEnvironment::new(schema, common_options, write_options)?;
156✔
98

×
99
        let common_options = CacheCommonOptions {
156✔
100
            path: Some((
156✔
101
                rw_main_env.base_path().to_path_buf(),
156✔
102
                rw_main_env.name().to_string(),
156✔
103
            )),
156✔
104
            ..*common_options
156✔
105
        };
156✔
106
        let ro_main_env = Arc::new(RoMainEnvironment::new(&common_options)?);
156✔
107

×
108
        let mut ro_secondary_envs = vec![];
156✔
109
        for (index, index_definition) in ro_main_env.schema().1.iter().enumerate() {
717✔
110
            let name = secondary_environment_name(index);
717✔
111
            let rw_secondary_env = RwSecondaryEnvironment::new(
717✔
112
                Some(index_definition),
717✔
113
                name.clone(),
717✔
114
                &common_options,
717✔
115
                write_options,
717✔
116
            )?;
717✔
117
            indexing_thread_pool.add_indexing_task(ro_main_env.clone(), Arc::new(rw_secondary_env));
717✔
118

×
119
            let ro_secondary_env = RoSecondaryEnvironment::new(name, &common_options)?;
717✔
120
            ro_secondary_envs.push(ro_secondary_env);
717✔
121
        }
×
122

×
123
        Ok(Self {
156✔
124
            main_env: rw_main_env,
156✔
125
            secondary_envs: ro_secondary_envs,
156✔
126
        })
156✔
127
    }
156✔
128
}
×
129

×
130
impl<C: LmdbCache> RoCache for C {
×
131
    fn name(&self) -> &str {
166✔
132
        self.main_env().name()
166✔
133
    }
166✔
134

×
135
    fn get(&self, key: &[u8]) -> Result<RecordWithId, CacheError> {
30✔
136
        self.main_env().get(key)
30✔
137
    }
30✔
138

×
139
    fn count(&self, query: &QueryExpression) -> Result<usize, CacheError> {
2,229✔
140
        LmdbQueryHandler::new(self, query).count()
2,229✔
141
    }
2,229✔
142

×
143
    fn query(&self, query: &QueryExpression) -> Result<Vec<RecordWithId>, CacheError> {
2,244✔
144
        LmdbQueryHandler::new(self, query).query()
2,244✔
145
    }
2,244✔
146

×
147
    fn get_schema(&self) -> &SchemaWithIndex {
178✔
148
        self.main_env().schema()
178✔
149
    }
178✔
150
}
×
151

×
152
impl RwCache for LmdbRwCache {
×
153
    fn insert(&self, record: &mut Record) -> Result<u64, CacheError> {
12,002✔
154
        let span = dozer_types::tracing::span!(dozer_types::tracing::Level::TRACE, "insert_cache");
23,992✔
155
        let _enter = span.enter();
11,996✔
156
        let record_id = self.main_env.insert(record)?;
11,996✔
157
        Ok(record_id)
11,996✔
158
    }
11,996✔
159

160
    fn delete(&self, key: &[u8]) -> Result<u32, CacheError> {
13✔
161
        let version = self.main_env.delete(key)?;
13✔
162
        Ok(version)
13✔
163
    }
13✔
164

165
    fn update(&self, key: &[u8], record: &mut Record) -> Result<u32, CacheError> {
7✔
166
        let version = self.delete(key)?;
7✔
167
        self.insert(record)?;
7✔
168
        Ok(version)
7✔
169
    }
7✔
170

×
171
    fn commit(&self) -> Result<(), CacheError> {
×
172
        self.main_env.commit()?;
156✔
173
        Ok(())
156✔
174
    }
156✔
175
}
176

×
177
pub trait LmdbCache: Send + Sync + Debug {
×
178
    type MainEnvironment: MainEnvironment;
×
179

×
180
    fn main_env(&self) -> &Self::MainEnvironment;
×
181

182
    type SecondaryEnvironment: SecondaryEnvironment;
×
183

×
184
    fn secondary_env(&self, index: usize) -> &Self::SecondaryEnvironment;
×
185
}
×
186

×
187
impl LmdbCache for LmdbRoCache {
×
188
    type MainEnvironment = RoMainEnvironment;
189
    fn main_env(&self) -> &Self::MainEnvironment {
3,336,217✔
190
        &self.main_env
3,336,217✔
191
    }
3,336,217✔
192

×
193
    type SecondaryEnvironment = RoSecondaryEnvironment;
194
    fn secondary_env(&self, index: usize) -> &Self::SecondaryEnvironment {
8,258✔
195
        &self.secondary_envs[index]
8,258✔
196
    }
8,258✔
197
}
198

199
impl LmdbCache for LmdbRwCache {
×
200
    type MainEnvironment = RwMainEnvironment;
×
201
    fn main_env(&self) -> &Self::MainEnvironment {
604✔
202
        &self.main_env
604✔
203
    }
604✔
204

×
205
    type SecondaryEnvironment = RoSecondaryEnvironment;
×
206
    fn secondary_env(&self, index: usize) -> &Self::SecondaryEnvironment {
102✔
207
        &self.secondary_envs[index]
102✔
208
    }
102✔
209
}
210

211
/// Returns the name of the secondary environment for the index at position `index`.
///
/// The name is the decimal representation of `index`.
fn secondary_environment_name(index: usize) -> String {
    index.to_string()
}
1,411✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc