• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5888798292

17 Aug 2023 08:51AM UTC coverage: 76.025% (-1.4%) from 77.415%
5888798292

push

github

web-flow
feat: implement graph on live ui (#1847)

* feat: implement progress

* feat: implement enable progress flag

* feat: implement progress in live

* chore: fix clippy

* chore: always use telemetry metrics

* fix: Only run build once

---------

Co-authored-by: sagar <sagar@getdozer.io>
Co-authored-by: chubei <914745487@qq.com>

536 of 536 new or added lines in 21 files covered. (100.0%)

46101 of 60639 relevant lines covered (76.03%)

40410.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.17
/dozer-cli/src/simple/build.rs
1
use std::{borrow::Cow, collections::BTreeMap, fs::OpenOptions, path::Path};
2

3
use dozer_api::generator::protoc::generator::ProtoGenerator;
4
use dozer_cache::dozer_log::{
5
    home_dir::{BuildId, BuildPath, HomeDir},
6
    replication::create_log_storage,
7
    schemas::EndpointSchema,
8
    storage::Storage,
9
};
10
use dozer_core::{
11
    dag_schemas::{DagSchemas, EdgeType},
12
    daggy,
13
    petgraph::{
14
        visit::{IntoEdgesDirected, IntoNodeReferences},
15
        Direction,
16
    },
17
};
18
use dozer_types::{
19
    log::info,
20
    models::{
21
        api_endpoint::{
22
            ApiEndpoint, FullText, SecondaryIndex, SecondaryIndexConfig, SortedInverted,
23
        },
24
        app_config::LogStorage,
25
    },
26
    node::NodeHandle,
27
    types::{FieldDefinition, FieldType, IndexDefinition, Schema, SchemaWithIndex},
28
};
29
use futures::future::try_join_all;
30
use serde::{de::DeserializeOwned, Serialize};
31

32
use crate::errors::BuildError;
33

34
#[derive(Debug)]
×
35
pub struct Contract {
36
    pub pipeline: daggy::Dag<NodeHandle, EdgeType>,
37
    pub endpoints: BTreeMap<String, EndpointSchema>,
38
}
39

40
impl Contract {
41
    pub fn new<T>(
6✔
42
        dag_schemas: DagSchemas<T>,
6✔
43
        endpoints: &[ApiEndpoint],
6✔
44
        enable_token: bool,
6✔
45
        enable_on_event: bool,
6✔
46
    ) -> Result<Self, BuildError> {
6✔
47
        let sink_schemas = dag_schemas.get_sink_schemas();
6✔
48
        let mut endpoint_schemas = BTreeMap::new();
6✔
49
        for (endpoint_name, (schema, connections)) in sink_schemas {
12✔
50
            let endpoint = endpoints
6✔
51
                .iter()
6✔
52
                .find(|e| e.name == *endpoint_name)
6✔
53
                .ok_or(BuildError::MissingEndpoint(endpoint_name.clone()))?;
6✔
54
            let (schema, secondary_indexes) = modify_schema(&schema, endpoint)?;
6✔
55
            let schema = EndpointSchema {
6✔
56
                schema,
6✔
57
                secondary_indexes,
6✔
58
                enable_token,
6✔
59
                enable_on_event,
6✔
60
                connections,
6✔
61
            };
6✔
62
            endpoint_schemas.insert(endpoint_name, schema);
6✔
63
        }
64

65
        let pipeline = dag_schemas
6✔
66
            .into_graph()
6✔
67
            .map_owned(|_, node| node.handle, |_, edge| edge);
22✔
68

6✔
69
        Ok(Self {
6✔
70
            pipeline,
6✔
71
            endpoints: endpoint_schemas,
6✔
72
        })
6✔
73
    }
6✔
74

75
    pub fn deserialize(build_path: &BuildPath) -> Result<Self, BuildError> {
×
76
        let pipeline: daggy::Dag<NodeHandle, EdgeType> =
×
77
            serde_json_from_path(&build_path.dag_path)?;
×
78

79
        let mut endpoints = BTreeMap::new();
×
80
        for (node_index, node) in pipeline.node_references() {
×
81
            // Endpoint must have zero out degree.
82
            if pipeline
×
83
                .edges_directed(node_index, Direction::Outgoing)
×
84
                .count()
×
85
                > 0
×
86
            {
87
                continue;
×
88
            }
×
89

×
90
            // `NodeHandle::id` is the endpoint name.
×
91
            let endpoint_name = node.id.clone();
×
92
            let endpoint_path = build_path.get_endpoint_path(&endpoint_name);
×
93
            let schema: EndpointSchema = serde_json_from_path(&endpoint_path.schema_path)?;
×
94
            endpoints.insert(endpoint_name, schema);
×
95
        }
96

97
        Ok(Self {
×
98
            pipeline,
×
99
            endpoints,
×
100
        })
×
101
    }
×
102
}
103

104
pub async fn build(
6✔
105
    home_dir: &HomeDir,
6✔
106
    contract: &Contract,
6✔
107
    storage_config: &LogStorage,
6✔
108
) -> Result<(), BuildError> {
6✔
109
    if let Some(build_id) = needs_build(home_dir, contract, storage_config).await? {
6✔
110
        let build_name = build_id.name().to_string();
6✔
111
        create_build(home_dir, build_id, contract)?;
6✔
112
        info!("Created new build {build_name}");
6✔
113
    } else {
114
        info!("Building not needed");
×
115
    }
116
    Ok(())
6✔
117
}
6✔
118

119
async fn needs_build(
6✔
120
    home_dir: &HomeDir,
6✔
121
    contract: &Contract,
6✔
122
    storage_config: &LogStorage,
6✔
123
) -> Result<Option<BuildId>, BuildError> {
6✔
124
    let build_path = home_dir
6✔
125
        .find_latest_build_path()
6✔
126
        .map_err(|(path, error)| BuildError::FileSystem(path.into(), error))?;
6✔
127
    let Some(build_path) = build_path else {
6✔
128
        return Ok(Some(BuildId::first()));
6✔
129
    };
130

131
    let mut futures = vec![];
×
132
    for endpoint in contract.endpoints.keys() {
×
133
        let endpoint_path = build_path.get_endpoint_path(endpoint);
×
134
        let (storage, prefix) =
×
135
            create_log_storage(storage_config.clone(), endpoint_path.log_dir.into()).await?;
×
136
        futures.push(is_empty(storage, prefix));
×
137
    }
×
138
    if !try_join_all(futures)
×
139
        .await?
×
140
        .into_iter()
×
141
        .all(|is_empty| is_empty)
×
142
    {
×
143
        return Ok(Some(build_path.id.next()));
×
144
    }
×
145

×
146
    let existing_contract = Contract::deserialize(&build_path)?;
×
147
    for (endpoint, schema) in &contract.endpoints {
×
148
        if let Some(existing_schema) = existing_contract.endpoints.get(endpoint) {
×
149
            if schema == existing_schema {
×
150
                continue;
×
151
            }
×
152
        } else {
×
153
            return Ok(Some(build_path.id.next()));
×
154
        }
155
    }
×
156
    Ok(None)
×
157
}
6✔
158

×
159
/// Reports whether listing `prefix` in `storage` yields no objects.
async fn is_empty(storage: Box<dyn Storage>, prefix: String) -> Result<bool, BuildError> {
    let listing = storage.list_objects(prefix, None).await?;
    let has_no_objects = listing.objects.is_empty();
    Ok(has_no_objects)
}
×
163

×
164
fn create_build(
6✔
165
    home_dir: &HomeDir,
6✔
166
    build_id: BuildId,
6✔
167
    contract: &Contract,
6✔
168
) -> Result<(), BuildError> {
6✔
169
    let build_path = home_dir
6✔
170
        .create_build_dir_all(build_id)
6✔
171
        .map_err(|(path, error)| BuildError::FileSystem(path.into(), error))?;
6✔
172

×
173
    contract.serialize(&build_path)?;
6✔
174

175
    let mut resources = Vec::new();
6✔
176

6✔
177
    let proto_folder_path = build_path.contracts_dir.as_ref();
6✔
178
    for (endpoint_name, schema) in &contract.endpoints {
12✔
179
        ProtoGenerator::generate(proto_folder_path, endpoint_name, schema)?;
6✔
180
        resources.push(endpoint_name.clone());
6✔
181
    }
×
182

×
183
    let common_resources = ProtoGenerator::copy_common(proto_folder_path)?;
6✔
184

185
    // Copy common service to be included in descriptor.
×
186
    resources.extend(common_resources);
6✔
187

6✔
188
    // Generate a descriptor based on all proto files generated within sink.
6✔
189
    ProtoGenerator::generate_descriptor(
6✔
190
        proto_folder_path,
6✔
191
        build_path.descriptor_path.as_ref(),
6✔
192
        &resources,
6✔
193
    )?;
6✔
194

×
195
    Ok(())
6✔
196
}
6✔
197

×
198
fn modify_schema(
6✔
199
    schema: &Schema,
6✔
200
    api_endpoint: &ApiEndpoint,
6✔
201
) -> Result<SchemaWithIndex, BuildError> {
6✔
202
    let mut schema = schema.clone();
6✔
203
    // Generated Cache index based on api_index
×
204
    let configured_index = create_primary_indexes(
6✔
205
        &schema.fields,
6✔
206
        api_endpoint
6✔
207
            .index
6✔
208
            .as_ref()
6✔
209
            .map(|index| index.primary_key.as_slice()),
6✔
210
    )?;
6✔
211
    // Generated schema in SQL
×
212
    let upstream_index = schema.primary_index.clone();
6✔
213

214
    let index = match (configured_index.is_empty(), upstream_index.is_empty()) {
6✔
215
        (true, true) => vec![],
6✔
216
        (true, false) => upstream_index,
×
217
        (false, true) => configured_index,
×
218
        (false, false) => {
×
219
            if !upstream_index.eq(&configured_index) {
×
220
                return Err(BuildError::MismatchPrimaryKey {
×
221
                    endpoint_name: api_endpoint.name.clone(),
×
222
                    expected: get_field_names(&schema, &upstream_index),
×
223
                    actual: get_field_names(&schema, &configured_index),
×
224
                });
×
225
            }
×
226
            configured_index
×
227
        }
×
228
    };
×
229

230
    schema.primary_index = index;
6✔
231

6✔
232
    let secondary_index_config = get_secondary_index_config(api_endpoint);
6✔
233
    let secondary_indexes = generate_secondary_indexes(&schema.fields, &secondary_index_config)?;
6✔
234

×
235
    Ok((schema, secondary_indexes))
6✔
236
}
6✔
237

×
238
fn get_field_names(schema: &Schema, indexes: &[usize]) -> Vec<String> {
×
239
    indexes
×
240
        .iter()
×
241
        .map(|idx| schema.fields[*idx].name.to_owned())
×
242
        .collect()
×
243
}
×
244

×
245
fn get_secondary_index_config(api_endpoint: &ApiEndpoint) -> Cow<SecondaryIndexConfig> {
×
246
    if let Some(config) = api_endpoint
6✔
247
        .index
6✔
248
        .as_ref()
6✔
249
        .and_then(|index| index.secondary.as_ref())
6✔
250
    {
×
251
        Cow::Borrowed(config)
×
252
    } else {
253
        Cow::Owned(SecondaryIndexConfig::default())
6✔
254
    }
255
}
6✔
256

257
fn create_primary_indexes(
6✔
258
    field_definitions: &[FieldDefinition],
6✔
259
    primary_key: Option<&[String]>,
6✔
260
) -> Result<Vec<usize>, BuildError> {
6✔
261
    let mut primary_index = Vec::new();
6✔
262
    if let Some(primary_key) = primary_key {
6✔
263
        for name in primary_key {
×
264
            primary_index.push(field_index_from_field_name(field_definitions, name)?);
×
265
        }
×
266
    }
6✔
267
    Ok(primary_index)
6✔
268
}
6✔
269

×
270
fn generate_secondary_indexes(
6✔
271
    field_definitions: &[FieldDefinition],
6✔
272
    config: &SecondaryIndexConfig,
6✔
273
) -> Result<Vec<IndexDefinition>, BuildError> {
6✔
274
    let mut result = vec![];
6✔
275

×
276
    // Create default indexes unless skipped.
×
277
    for (index, field) in field_definitions.iter().enumerate() {
14✔
278
        if config.skip_default.contains(&field.name) {
14✔
279
            continue;
×
280
        }
14✔
281

14✔
282
        match field.typ {
14✔
283
            // Create sorted inverted indexes for these fields
×
284
            FieldType::UInt
×
285
            | FieldType::U128
286
            | FieldType::Int
287
            | FieldType::I128
288
            | FieldType::Float
289
            | FieldType::Boolean
290
            | FieldType::Decimal
291
            | FieldType::Timestamp
292
            | FieldType::Date
293
            | FieldType::Point
294
            | FieldType::Duration => result.push(IndexDefinition::SortedInverted(vec![index])),
8✔
295

296
            // Create sorted inverted and full text indexes for string fields.
×
297
            FieldType::String => {
6✔
298
                result.push(IndexDefinition::SortedInverted(vec![index]));
6✔
299
                result.push(IndexDefinition::FullText(index));
6✔
300
            }
6✔
301

×
302
            // Skip creating indexes
×
303
            FieldType::Text | FieldType::Binary | FieldType::Json => (),
×
304
        }
305
    }
×
306

307
    // Create requested indexes.
308
    for create in &config.create {
6✔
309
        if let Some(index) = &create.index {
×
310
            match index {
×
311
                SecondaryIndex::SortedInverted(SortedInverted { fields }) => {
×
312
                    let fields = fields
×
313
                        .iter()
×
314
                        .map(|field| field_index_from_field_name(field_definitions, field))
×
315
                        .collect::<Result<Vec<_>, _>>()?;
×
316
                    result.push(IndexDefinition::SortedInverted(fields));
×
317
                }
×
318
                SecondaryIndex::FullText(FullText { field }) => {
×
319
                    let field = field_index_from_field_name(field_definitions, field)?;
×
320
                    result.push(IndexDefinition::FullText(field));
×
321
                }
×
322
            }
×
323
        }
×
324
    }
325

×
326
    Ok(result)
6✔
327
}
6✔
328

×
329
fn field_index_from_field_name(
×
330
    fields: &[FieldDefinition],
×
331
    field_name: &str,
×
332
) -> Result<usize, BuildError> {
×
333
    fields
×
334
        .iter()
×
335
        .position(|field| field.name == field_name)
×
336
        .ok_or(BuildError::FieldNotFound(field_name.to_string()))
×
337
}
×
338

×
339
impl Contract {
×
340
    fn serialize(&self, build_path: &BuildPath) -> Result<(), BuildError> {
341
        serde_json_to_path(&build_path.dag_path, &self.pipeline)?;
6✔
342

343
        for (endpoint_name, schema) in &self.endpoints {
12✔
344
            let endpoint_path = build_path.get_endpoint_path(endpoint_name);
6✔
345
            serde_json_to_path(&endpoint_path.schema_path, schema)?;
6✔
346
        }
×
347

×
348
        Ok(())
6✔
349
    }
6✔
350
}
×
351

×
352
fn serde_json_to_path(path: impl AsRef<Path>, value: &impl Serialize) -> Result<(), BuildError> {
12✔
353
    let file = OpenOptions::new()
12✔
354
        .create(true)
12✔
355
        .write(true)
12✔
356
        .open(path.as_ref())
12✔
357
        .map_err(|e| BuildError::FileSystem(path.as_ref().into(), e))?;
12✔
358
    serde_json::to_writer_pretty(file, value)?;
12✔
359
    Ok(())
12✔
360
}
12✔
361

×
362
fn serde_json_from_path<T>(path: impl AsRef<Path>) -> Result<T, BuildError>
×
363
where
×
364
    T: DeserializeOwned,
×
365
{
×
366
    let file = OpenOptions::new()
×
367
        .read(true)
×
368
        .open(path.as_ref())
×
369
        .map_err(|e| BuildError::FileSystem(path.as_ref().into(), e))?;
×
370
    Ok(serde_json::from_reader(file)?)
×
371
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc