• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5888798292

17 Aug 2023 08:51AM UTC coverage: 76.025% (-1.4%) from 77.415%
5888798292

push

github

web-flow
feat: implement graph on live ui (#1847)

* feat: implement progress

* feat: implement enable progress flag

* feat: implement progress in live

* chore: fix clippy

* chore: always use telemetry metrics

* fix: Only run build once

---------

Co-authored-by: sagar <sagar@getdozer.io>
Co-authored-by: chubei <914745487@qq.com>

536 of 536 new or added lines in 21 files covered. (100.0%)

46101 of 60639 relevant lines covered (76.03%)

40410.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.02
/dozer-api/src/lib.rs
1
use arc_swap::ArcSwap;
2
use cache_builder::open_or_create_cache;
3
use dozer_cache::{
4
    cache::{CacheWriteOptions, RwCacheManager},
5
    dozer_log::reader::{LogReaderBuilder, LogReaderOptions},
6
    errors::CacheError,
7
    CacheReader,
8
};
9
use dozer_types::{
10
    grpc_types::types::Operation,
11
    labels::Labels,
12
    models::api_endpoint::{
13
        default_log_reader_batch_size, default_log_reader_buffer_size,
14
        default_log_reader_timeout_in_millis, ApiEndpoint,
15
    },
16
};
17
use futures_util::Future;
18
use std::{ops::Deref, sync::Arc};
19

20
pub use tonic_reflection;
21
pub use tonic_web;
22
pub use tower_http;
23
mod api_helper;
24

×
25
/// An API endpoint bundled with a reader on its cache and the descriptor bytes
/// that were supplied when the endpoint was constructed.
#[derive(Debug)]
×
26
pub struct CacheEndpoint {
27
    /// Cache reader, held in an `ArcSwap` and handed out through `cache_reader()`.
    cache_reader: ArcSwap<CacheReader>,
28
    /// Descriptor bytes: cloned from the log reader builder in `new`, or passed
    /// in by the caller in `open`.
    descriptor: Vec<u8>,
29
    /// Configuration of the endpoint this cache belongs to.
    endpoint: ApiEndpoint,
30
}
31

32
/// Label key under which `cache_labels` stores the endpoint name.
const ENDPOINT_LABEL: &str = "endpoint";
33
/// Label key under which `cache_labels` stores the build name.
const BUILD_LABEL: &str = "build";
34

35
impl CacheEndpoint {
×
36
    pub async fn new(
6✔
37
        app_server_addr: String,
6✔
38
        cache_manager: &dyn RwCacheManager,
6✔
39
        endpoint: ApiEndpoint,
6✔
40
        cancel: impl Future<Output = ()> + Unpin + Send + 'static,
6✔
41
        operations_sender: Option<Sender<Operation>>,
6✔
42
        multi_pb: Option<MultiProgress>,
6✔
43
    ) -> Result<(Self, JoinHandle<Result<(), CacheError>>), ApiInitError> {
6✔
44
        // Create log reader builder.
×
45
        let log_reader_builder =
6✔
46
            LogReaderBuilder::new(app_server_addr, get_log_reader_options(&endpoint)).await?;
54✔
47
        let descriptor = log_reader_builder.descriptor.clone();
6✔
48

6✔
49
        // Open or create cache.
6✔
50
        let cache_labels =
6✔
51
            cache_labels(endpoint.name.clone(), log_reader_builder.build_name.clone());
6✔
52
        let schema = log_reader_builder.schema.clone();
6✔
53
        let conflict_resolution = endpoint.conflict_resolution.unwrap_or_default();
6✔
54
        let write_options = CacheWriteOptions {
6✔
55
            insert_resolution: conflict_resolution.on_insert.unwrap_or_default(),
6✔
56
            delete_resolution: conflict_resolution.on_delete.unwrap_or_default(),
6✔
57
            update_resolution: conflict_resolution.on_update.unwrap_or_default(),
6✔
58
            ..Default::default()
6✔
59
        };
6✔
60
        let cache = open_or_create_cache(
6✔
61
            cache_manager,
6✔
62
            cache_labels.clone(),
6✔
63
            (schema.schema, schema.secondary_indexes),
6✔
64
            &schema.connections,
6✔
65
            write_options,
6✔
66
        )
6✔
67
        .map_err(ApiInitError::OpenOrCreateCache)?;
6✔
68

69
        // Open cache reader.
×
70
        let cache_reader =
6✔
71
            open_cache_reader(cache_manager, cache_labels)?.expect("We just created the cache");
6✔
72

6✔
73
        // Start cache builder.
6✔
74
        let handle = {
6✔
75
            let operations_sender = operations_sender.map(|sender| (endpoint.name.clone(), sender));
6✔
76
            tokio::spawn(async move {
6✔
77
                cache_builder::build_cache(
6✔
78
                    cache,
6✔
79
                    cancel,
6✔
80
                    log_reader_builder,
6✔
81
                    operations_sender,
6✔
82
                    multi_pb,
6✔
83
                )
6✔
84
                .await
16✔
85
            })
6✔
86
        };
6✔
87

6✔
88
        Ok((
6✔
89
            Self {
6✔
90
                cache_reader: ArcSwap::from_pointee(cache_reader),
6✔
91
                descriptor,
6✔
92
                endpoint,
6✔
93
            },
6✔
94
            handle,
6✔
95
        ))
6✔
96
    }
6✔
97

×
98
    pub fn open(
23✔
99
        cache_manager: &dyn RwCacheManager,
23✔
100
        descriptor: Vec<u8>,
23✔
101
        endpoint: ApiEndpoint,
23✔
102
    ) -> Result<Self, ApiInitError> {
23✔
103
        let mut labels = Labels::new();
23✔
104
        labels.push(endpoint.name.clone(), endpoint.name.clone());
23✔
105
        Ok(Self {
23✔
106
            cache_reader: ArcSwap::from_pointee(open_existing_cache_reader(cache_manager, labels)?),
23✔
107
            descriptor,
23✔
108
            endpoint,
23✔
109
        })
×
110
    }
23✔
111

×
112
    pub fn cache_reader(&self) -> impl Deref<Target = Arc<CacheReader>> + '_ {
46✔
113
        self.cache_reader.load()
46✔
114
    }
46✔
115

×
116
    pub fn descriptor(&self) -> &[u8] {
25✔
117
        &self.descriptor
25✔
118
    }
25✔
119

×
120
    pub fn endpoint(&self) -> &ApiEndpoint {
×
121
        &self.endpoint
×
122
    }
×
123
}
124

×
125
pub fn cache_labels(endpoint: String, build: String) -> Labels {
9✔
126
    let mut labels = Labels::new();
9✔
127
    labels.push(ENDPOINT_LABEL, endpoint);
9✔
128
    labels.push(BUILD_LABEL, build);
9✔
129
    labels
9✔
130
}
9✔
131

×
132
fn open_cache_reader(
32✔
133
    cache_manager: &dyn RwCacheManager,
32✔
134
    labels: Labels,
32✔
135
) -> Result<Option<CacheReader>, ApiInitError> {
32✔
136
    let cache = cache_manager
32✔
137
        .open_ro_cache(labels)
32✔
138
        .map_err(ApiInitError::OpenOrCreateCache)?;
32✔
139
    Ok(cache.map(CacheReader::new))
32✔
140
}
32✔
141

142
fn open_existing_cache_reader(
143
    cache_manager: &dyn RwCacheManager,
144
    labels: Labels,
145
) -> Result<CacheReader, ApiInitError> {
×
146
    open_cache_reader(cache_manager, labels.clone())?
23✔
147
        .ok_or_else(|| ApiInitError::CacheNotFound(labels))
23✔
148
}
23✔
149

×
150
fn get_log_reader_options(endpoint: &ApiEndpoint) -> LogReaderOptions {
9✔
151
    LogReaderOptions {
9✔
152
        endpoint: endpoint.name.clone(),
9✔
153
        batch_size: endpoint
9✔
154
            .log_reader_options
9✔
155
            .as_ref()
9✔
156
            .and_then(|options| options.batch_size)
9✔
157
            .unwrap_or_else(default_log_reader_batch_size),
9✔
158
        timeout_in_millis: endpoint
9✔
159
            .log_reader_options
9✔
160
            .as_ref()
9✔
161
            .and_then(|options| options.timeout_in_millis)
9✔
162
            .unwrap_or_else(default_log_reader_timeout_in_millis),
9✔
163
        buffer_size: endpoint
9✔
164
            .log_reader_options
9✔
165
            .as_ref()
9✔
166
            .and_then(|options| options.buffer_size)
9✔
167
            .unwrap_or_else(default_log_reader_buffer_size),
9✔
168
    }
9✔
169
}
9✔
170

171
// Exports
172
pub mod auth;
173
mod cache_builder;
174
pub mod errors;
175
pub mod generator;
176
pub mod grpc;
177
pub mod rest;
178
// Re-exports
179
pub use actix_cors;
180
pub use actix_web;
181
pub use actix_web_httpauth;
182
pub use api_helper::API_LATENCY_HISTOGRAM_NAME;
183
pub use api_helper::API_REQUEST_COUNTER_NAME;
184
pub use async_trait;
185
use dozer_types::indicatif::MultiProgress;
186
use errors::ApiInitError;
187
pub use openapiv3;
188
pub use tokio;
189
use tokio::{sync::broadcast::Sender, task::JoinHandle};
190
pub use tonic;
191
pub use tracing_actix_web;
192
#[cfg(test)]
193
mod test_utils;
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc