• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4116183752

pending completion
4116183752

push

github

GitHub
refactor: Make `LmdbRoCache` and `LmdbRwCache` `Send` and `Sync` (#821)

790 of 790 new or added lines in 44 files covered. (100.0%)

23005 of 33842 relevant lines covered (67.98%)

56312.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.54
/dozer-api/src/api_helper.rs
1
use crate::auth::Access;
2
use crate::errors::{ApiError, AuthError};
3
use crate::generator::oapi::generator::OpenApiGenerator;
4
use crate::PipelineDetails;
5
use dozer_cache::cache::{expression::QueryExpression, index};
6
use dozer_cache::errors::CacheError;
7
use dozer_cache::{AccessFilter, CacheReader};
8
use dozer_types::indexmap::IndexMap;
9
use dozer_types::json_str_to_field;
10
use dozer_types::record_to_map;
11
use dozer_types::serde_json::Value;
12
use dozer_types::types::{Record, Schema};
13
use openapiv3::OpenAPI;
14

15
pub struct ApiHelper<'a> {
16
    details: &'a PipelineDetails,
17
    reader: CacheReader,
18
}
19
impl<'a> ApiHelper<'a> {
20
    pub fn new(
37✔
21
        pipeline_details: &'a PipelineDetails,
37✔
22
        access: Option<Access>,
37✔
23
    ) -> Result<Self, ApiError> {
37✔
24
        let access = access.unwrap_or(Access::All);
37✔
25

26
        // Define Access Filter based on token
27
        let reader_access = match access {
37✔
28
            // No access filter.
29
            Access::All => AccessFilter {
37✔
30
                filter: None,
37✔
31
                fields: vec![],
37✔
32
            },
37✔
33

34
            Access::Custom(mut access_filters) => {
×
35
                if let Some(access_filter) = access_filters.remove(&pipeline_details.schema_name) {
×
36
                    access_filter
×
37
                } else {
38
                    return Err(ApiError::ApiAuthError(AuthError::InvalidToken));
×
39
                }
40
            }
41
        };
42

43
        let reader = CacheReader {
37✔
44
            cache: pipeline_details.cache_endpoint.cache.clone(),
37✔
45
            access: reader_access,
37✔
46
        };
37✔
47
        Ok(Self {
37✔
48
            details: pipeline_details,
37✔
49
            reader,
37✔
50
        })
37✔
51
    }
37✔
52

53
    pub fn generate_oapi3(&self) -> Result<OpenAPI, ApiError> {
×
54
        let schema_name = self.details.schema_name.clone();
×
55
        let (schema, secondary_indexes) = self
×
56
            .reader
×
57
            .get_schema_and_indexes_by_name(&schema_name)
×
58
            .map_err(ApiError::SchemaNotFound)?;
×
59

60
        let oapi_generator = OpenApiGenerator::new(
×
61
            schema,
×
62
            secondary_indexes,
×
63
            schema_name,
×
64
            self.details.cache_endpoint.endpoint.clone(),
×
65
            vec![format!("http://localhost:{}", "8080")],
×
66
        );
×
67

×
68
        oapi_generator
×
69
            .generate_oas3()
×
70
            .map_err(ApiError::ApiGenerationError)
×
71
    }
×
72

73
    /// Get a single record by json string as primary key
74
    pub fn get_record(&self, key: &str) -> Result<IndexMap<String, Value>, CacheError> {
1✔
75
        let schema = self
1✔
76
            .reader
1✔
77
            .get_schema_and_indexes_by_name(&self.details.schema_name)?
1✔
78
            .0;
79

80
        let key = if schema.primary_index.is_empty() {
1✔
81
            json_str_to_field(key, dozer_types::types::FieldType::UInt, false)
×
82
                .map_err(CacheError::Type)
×
83
        } else if schema.primary_index.len() == 1 {
1✔
84
            let field = &schema.fields[schema.primary_index[0]];
1✔
85
            json_str_to_field(key, field.typ, field.nullable).map_err(CacheError::Type)
1✔
86
        } else {
87
            Err(CacheError::Query(
×
88
                dozer_cache::errors::QueryError::MultiIndexFetch(key.to_string()),
×
89
            ))
×
90
        }?;
×
91

92
        let key = index::get_primary_key(&[0], &[key]);
1✔
93
        let rec = self.reader.get(&key)?;
1✔
94

95
        record_to_map(&rec, &schema).map_err(CacheError::Type)
1✔
96
    }
1✔
97

98
    pub fn get_records_count(&self, mut exp: QueryExpression) -> Result<usize, CacheError> {
14✔
99
        self.reader.count(&self.details.schema_name, &mut exp)
14✔
100
    }
14✔
101

102
    /// Get multiple records
103
    pub fn get_records_map(
8✔
104
        &self,
8✔
105
        exp: QueryExpression,
8✔
106
    ) -> Result<Vec<IndexMap<String, Value>>, CacheError> {
8✔
107
        let mut maps = vec![];
8✔
108
        let (schema, records) = self.get_records(exp)?;
8✔
109
        for rec in records.iter() {
312✔
110
            let map = record_to_map(rec, &schema)?;
312✔
111
            maps.push(map);
312✔
112
        }
113
        Ok(maps)
8✔
114
    }
8✔
115
    /// Get multiple records
116
    pub fn get_records(
17✔
117
        &self,
17✔
118
        mut exp: QueryExpression,
17✔
119
    ) -> Result<(Schema, Vec<Record>), CacheError> {
17✔
120
        let schema = self
17✔
121
            .reader
17✔
122
            .get_schema_and_indexes_by_name(&self.details.schema_name)?
17✔
123
            .0;
124
        let records = self.reader.query(&self.details.schema_name, &mut exp)?;
17✔
125

126
        Ok((schema, records))
17✔
127
    }
17✔
128

129
    /// Get schema
130
    pub fn get_schema(&self) -> Result<Schema, CacheError> {
5✔
131
        let schema = self
5✔
132
            .reader
5✔
133
            .get_schema_and_indexes_by_name(&self.details.schema_name)?
5✔
134
            .0;
135
        Ok(schema)
5✔
136
    }
5✔
137
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc