• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4763381855

pending completion
4763381855

Pull #1461

github

GitHub
Merge 50bf72be2 into c58df4a0b
Pull Request #1461: feat: Make secondary index configurable

135 of 135 new or added lines in 6 files covered. (100.0%)

34877 of 44466 relevant lines covered (78.44%)

11367.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.67
/dozer-api/src/lib.rs
1
use arc_swap::ArcSwap;
2✔
2
use dozer_cache::{
3
    cache::{CacheWriteOptions, RwCacheManager},
4
    errors::CacheError,
5
    CacheReader,
6
};
7
use dozer_types::{
8
    grpc_types::types::Operation,
9
    log::info,
10
    models::api_endpoint::{
11
        ApiEndpoint, OnDeleteResolutionTypes, OnInsertResolutionTypes, OnUpdateResolutionTypes,
12
        SecondaryIndexConfig,
13
    },
14
    types::Schema,
15
};
16
use futures_util::Future;
17
use std::{
18
    borrow::{Borrow, Cow},
19
    ops::Deref,
20
    path::Path,
21
    sync::Arc,
22
};
23

24
mod api_helper;
25

26
#[derive(Debug)]
×
27
pub struct CacheEndpoint {
28
    cache_reader: ArcSwap<CacheReader>,
29
    endpoint: ApiEndpoint,
30
}
31

32
impl CacheEndpoint {
33
    #[allow(clippy::too_many_arguments)]
34
    pub async fn new(
9✔
35
        cache_manager: &dyn RwCacheManager,
9✔
36
        schema: Schema,
9✔
37
        endpoint: ApiEndpoint,
9✔
38
        runtime: Arc<Runtime>,
9✔
39
        cancel: impl Future<Output = ()> + Unpin,
9✔
40
        log_path: &Path,
9✔
41
        operations_sender: Option<Sender<Operation>>,
9✔
42
        multi_pb: Option<MultiProgress>,
9✔
43
    ) -> Result<(Self, Option<impl FnOnce() -> Result<(), CacheError>>), ApiError> {
9✔
44
        let (cache_reader, task) = if let Some(cache_reader) =
9✔
45
            open_cache_reader(cache_manager, &endpoint.name)?
9✔
46
        {
47
            (cache_reader, None)
×
48
        } else {
49
            let operations_sender = operations_sender.map(|sender| (endpoint.name.clone(), sender));
9✔
50
            let conflict_resolution = endpoint.conflict_resolution.unwrap_or_default();
9✔
51
            let write_options = CacheWriteOptions {
9✔
52
                insert_resolution: OnInsertResolutionTypes::from(conflict_resolution.on_insert),
9✔
53
                delete_resolution: OnDeleteResolutionTypes::from(conflict_resolution.on_delete),
9✔
54
                update_resolution: OnUpdateResolutionTypes::from(conflict_resolution.on_update),
9✔
55
                ..Default::default()
9✔
56
            };
9✔
57
            let (cache_name, task) = cache_builder::create_cache(
9✔
58
                cache_manager,
9✔
59
                schema,
9✔
60
                get_secondary_index_config(&endpoint).borrow(),
9✔
61
                runtime,
9✔
62
                cancel,
9✔
63
                log_path,
9✔
64
                write_options,
9✔
65
                operations_sender,
9✔
66
                multi_pb,
9✔
67
            )
9✔
68
            .await
18✔
69
            .map_err(ApiError::CreateCache)?;
9✔
70
            // TODO: We intentionally don't create alias endpoint.name -> cache_name here.
71
            (
72
                open_cache_reader(cache_manager, &cache_name)?.expect("We just created the cache"),
9✔
73
                Some(task),
9✔
74
            )
75
        };
76
        Ok((
9✔
77
            Self {
9✔
78
                cache_reader: ArcSwap::from_pointee(cache_reader),
9✔
79
                endpoint,
9✔
80
            },
9✔
81
            task,
9✔
82
        ))
9✔
83
    }
9✔
84

85
    pub fn open(
19✔
86
        cache_manager: &dyn RwCacheManager,
19✔
87
        endpoint: ApiEndpoint,
19✔
88
    ) -> Result<Self, ApiError> {
19✔
89
        Ok(Self {
19✔
90
            cache_reader: ArcSwap::from_pointee(open_existing_cache_reader(
19✔
91
                cache_manager,
19✔
92
                &endpoint.name,
19✔
93
            )?),
19✔
94
            endpoint,
19✔
95
        })
96
    }
19✔
97

98
    pub fn cache_reader(&self) -> impl Deref<Target = Arc<CacheReader>> + '_ {
46✔
99
        self.cache_reader.load()
46✔
100
    }
46✔
101

102
    pub fn endpoint(&self) -> &ApiEndpoint {
×
103
        &self.endpoint
×
104
    }
×
105

106
    pub fn redirect_cache(&self, cache_manager: &dyn RwCacheManager) -> Result<(), ApiError> {
107
        self.cache_reader.store(Arc::new(open_existing_cache_reader(
×
108
            cache_manager,
×
109
            &self.endpoint.name,
×
110
        )?));
×
111
        Ok(())
×
112
    }
×
113
}
114

115
fn open_cache_reader(
43✔
116
    cache_manager: &dyn RwCacheManager,
43✔
117
    name: &str,
43✔
118
) -> Result<Option<CacheReader>, ApiError> {
43✔
119
    let cache = cache_manager
43✔
120
        .open_ro_cache(name)
43✔
121
        .map_err(ApiError::OpenCache)?;
43✔
122
    Ok(cache.map(|cache| {
43✔
123
        info!("[api] Serving {} using cache {}", name, cache.name());
31✔
124
        CacheReader::new(cache)
31✔
125
    }))
43✔
126
}
43✔
127

128
fn open_existing_cache_reader(
129
    cache_manager: &dyn RwCacheManager,
130
    name: &str,
131
) -> Result<CacheReader, ApiError> {
132
    open_cache_reader(cache_manager, name)?.ok_or_else(|| ApiError::CacheNotFound(name.to_string()))
19✔
133
}
19✔
134

135
fn get_secondary_index_config(api_endpoint: &ApiEndpoint) -> Cow<SecondaryIndexConfig> {
136
    if let Some(config) = api_endpoint
12✔
137
        .index
12✔
138
        .as_ref()
12✔
139
        .and_then(|index| index.secondary.as_ref())
12✔
140
    {
141
        Cow::Borrowed(config)
×
142
    } else {
143
        Cow::Owned(SecondaryIndexConfig::default())
12✔
144
    }
145
}
12✔
146

147
// Exports
148
pub mod auth;
149
mod cache_builder;
150
pub mod errors;
151
pub mod generator;
152
pub mod grpc;
153
pub mod rest;
154
// Re-exports
155
pub use actix_cors;
156
pub use actix_web;
157
pub use async_trait;
158
use dozer_types::indicatif::MultiProgress;
159
use errors::ApiError;
160
pub use openapiv3;
161
pub use tokio;
162
use tokio::{runtime::Runtime, sync::broadcast::Sender};
163
pub use tonic;
164
pub use tracing_actix_web;
165

166
#[cfg(test)]
167
mod test_utils;
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc