• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4392484403

pending completion
4392484403

push

github

GitHub
feat: Asynchoronous indexing (#1206)

270 of 270 new or added lines in 13 files covered. (100.0%)

28714 of 38777 relevant lines covered (74.05%)

89484.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.86
/dozer-cache/src/cache/lmdb/tests/basic.rs
1
use crate::cache::{
2
    expression::{self, FilterExpression, QueryExpression, Skip},
3
    index,
4
    lmdb::{cache::LmdbRwCache, indexing::IndexingThreadPool},
5
    test_utils::{self, query_from_filter},
6
    RoCache, RwCache,
7
};
8
use dozer_types::{
9
    serde_json::Value,
10
    types::{Field, Record, Schema},
11
};
12

13
use super::utils::create_cache;
14

15
fn _setup() -> (LmdbRwCache, IndexingThreadPool, Schema) {
4✔
16
    let (cache, indexing_thread_pool, schema, _) = create_cache(test_utils::schema_0);
4✔
17
    (cache, indexing_thread_pool, schema)
4✔
18
}
4✔
19

20
fn _setup_empty_primary_index() -> (LmdbRwCache, IndexingThreadPool, Schema) {
1✔
21
    let (cache, indexing_thread_pool, schema, _) =
1✔
22
        create_cache(test_utils::schema_empty_primary_index);
1✔
23
    (cache, indexing_thread_pool, schema)
1✔
24
}
1✔
25

×
26
fn query_and_test(cache: &dyn RwCache, inserted_record: &Record, exp: &QueryExpression) {
4✔
27
    let records = cache.query(exp).unwrap();
4✔
28
    assert_eq!(records[0].record, inserted_record.clone(), "must be equal");
4✔
29
}
4✔
30

×
31
#[test]
1✔
32
fn get_schema() {
1✔
33
    let (cache, _, schema) = _setup();
1✔
34

1✔
35
    let get_schema = &cache.get_schema().0;
1✔
36
    assert_eq!(get_schema, &schema, "must be equal");
1✔
37
}
1✔
38

×
39
#[test]
1✔
40
fn insert_get_and_delete_record() {
1✔
41
    let val = "bar".to_string();
1✔
42
    let (cache, mut indexing_thread_pool, schema) = _setup();
1✔
43

1✔
44
    assert_eq!(cache.count(&QueryExpression::with_no_limit()).unwrap(), 0);
1✔
45

×
46
    let mut record = Record::new(schema.identifier, vec![Field::String(val.clone())], None);
1✔
47
    cache.insert(&mut record).unwrap();
1✔
48
    cache.commit().unwrap();
1✔
49
    indexing_thread_pool.wait_until_catchup();
1✔
50

1✔
51
    assert_eq!(cache.count(&QueryExpression::with_no_limit()).unwrap(), 1);
1✔
52

×
53
    let version = record.version.unwrap();
1✔
54

1✔
55
    let key = index::get_primary_key(&[0], &[Field::String(val)]);
1✔
56

1✔
57
    let get_record = cache.get(&key).unwrap().record;
1✔
58
    assert_eq!(get_record, record, "must be equal");
1✔
59

60
    assert_eq!(cache.delete(&key).unwrap(), version);
1✔
61
    cache.commit().unwrap();
1✔
62
    indexing_thread_pool.wait_until_catchup();
1✔
63

1✔
64
    assert_eq!(cache.count(&QueryExpression::with_no_limit()).unwrap(), 0);
1✔
65

×
66
    cache.get(&key).expect_err("Must not find a record");
1✔
67

1✔
68
    assert_eq!(cache.query(&QueryExpression::default()).unwrap(), vec![]);
1✔
69
}
1✔
70

×
71
#[test]
1✔
72
fn insert_and_update_record() {
1✔
73
    let (cache, _, schema) = _setup();
1✔
74
    let mut foo = Record::new(
1✔
75
        schema.identifier,
1✔
76
        vec![Field::String("foo".to_string())],
1✔
77
        None,
1✔
78
    );
1✔
79
    let mut bar = Record::new(
1✔
80
        schema.identifier,
1✔
81
        vec![Field::String("bar".to_string())],
1✔
82
        None,
1✔
83
    );
1✔
84
    cache.insert(&mut foo).unwrap();
1✔
85
    let old_version = foo.version.unwrap();
1✔
86
    cache.insert(&mut bar).unwrap();
1✔
87

1✔
88
    let key = index::get_primary_key(&schema.primary_index, &foo.values);
1✔
89

1✔
90
    assert_eq!(cache.update(&key, &mut foo).unwrap(), old_version);
1✔
91
    assert_eq!(foo.version.unwrap(), old_version + 1);
1✔
92
}
1✔
93

×
94
fn insert_and_query_record_impl(
2✔
95
    cache: LmdbRwCache,
2✔
96
    mut indexing_thread_pool: IndexingThreadPool,
2✔
97
    schema: Schema,
2✔
98
) {
2✔
99
    let val = "bar".to_string();
2✔
100
    let mut record = Record::new(schema.identifier, vec![Field::String(val)], None);
2✔
101

2✔
102
    cache.insert(&mut record).unwrap();
2✔
103
    cache.commit().unwrap();
2✔
104
    indexing_thread_pool.wait_until_catchup();
2✔
105

2✔
106
    // Query with an expression
2✔
107
    let exp = query_from_filter(FilterExpression::Simple(
2✔
108
        "foo".to_string(),
2✔
109
        expression::Operator::EQ,
2✔
110
        Value::from("bar".to_string()),
2✔
111
    ));
2✔
112

2✔
113
    query_and_test(&cache, &record, &exp);
2✔
114

2✔
115
    // Query without an expression
2✔
116
    query_and_test(
2✔
117
        &cache,
2✔
118
        &record,
2✔
119
        &QueryExpression::new(None, vec![], Some(10), Skip::Skip(0)),
2✔
120
    );
2✔
121
}
2✔
122

123
#[test]
1✔
124
fn insert_and_query_record() {
1✔
125
    let (cache, indexing_thread_pool, schema) = _setup();
1✔
126
    insert_and_query_record_impl(cache, indexing_thread_pool, schema);
1✔
127
    let (cache, indexing_thread_pool, schema) = _setup_empty_primary_index();
1✔
128
    insert_and_query_record_impl(cache, indexing_thread_pool, schema);
1✔
129
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc