• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5831539996

pending completion
5831539996

Pull #1824

github

chubei
feat: Publish DAG to JSON
Pull Request #1824: feat: Publish Source contracts as JSON file

249 of 249 new or added lines in 14 files covered. (100.0%)

45567 of 61724 relevant lines covered (73.82%)

56287.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.57
/dozer-cli/src/simple/build.rs
1
use std::{borrow::Cow, collections::BTreeMap, fs::OpenOptions, path::Path};
2

3
use dozer_api::generator::protoc::generator::ProtoGenerator;
4
use dozer_cache::dozer_log::{
5
    home_dir::{BuildId, BuildPath, HomeDir},
6
    replication::create_log_storage,
7
    schemas::EndpointSchema,
8
    storage::Storage,
9
};
10
use dozer_core::{
11
    dag_schemas::{DagSchemas, EdgeType},
12
    daggy,
13
    petgraph::{
14
        visit::{IntoEdgesDirected, IntoNodeReferences},
15
        Direction,
16
    },
17
};
18
use dozer_types::{
19
    log::info,
20
    models::{
21
        api_endpoint::{
22
            ApiEndpoint, FullText, SecondaryIndex, SecondaryIndexConfig, SortedInverted,
×
23
        },
×
24
        app_config::LogStorage,
×
25
    },
×
26
    node::NodeHandle,
×
27
    types::{FieldDefinition, FieldType, IndexDefinition, Schema, SchemaWithIndex},
×
28
};
×
29
use futures::future::try_join_all;
×
30
use serde::{de::DeserializeOwned, Serialize};
×
31

×
32
use crate::errors::BuildError;
33

×
34
/// The build contract: everything derived from the pipeline DAG that the API
/// and log layers need, persisted as JSON alongside each build
/// (see `Contract::serialize` / `Contract::deserialize`).
#[derive(Debug)]
pub struct Contract {
    // The pipeline graph with schema payloads stripped: nodes carry only
    // their `NodeHandle`, edges keep their `EdgeType`.
    pub pipeline: daggy::Dag<NodeHandle, EdgeType>,
    // Per-endpoint schema, keyed by endpoint name. `BTreeMap` keeps keys
    // ordered, giving a stable JSON serialization.
    pub endpoints: BTreeMap<String, EndpointSchema>,
}
×
39

×
40
impl Contract {
×
41
    pub fn new<T>(
6✔
42
        dag_schemas: DagSchemas<T>,
6✔
43
        endpoints: &[ApiEndpoint],
6✔
44
        enable_token: bool,
6✔
45
        enable_on_event: bool,
6✔
46
    ) -> Result<Self, BuildError> {
6✔
47
        let sink_schemas = dag_schemas.get_sink_schemas();
6✔
48
        let mut endpoint_schemas = BTreeMap::new();
6✔
49
        for (endpoint_name, (schema, connections)) in sink_schemas {
12✔
50
            let endpoint = endpoints
6✔
51
                .iter()
6✔
52
                .find(|e| e.name == *endpoint_name)
6✔
53
                .ok_or(BuildError::MissingEndpoint(endpoint_name.clone()))?;
6✔
54
            let (schema, secondary_indexes) = modify_schema(&schema, endpoint)?;
6✔
55
            let schema = EndpointSchema {
6✔
56
                schema,
6✔
57
                secondary_indexes,
6✔
58
                enable_token,
6✔
59
                enable_on_event,
6✔
60
                connections,
6✔
61
            };
6✔
62
            endpoint_schemas.insert(endpoint_name, schema);
6✔
63
        }
×
64

65
        let pipeline = dag_schemas
6✔
66
            .into_graph()
6✔
67
            .map_owned(|_, node| node.handle, |_, edge| edge);
22✔
68

6✔
69
        Ok(Self {
6✔
70
            pipeline,
6✔
71
            endpoints: endpoint_schemas,
6✔
72
        })
6✔
73
    }
6✔
74
}
75

×
76
pub async fn build(
6✔
77
    home_dir: &HomeDir,
6✔
78
    contract: &Contract,
6✔
79
    storage_config: &LogStorage,
6✔
80
) -> Result<(), BuildError> {
6✔
81
    if let Some(build_id) = needs_build(home_dir, contract, storage_config).await? {
6✔
82
        let build_name = build_id.name().to_string();
6✔
83
        create_build(home_dir, build_id, contract)?;
6✔
84
        info!("Created new build {build_name}");
6✔
85
    } else {
86
        info!("Building not needed");
×
87
    }
×
88
    Ok(())
6✔
89
}
6✔
90

×
91
async fn needs_build(
6✔
92
    home_dir: &HomeDir,
6✔
93
    contract: &Contract,
6✔
94
    storage_config: &LogStorage,
6✔
95
) -> Result<Option<BuildId>, BuildError> {
6✔
96
    let build_path = home_dir
6✔
97
        .find_latest_build_path()
6✔
98
        .map_err(|(path, error)| BuildError::FileSystem(path.into(), error))?;
6✔
99
    let Some(build_path) = build_path else {
6✔
100
        return Ok(Some(BuildId::first()));
6✔
101
    };
×
102

×
103
    let mut futures = vec![];
×
104
    for endpoint in contract.endpoints.keys() {
×
105
        let endpoint_path = build_path.get_endpoint_path(endpoint);
×
106
        let (storage, prefix) =
×
107
            create_log_storage(storage_config.clone(), endpoint_path.log_dir.into()).await?;
×
108
        futures.push(is_empty(storage, prefix));
×
109
    }
×
110
    if !try_join_all(futures)
×
111
        .await?
×
112
        .into_iter()
×
113
        .all(|is_empty| is_empty)
×
114
    {
×
115
        return Ok(Some(build_path.id.next()));
×
116
    }
×
117

×
118
    let existing_contract = Contract::deserialize(&build_path)?;
×
119
    for (endpoint, schema) in &contract.endpoints {
×
120
        if let Some(existing_schema) = existing_contract.endpoints.get(endpoint) {
×
121
            if schema == existing_schema {
×
122
                continue;
×
123
            }
×
124
        } else {
×
125
            return Ok(Some(build_path.id.next()));
×
126
        }
×
127
    }
128
    Ok(None)
×
129
}
6✔
130

×
131
/// Returns `true` when `storage` contains no objects under `prefix`.
async fn is_empty(storage: Box<dyn Storage>, prefix: String) -> Result<bool, BuildError> {
    let listing = storage.list_objects(prefix, None).await?;
    Ok(listing.objects.is_empty())
}
×
135

×
136
fn create_build(
6✔
137
    home_dir: &HomeDir,
6✔
138
    build_id: BuildId,
6✔
139
    contract: &Contract,
6✔
140
) -> Result<(), BuildError> {
6✔
141
    let build_path = home_dir
6✔
142
        .create_build_dir_all(build_id)
6✔
143
        .map_err(|(path, error)| BuildError::FileSystem(path.into(), error))?;
6✔
144

145
    contract.serialize(&build_path)?;
6✔
146

×
147
    let mut resources = Vec::new();
6✔
148

6✔
149
    let proto_folder_path = build_path.contracts_dir.as_ref();
6✔
150
    for (endpoint_name, schema) in &contract.endpoints {
12✔
151
        ProtoGenerator::generate(proto_folder_path, endpoint_name, schema)?;
6✔
152
        resources.push(endpoint_name.clone());
6✔
153
    }
×
154

155
    let common_resources = ProtoGenerator::copy_common(proto_folder_path)?;
6✔
156

157
    // Copy common service to be included in descriptor.
×
158
    resources.extend(common_resources);
6✔
159

6✔
160
    // Generate a descriptor based on all proto files generated within sink.
6✔
161
    ProtoGenerator::generate_descriptor(
6✔
162
        proto_folder_path,
6✔
163
        build_path.descriptor_path.as_ref(),
6✔
164
        &resources,
6✔
165
    )?;
6✔
166

×
167
    Ok(())
6✔
168
}
6✔
169

170
fn modify_schema(
6✔
171
    schema: &Schema,
6✔
172
    api_endpoint: &ApiEndpoint,
6✔
173
) -> Result<SchemaWithIndex, BuildError> {
6✔
174
    let mut schema = schema.clone();
6✔
175
    // Generated Cache index based on api_index
176
    let configured_index = create_primary_indexes(
6✔
177
        &schema.fields,
6✔
178
        api_endpoint
6✔
179
            .index
6✔
180
            .as_ref()
6✔
181
            .map(|index| index.primary_key.as_slice()),
6✔
182
    )?;
6✔
183
    // Generated schema in SQL
184
    let upstream_index = schema.primary_index.clone();
6✔
185

186
    let index = match (configured_index.is_empty(), upstream_index.is_empty()) {
6✔
187
        (true, true) => vec![],
6✔
188
        (true, false) => upstream_index,
×
189
        (false, true) => configured_index,
×
190
        (false, false) => {
191
            if !upstream_index.eq(&configured_index) {
×
192
                return Err(BuildError::MismatchPrimaryKey {
×
193
                    endpoint_name: api_endpoint.name.clone(),
×
194
                    expected: get_field_names(&schema, &upstream_index),
×
195
                    actual: get_field_names(&schema, &configured_index),
×
196
                });
×
197
            }
×
198
            configured_index
×
199
        }
×
200
    };
×
201

202
    schema.primary_index = index;
6✔
203

6✔
204
    let secondary_index_config = get_secondary_index_config(api_endpoint);
6✔
205
    let secondary_indexes = generate_secondary_indexes(&schema.fields, &secondary_index_config)?;
6✔
206

207
    Ok((schema, secondary_indexes))
6✔
208
}
6✔
209

×
210
fn get_field_names(schema: &Schema, indexes: &[usize]) -> Vec<String> {
×
211
    indexes
×
212
        .iter()
×
213
        .map(|idx| schema.fields[*idx].name.to_owned())
×
214
        .collect()
×
215
}
×
216

×
217
fn get_secondary_index_config(api_endpoint: &ApiEndpoint) -> Cow<SecondaryIndexConfig> {
218
    if let Some(config) = api_endpoint
6✔
219
        .index
6✔
220
        .as_ref()
6✔
221
        .and_then(|index| index.secondary.as_ref())
6✔
222
    {
223
        Cow::Borrowed(config)
×
224
    } else {
225
        Cow::Owned(SecondaryIndexConfig::default())
6✔
226
    }
×
227
}
6✔
228

229
fn create_primary_indexes(
6✔
230
    field_definitions: &[FieldDefinition],
6✔
231
    primary_key: Option<&[String]>,
6✔
232
) -> Result<Vec<usize>, BuildError> {
6✔
233
    let mut primary_index = Vec::new();
6✔
234
    if let Some(primary_key) = primary_key {
6✔
235
        for name in primary_key {
×
236
            primary_index.push(field_index_from_field_name(field_definitions, name)?);
×
237
        }
×
238
    }
6✔
239
    Ok(primary_index)
6✔
240
}
6✔
241

242
fn generate_secondary_indexes(
6✔
243
    field_definitions: &[FieldDefinition],
6✔
244
    config: &SecondaryIndexConfig,
6✔
245
) -> Result<Vec<IndexDefinition>, BuildError> {
6✔
246
    let mut result = vec![];
6✔
247

248
    // Create default indexes unless skipped.
249
    for (index, field) in field_definitions.iter().enumerate() {
14✔
250
        if config.skip_default.contains(&field.name) {
14✔
251
            continue;
×
252
        }
14✔
253

14✔
254
        match field.typ {
14✔
255
            // Create sorted inverted indexes for these fields
256
            FieldType::UInt
257
            | FieldType::U128
258
            | FieldType::Int
259
            | FieldType::I128
260
            | FieldType::Float
261
            | FieldType::Boolean
262
            | FieldType::Decimal
263
            | FieldType::Timestamp
264
            | FieldType::Date
265
            | FieldType::Point
266
            | FieldType::Duration => result.push(IndexDefinition::SortedInverted(vec![index])),
8✔
267

268
            // Create sorted inverted and full text indexes for string fields.
269
            FieldType::String => {
6✔
270
                result.push(IndexDefinition::SortedInverted(vec![index]));
6✔
271
                result.push(IndexDefinition::FullText(index));
6✔
272
            }
6✔
273

274
            // Skip creating indexes
275
            FieldType::Text | FieldType::Binary | FieldType::Json => (),
×
276
        }
277
    }
278

279
    // Create requested indexes.
280
    for create in &config.create {
6✔
281
        if let Some(index) = &create.index {
×
282
            match index {
×
283
                SecondaryIndex::SortedInverted(SortedInverted { fields }) => {
×
284
                    let fields = fields
×
285
                        .iter()
×
286
                        .map(|field| field_index_from_field_name(field_definitions, field))
×
287
                        .collect::<Result<Vec<_>, _>>()?;
×
288
                    result.push(IndexDefinition::SortedInverted(fields));
×
289
                }
290
                SecondaryIndex::FullText(FullText { field }) => {
×
291
                    let field = field_index_from_field_name(field_definitions, field)?;
×
292
                    result.push(IndexDefinition::FullText(field));
×
293
                }
294
            }
295
        }
×
296
    }
297

298
    Ok(result)
6✔
299
}
6✔
300

301
fn field_index_from_field_name(
×
302
    fields: &[FieldDefinition],
×
303
    field_name: &str,
×
304
) -> Result<usize, BuildError> {
×
305
    fields
×
306
        .iter()
×
307
        .position(|field| field.name == field_name)
×
308
        .ok_or(BuildError::FieldNotFound(field_name.to_string()))
×
309
}
×
310

311
impl Contract {
312
    fn serialize(&self, build_path: &BuildPath) -> Result<(), BuildError> {
313
        serde_json_to_path(&build_path.dag_path, &self.pipeline)?;
6✔
314

315
        for (endpoint_name, schema) in &self.endpoints {
12✔
316
            let endpoint_path = build_path.get_endpoint_path(endpoint_name);
6✔
317
            serde_json_to_path(&endpoint_path.schema_path, schema)?;
6✔
318
        }
319

320
        Ok(())
6✔
321
    }
6✔
322

323
    fn deserialize(build_path: &BuildPath) -> Result<Self, BuildError> {
×
324
        let pipeline: daggy::Dag<NodeHandle, EdgeType> =
×
325
            serde_json_from_path(&build_path.dag_path)?;
×
326

327
        let mut endpoints = BTreeMap::new();
×
328
        for (node_index, node) in pipeline.node_references() {
×
329
            // Endpoint must have zero out degree.
330
            if pipeline
×
331
                .edges_directed(node_index, Direction::Outgoing)
×
332
                .count()
×
333
                > 0
×
334
            {
335
                continue;
×
336
            }
×
337

×
338
            // `NodeHandle::id` is the endpoint name.
×
339
            let endpoint_name = node.id.clone();
×
340
            let endpoint_path = build_path.get_endpoint_path(&endpoint_name);
×
341
            let schema: EndpointSchema = serde_json_from_path(&endpoint_path.schema_path)?;
×
342
            endpoints.insert(endpoint_name, schema);
×
343
        }
344

345
        Ok(Self {
×
346
            pipeline,
×
347
            endpoints,
×
348
        })
×
349
    }
×
350
}
351

352
fn serde_json_to_path(path: impl AsRef<Path>, value: &impl Serialize) -> Result<(), BuildError> {
12✔
353
    let file = OpenOptions::new()
12✔
354
        .create(true)
12✔
355
        .write(true)
12✔
356
        .open(path.as_ref())
12✔
357
        .map_err(|e| BuildError::FileSystem(path.as_ref().into(), e))?;
12✔
358
    serde_json::to_writer_pretty(file, value)?;
12✔
359
    Ok(())
12✔
360
}
12✔
361

362
fn serde_json_from_path<T>(path: impl AsRef<Path>) -> Result<T, BuildError>
×
363
where
×
364
    T: DeserializeOwned,
×
365
{
×
366
    let file = OpenOptions::new()
×
367
        .read(true)
×
368
        .open(path.as_ref())
×
369
        .map_err(|e| BuildError::FileSystem(path.as_ref().into(), e))?;
×
370
    Ok(serde_json::from_reader(file)?)
×
371
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc