• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4382580286

pending completion
4382580286

push

github

GitHub
feat: Separate cache operation log environment and index environments (#1199)

1370 of 1370 new or added lines in 33 files covered. (100.0%)

28671 of 41023 relevant lines covered (69.89%)

51121.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.34
/dozer-api/src/rest/api_generator.rs
1
use std::sync::Arc;
2

3
use actix_web::web::ReqData;
4
use actix_web::{web, HttpResponse};
5
use dozer_cache::cache::expression::{default_limit_for_query, QueryExpression, Skip};
6
use dozer_cache::cache::{index, RecordWithId};
7
use dozer_cache::CacheReader;
8
use dozer_types::chrono::SecondsFormat;
9
use dozer_types::errors::types::TypeError;
10
use dozer_types::indexmap::IndexMap;
11
use dozer_types::log::info;
12
use dozer_types::models::api_endpoint::ApiEndpoint;
13
use dozer_types::ordered_float::OrderedFloat;
14
use dozer_types::types::{Field, Schema, DATE_FORMAT};
15
use openapiv3::OpenAPI;
16

17
use crate::api_helper::{get_record, get_records, get_records_count};
18
use crate::generator::oapi::generator::OpenApiGenerator;
19
use crate::RoCacheEndpoint;
20
use crate::{auth::Access, errors::ApiError};
21
use dozer_types::grpc_types::health::health_check_response::ServingStatus;
22
use dozer_types::serde_json;
23
use dozer_types::serde_json::{json, Map, Value};
24

25
/// Builds the OpenAPI 3 document for `endpoint`.
///
/// The schema and secondary indexes are read from `reader`. The only server
/// URL passed to the generator is `http://localhost:8080`.
fn generate_oapi3(reader: &CacheReader, endpoint: ApiEndpoint) -> Result<OpenAPI, ApiError> {
    let (schema, secondary_indexes) = reader.get_schema();
    let server_urls = vec![format!("http://localhost:{}", "8080")];

    let generator = OpenApiGenerator::new(schema, secondary_indexes, endpoint, server_urls);
    Ok(generator.generate_oas3())
}
×
37

×
38
/// Generated function to return openapi.yaml documentation.
×
39
pub async fn generate_oapi(
×
40
    cache_endpoint: ReqData<Arc<RoCacheEndpoint>>,
×
41
) -> Result<HttpResponse, ApiError> {
×
42
    generate_oapi3(
×
43
        &cache_endpoint.cache_reader(),
×
44
        cache_endpoint.endpoint.clone(),
×
45
    )
×
46
    .map(|result| HttpResponse::Ok().json(result))
×
47
}
×
48

×
49
// Generated Get function to return a single record in JSON format
×
50
pub async fn get(
1✔
51
    access: Option<ReqData<Access>>,
1✔
52
    cache_endpoint: ReqData<Arc<RoCacheEndpoint>>,
1✔
53
    path: web::Path<String>,
1✔
54
) -> Result<HttpResponse, ApiError> {
1✔
55
    let cache_reader = &cache_endpoint.cache_reader();
1✔
56
    let schema = &cache_reader.get_schema().0;
1✔
57

1✔
58
    let key = path.as_str();
1✔
59
    let key = if schema.primary_index.is_empty() {
1✔
60
        return Err(ApiError::NoPrimaryKey);
×
61
    } else if schema.primary_index.len() == 1 {
1✔
62
        let field = &schema.fields[schema.primary_index[0]];
1✔
63
        Field::from_str(key, field.typ, field.nullable)?
1✔
64
    } else {
×
65
        return Err(ApiError::MultiIndexFetch(key.to_string()));
×
66
    };
×
67

×
68
    let key = index::get_primary_key(&[0], &[key]);
1✔
69
    let record = get_record(
1✔
70
        &cache_endpoint.cache_reader(),
1✔
71
        &key,
1✔
72
        access.map(|a| a.into_inner()),
1✔
73
    )?;
1✔
74

×
75
    Ok(record_to_map(record, schema).map(|map| HttpResponse::Ok().json(map))?)
1✔
76
}
1✔
77

×
78
// Generated list function for multiple records with a default query expression
×
79
pub async fn list(
3✔
80
    access: Option<ReqData<Access>>,
3✔
81
    cache_endpoint: ReqData<Arc<RoCacheEndpoint>>,
3✔
82
) -> Result<HttpResponse, ApiError> {
3✔
83
    let mut exp = QueryExpression::new(None, vec![], Some(50), Skip::Skip(0));
3✔
84
    match get_records_map(access, cache_endpoint, &mut exp) {
3✔
85
        Ok(maps) => Ok(HttpResponse::Ok().json(maps)),
3✔
86
        Err(e) => match e {
×
87
            ApiError::QueryFailed(_) => {
×
88
                let res: Vec<String> = vec![];
×
89
                info!("No records found.");
×
90
                Ok(HttpResponse::Ok().json(res))
×
91
            }
×
92
            _ => Err(ApiError::InternalError(Box::new(e))),
×
93
        },
×
94
    }
×
95
}
3✔
96

97
// Generated get function for health check
×
98
pub async fn health_route() -> Result<HttpResponse, ApiError> {
×
99
    let status = ServingStatus::Serving;
×
100
    let resp = json!({ "status": status.as_str_name() }).to_string();
×
101
    Ok(HttpResponse::Ok().body(resp))
×
102
}
×
103

×
104
pub async fn count(
5✔
105
    access: Option<ReqData<Access>>,
5✔
106
    cache_endpoint: ReqData<Arc<RoCacheEndpoint>>,
5✔
107
    query_info: Option<web::Json<Value>>,
5✔
108
) -> Result<HttpResponse, ApiError> {
5✔
109
    let mut query_expression = match query_info {
5✔
110
        Some(query_info) => serde_json::from_value::<QueryExpression>(query_info.0)
4✔
111
            .map_err(ApiError::map_deserialization_error)?,
4✔
112
        None => QueryExpression::with_no_limit(),
1✔
113
    };
×
114

×
115
    get_records_count(
5✔
116
        &cache_endpoint.cache_reader(),
5✔
117
        &mut query_expression,
5✔
118
        access.map(|a| a.into_inner()),
5✔
119
    )
5✔
120
    .map(|count| HttpResponse::Ok().json(count))
5✔
121
}
5✔
122

×
123
// Generated query function for multiple records
×
124
pub async fn query(
5✔
125
    access: Option<ReqData<Access>>,
5✔
126
    cache_endpoint: ReqData<Arc<RoCacheEndpoint>>,
5✔
127
    query_info: Option<web::Json<Value>>,
5✔
128
) -> Result<HttpResponse, ApiError> {
5✔
129
    let mut query_expression = match query_info {
5✔
130
        Some(query_info) => serde_json::from_value::<QueryExpression>(query_info.0)
4✔
131
            .map_err(ApiError::map_deserialization_error)?,
4✔
132
        None => QueryExpression::with_default_limit(),
1✔
133
    };
×
134
    if query_expression.limit.is_none() {
5✔
135
        query_expression.limit = Some(default_limit_for_query());
3✔
136
    }
3✔
137

×
138
    get_records_map(access, cache_endpoint, &mut query_expression)
5✔
139
        .map(|maps| HttpResponse::Ok().json(maps))
5✔
140
}
5✔
141

×
142
/// Get multiple records
×
143
fn get_records_map(
8✔
144
    access: Option<ReqData<Access>>,
8✔
145
    cache_endpoint: ReqData<Arc<RoCacheEndpoint>>,
8✔
146
    exp: &mut QueryExpression,
8✔
147
) -> Result<Vec<IndexMap<String, Value>>, ApiError> {
8✔
148
    let mut maps = vec![];
8✔
149
    let cache_reader = &cache_endpoint.cache_reader();
8✔
150
    let records = get_records(cache_reader, exp, access.map(|a| a.into_inner()))?;
8✔
151
    let schema = &cache_reader.get_schema().0;
8✔
152
    for record in records.into_iter() {
312✔
153
        let map = record_to_map(record, schema)?;
312✔
154
        maps.push(map);
312✔
155
    }
×
156
    Ok(maps)
8✔
157
}
8✔
158

×
159
/// Used in REST APIs for converting to JSON
×
160
fn record_to_map(
313✔
161
    record: RecordWithId,
313✔
162
    schema: &Schema,
313✔
163
) -> Result<IndexMap<String, Value>, TypeError> {
313✔
164
    let mut map = IndexMap::new();
313✔
165

166
    for (field_def, field) in schema.fields.iter().zip(record.record.values) {
1,565✔
167
        let val = field_to_json_value(field);
1,565✔
168
        map.insert(field_def.name.clone(), val);
1,565✔
169
    }
1,565✔
170

×
171
    map.insert("__dozer_record_id".to_string(), Value::from(record.id));
313✔
172
    map.insert(
313✔
173
        "__dozer_record_version".to_string(),
313✔
174
        Value::from(record.record.version),
313✔
175
    );
313✔
176

313✔
177
    Ok(map)
313✔
178
}
313✔
179

×
180
fn convert_x_y_to_object((x, y): &(OrderedFloat<f64>, OrderedFloat<f64>)) -> Value {
1✔
181
    let mut m = Map::new();
1✔
182
    m.insert("x".to_string(), Value::from(x.0));
1✔
183
    m.insert("y".to_string(), Value::from(y.0));
1✔
184
    Value::Object(m)
1✔
185
}
1✔
186

×
187
/// Used in REST APIs for converting raw value back and forth.
×
188
///
×
189
/// Should be consistent with `convert_cache_type_to_schema_type`.
×
190
fn field_to_json_value(field: Field) -> Value {
1,577✔
191
    match field {
1,577✔
192
        Field::UInt(n) => Value::from(n),
627✔
193
        Field::Int(n) => Value::from(n),
1✔
194
        Field::Float(n) => Value::from(n.0),
1✔
195
        Field::Boolean(b) => Value::from(b),
1✔
196
        Field::String(s) => Value::from(s),
314✔
197
        Field::Text(n) => Value::from(n),
1✔
198
        Field::Binary(b) => Value::from(b),
1✔
199
        Field::Decimal(n) => Value::String(n.to_string()),
1✔
200
        Field::Timestamp(ts) => Value::String(ts.to_rfc3339_opts(SecondsFormat::Millis, true)),
1✔
201
        Field::Date(n) => Value::String(n.format(DATE_FORMAT).to_string()),
1✔
202
        Field::Bson(b) => Value::from(b),
1✔
203
        Field::Point(point) => convert_x_y_to_object(&point.0.x_y()),
1✔
204
        Field::Null => Value::Null,
626✔
205
    }
×
206
}
1,577✔
207

×
208
#[cfg(test)]
mod tests {
    use dozer_types::{
        chrono::{NaiveDate, Offset, TimeZone, Utc},
        json_value_to_field,
        ordered_float::OrderedFloat,
        rust_decimal::Decimal,
        types::{DozerPoint, Field, FieldType},
    };

    use super::*;

    /// Serializes `field` to JSON, parses it back as `field_type`, and asserts
    /// that the round trip yields the original field.
    fn test_field_conversion(field_type: FieldType, field: Field) {
        let json = field_to_json_value(field.clone());
        let round_tripped = json_value_to_field(json, field_type, true).unwrap();

        assert_eq!(round_tripped, field, "must be equal");
    }

    /// Round-trips one sample value of every non-null field type.
    #[test]
    fn test_field_types_json_conversion() {
        let timestamp = Utc
            .fix()
            .with_ymd_and_hms(2001, 1, 1, 0, 4, 0)
            .unwrap();
        let date = NaiveDate::from_ymd_opt(2022, 11, 24).unwrap();
        // UTF-8 bytes of the JSON text `{"abc":"foo"}`.
        let bson_bytes = br#"{"abc":"foo"}"#.to_vec();

        let cases = vec![
            (FieldType::Int, Field::Int(-1)),
            (FieldType::UInt, Field::UInt(1)),
            (FieldType::Float, Field::Float(OrderedFloat(1.1))),
            (FieldType::Boolean, Field::Boolean(true)),
            (FieldType::String, Field::String("a".to_string())),
            (FieldType::Binary, Field::Binary(b"asdf".to_vec())),
            (FieldType::Decimal, Field::Decimal(Decimal::new(202, 2))),
            (FieldType::Timestamp, Field::Timestamp(timestamp)),
            (FieldType::Date, Field::Date(date)),
            (FieldType::Bson, Field::Bson(bson_bytes)),
            (FieldType::Text, Field::Text("lorem ipsum".to_string())),
            (
                FieldType::Point,
                Field::Point(DozerPoint::from((3.234, 4.567))),
            ),
        ];

        cases
            .into_iter()
            .for_each(|(field_type, field)| test_field_conversion(field_type, field));
    }
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc