getdozer / dozer · build 4012757265 (pending completion)

Pull Request #737: feat: select * wildcard
GitHub: merge 41e5235a4 into c7b362bed

55 of 55 new or added lines in 4 files covered (100.0%)
23308 of 35040 relevant lines covered (66.52%)
37782.56 hits per line

Source file: /dozer-orchestrator/src/pipeline/sinks.rs (42.0% covered)
use dozer_api::generator::protoc::generator::ProtoGenerator;
use dozer_api::grpc::internal_grpc::pipeline_response::ApiEvent;
use dozer_api::grpc::internal_grpc::PipelineResponse;
use dozer_api::grpc::types_helper;
use dozer_api::{CacheEndpoint, PipelineDetails};
use dozer_cache::cache::expression::QueryExpression;
use dozer_cache::cache::index::get_primary_key;
use dozer_cache::cache::{
    lmdb_rs::{self, Transaction},
    Cache, LmdbCache,
};
use dozer_core::dag::epoch::Epoch;
use dozer_core::dag::errors::{ExecutionError, SinkError};
use dozer_core::dag::node::{PortHandle, Sink, SinkFactory};
use dozer_core::dag::record_store::RecordReader;
use dozer_core::storage::lmdb_storage::{LmdbEnvironmentManager, SharedTransaction};
use dozer_sql::pipeline::builder::SchemaSQLContext;
use dozer_types::crossbeam::channel::Sender;
use dozer_types::indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use dozer_types::log::debug;
use dozer_types::models::api_endpoint::{ApiEndpoint, ApiIndex};
use dozer_types::models::api_security::ApiSecurity;
use dozer_types::models::flags::Flags;
use dozer_types::types::FieldType;
use dozer_types::types::{IndexDefinition, Operation, Schema, SchemaIdentifier};
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::Hasher;
use std::path::PathBuf;
use std::sync::Arc;

pub fn attach_progress(multi_pb: Option<MultiProgress>) -> ProgressBar {
    let pb = ProgressBar::new_spinner();
    multi_pb.as_ref().map(|m| m.add(pb.clone()));
    pb.set_style(
        ProgressStyle::with_template("{spinner:.blue} {msg}")
            .unwrap()
            // For more spinners check out the cli-spinners project:
            // https://github.com/sindresorhus/cli-spinners/blob/master/spinners.json
            .tick_strings(&[
                "▹▹▹▹▹",
                "▸▹▹▹▹",
                "▹▸▹▹▹",
                "▹▹▸▹▹",
                "▹▹▹▸▹",
                "▹▹▹▹▸",
                "▪▪▪▪▪",
            ]),
    );
    pb
}
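A minimal usage sketch of `attach_progress`, for orientation only (the caller, loop, and messages below are invented and not part of this file):

    use dozer_types::indicatif::MultiProgress;

    fn spinner_demo() {
        // A shared MultiProgress lets several sinks render their spinners together.
        let multi = MultiProgress::new();
        let pb = attach_progress(Some(multi));
        for n in 0..5u64 {
            // CacheSink updates its message the same way on every commit.
            pb.set_message(format!("films: Count: {n}"));
        }
        pb.finish_with_message("films: done");
    }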
#[derive(Debug, Clone)]
pub struct CacheSinkSettings {
    flags: Option<Flags>,
    api_security: Option<ApiSecurity>,
}
impl CacheSinkSettings {
    pub fn new(flags: Option<Flags>, api_security: Option<ApiSecurity>) -> Self {
        Self {
            flags,
            api_security,
        }
    }
}
#[derive(Debug)]
pub struct CacheSinkFactory {
    input_ports: Vec<PortHandle>,
    cache: Arc<LmdbCache>,
    api_endpoint: ApiEndpoint,
    notifier: Option<Sender<PipelineResponse>>,
    generated_path: PathBuf,
    multi_pb: MultiProgress,
    settings: CacheSinkSettings,
}

impl CacheSinkFactory {
    pub fn new(
        input_ports: Vec<PortHandle>,
        cache: Arc<LmdbCache>,
        api_endpoint: ApiEndpoint,
        notifier: Option<Sender<PipelineResponse>>,
        generated_path: PathBuf,
        multi_pb: MultiProgress,
        settings: CacheSinkSettings,
    ) -> Self {
        Self {
            input_ports,
            cache,
            api_endpoint,
            notifier,
            generated_path,
            multi_pb,
            settings,
        }
    }

    fn get_output_schema(
        &self,
        schema: &Schema,
    ) -> Result<(Schema, Vec<IndexDefinition>), ExecutionError> {
        let mut schema = schema.clone();

        // Hash of the endpoint's SQL, used as the schema identifier below
        let hash = self.get_schema_hash();

        // Cache primary index generated from the configured api_index
        let configured_index = create_primary_indexes(
            &schema,
            &self.api_endpoint.index.to_owned().unwrap_or_default(),
        )?;
        // Primary index carried on the schema generated upstream in SQL
        let upstream_index = schema.primary_index.clone();

        let index = match (configured_index.is_empty(), upstream_index.is_empty()) {
            (true, true) => vec![],
            (true, false) => upstream_index,
            (false, true) => configured_index,
            (false, false) => {
                if !upstream_index.eq(&configured_index) {
                    return Err(ExecutionError::MismatchPrimaryKey {
                        endpoint_name: self.api_endpoint.name.clone(),
                        expected: get_field_names(&schema, &upstream_index),
                        actual: get_field_names(&schema, &configured_index),
                    });
                }
                configured_index
            }
        };

        schema.primary_index = index;

        schema.identifier = Some(SchemaIdentifier {
            id: hash as u32,
            version: 1,
        });

        // Automatically create secondary indexes
        let secondary_indexes = schema
            .fields
            .iter()
            .enumerate()
            .flat_map(|(idx, f)| match f.typ {
                // Create sorted inverted indexes for these fields
                FieldType::UInt
                | FieldType::Int
                | FieldType::Float
                | FieldType::Boolean
                | FieldType::Decimal
                | FieldType::Timestamp
                | FieldType::Date => vec![IndexDefinition::SortedInverted(vec![idx])],

                // Create sorted inverted and full text indexes for string fields.
                FieldType::String => vec![
                    IndexDefinition::SortedInverted(vec![idx]),
                    IndexDefinition::FullText(idx),
                ],

                // Create full text indexes for text fields
                FieldType::Text => vec![IndexDefinition::FullText(idx)],

                // Skip creating indexes
                FieldType::Binary | FieldType::Bson => vec![],
            })
            .collect();
        Ok((schema, secondary_indexes))
    }

    fn get_schema_hash(&self) -> u64 {
        // Get hash of SQL
        let mut hasher = DefaultHasher::new();
        let bytes = self.api_endpoint.sql.as_bytes();
        hasher.write(bytes);

        hasher.finish()
    }
}

impl SinkFactory<SchemaSQLContext> for CacheSinkFactory {
    fn set_input_schema(
        &self,
        _input_schemas: &HashMap<PortHandle, (Schema, SchemaSQLContext)>,
    ) -> Result<(), ExecutionError> {
        Ok(())
    }

    fn get_input_ports(&self) -> Vec<PortHandle> {
        self.input_ports.clone()
    }

    fn prepare(
        &self,
        input_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
    ) -> Result<(), ExecutionError> {
        use std::println as stdinfo;
        // Insert schemas into cache
        debug!(
            "SinkFactory: Initialising CacheSinkFactory: {}",
            self.api_endpoint.name
        );
        for (_, (schema, _ctx)) in input_schemas.iter() {
            stdinfo!(
                "SINK: Initializing output schema: {}",
                self.api_endpoint.name
            );
            let (pipeline_schema, secondary_indexes) = self.get_output_schema(schema)?;
            pipeline_schema.print().printstd();

            if self
                .cache
                .get_schema_and_indexes_by_name(&self.api_endpoint.name)
                .is_err()
            {
                self.cache
                    .insert_schema(
                        &self.api_endpoint.name,
                        &pipeline_schema,
                        &secondary_indexes,
                    )
                    .map_err(|e| {
                        ExecutionError::SinkError(SinkError::SchemaUpdateFailed(Box::new(e)))
                    })?;
                debug!(
                    "SinkFactory: Inserted schema for {}",
                    self.api_endpoint.name
                );
            }
        }

        ProtoGenerator::generate(
            &self.generated_path,
            PipelineDetails {
                schema_name: self.api_endpoint.name.to_owned(),
                cache_endpoint: CacheEndpoint {
                    cache: self.cache.to_owned(),
                    endpoint: self.api_endpoint.to_owned(),
                },
            },
            &self.settings.api_security,
            &self.settings.flags,
        )
        .map_err(|e| ExecutionError::InternalError(Box::new(e)))?;

        Ok(())
    }

    fn build(
        &self,
        input_schemas: HashMap<PortHandle, Schema>,
    ) -> Result<Box<dyn Sink>, ExecutionError> {
        let mut sink_schemas: HashMap<PortHandle, (Schema, Vec<IndexDefinition>)> = HashMap::new();
        // Compute the output schema and secondary indexes for each input port
        for (k, schema) in input_schemas {
            let (schema, secondary_indexes) = self.get_output_schema(&schema)?;
            sink_schemas.insert(k, (schema, secondary_indexes));
        }
        Ok(Box::new(CacheSink::new(
            self.cache.clone(),
            self.api_endpoint.clone(),
            sink_schemas,
            self.notifier.clone(),
            Some(self.multi_pb.clone()),
        )))
    }
}

fn create_primary_indexes(
    schema: &Schema,
    api_index: &ApiIndex,
) -> Result<Vec<usize>, ExecutionError> {
    let mut primary_index = Vec::new();
    for name in api_index.primary_key.iter() {
        let idx = schema
            .fields
            .iter()
            .position(|fd| {
                let slice_char = ".";
                fd.name == name.clone()
                    || (fd.name.contains(slice_char) && {
                        let slice: String = fd.name.chars()
                            .skip(fd.name.find(slice_char).unwrap().wrapping_add(1))
                            .collect();
                        slice == name.clone()
                    })
            })
            .map_or(Err(ExecutionError::FieldNotFound(name.to_owned())), |p| {
                Ok(p)
            })?;

        primary_index.push(idx);
    }
    Ok(primary_index)
}
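A note on the matching predicate above: a configured primary-key name matches a schema field either exactly or by the suffix after the first `.`, so a key named `id` also resolves a qualified field such as `films.id`. A standalone sketch of that rule (the helper name and examples are hypothetical, for illustration only):

    // Mirrors the name-matching logic inside `create_primary_indexes`.
    fn key_matches_field(field_name: &str, key_name: &str) -> bool {
        field_name == key_name
            || field_name
                .split_once('.')
                .map_or(false, |(_, suffix)| suffix == key_name)
    }

    fn main() {
        assert!(key_matches_field("id", "id"));
        assert!(key_matches_field("films.id", "id")); // qualified field, bare key
        assert!(!key_matches_field("films.id", "name"));
    }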

fn get_field_names(schema: &Schema, indexes: &[usize]) -> Vec<String> {
    indexes
        .iter()
        .map(|idx| schema.fields[*idx].name.to_owned())
        .collect()
}

#[derive(Debug)]
pub struct CacheSink {
    // It's not really 'static, the actual lifetime is the lifetime of `cache`. See comments in `process`.
    txn: Option<lmdb_rs::RwTransaction<'static>>,
    cache: Arc<LmdbCache>,
    counter: usize,
    input_schemas: HashMap<PortHandle, (Schema, Vec<IndexDefinition>)>,
    api_endpoint: ApiEndpoint,
    pb: ProgressBar,
    notifier: Option<Sender<PipelineResponse>>,
}

impl Sink for CacheSink {
    fn commit(&mut self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
        // Update Counter on commit
        self.pb.set_message(format!(
            "{}: Count: {}",
            self.api_endpoint.name.to_owned(),
            self.counter,
        ));
        if let Some(txn) = self.txn.take() {
            txn.commit().map_err(|e| {
                ExecutionError::SinkError(SinkError::CacheCommitTransactionFailed(Box::new(e)))
            })?;
        }
        Ok(())
    }

    fn init(&mut self, _tx: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
        let query = QueryExpression::new(None, vec![], None, 0);
        self.counter = self
            .cache
            .count(&self.api_endpoint.name, &query)
            .map_err(|e| ExecutionError::SinkError(SinkError::CacheCountFailed(Box::new(e))))?;

        debug!(
            "SINK: Initialising CacheSink: {} with count: {}",
            self.api_endpoint.name, self.counter
        );
        Ok(())
    }

    fn process(
        &mut self,
        from_port: PortHandle,
        op: Operation,
        _tx: &SharedTransaction,
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
    ) -> Result<(), ExecutionError> {
        self.counter += 1;

        if self.txn.is_none() {
            let txn = self.cache.begin_rw_txn().map_err(|e| {
                ExecutionError::SinkError(SinkError::CacheBeginTransactionFailed(Box::new(e)))
            })?;
            // SAFETY:
            // 1. `std::mem::transmute` is only used to extend the lifetime of `txn` to `'static`.
            // 2. `RwTransaction` doesn't reference data in `LmdbCache`, the lifetime of it is only
            // to ensure that the returned `RwTransaction` does not outlive `LmdbCache`.
            // 3. `txn` in `CacheSink` is private, and we don't expose it to the outside, so the one owning
            // `txn` must own `CacheSink`.
            // 4. The declaration order in `CacheSink` ensures `txn` is dropped before `cache`.
            let txn = unsafe {
                std::mem::transmute::<lmdb_rs::RwTransaction<'_>, lmdb_rs::RwTransaction<'static>>(
                    txn,
                )
            };
            self.txn = Some(txn);
        }
        let txn = self.txn.as_mut().unwrap();

        let (schema, secondary_indexes) = self
            .input_schemas
            .get(&from_port)
            .ok_or(ExecutionError::SchemaNotInitialized)?;

        if let Some(notifier) = &self.notifier {
            let op = types_helper::map_operation(self.api_endpoint.name.to_owned(), &op);
            notifier
                .try_send(PipelineResponse {
                    endpoint: self.api_endpoint.name.to_owned(),
                    api_event: Some(ApiEvent::Op(op)),
                })
                .map_err(|e| ExecutionError::InternalError(Box::new(e)))?;
        }
        match op {
            Operation::Delete { mut old } => {
                old.schema_id = schema.identifier;
                let key = get_primary_key(&schema.primary_index, &old.values);
                self.cache
                    .delete_with_txn(txn, &key, &old, schema, secondary_indexes)
                    .map_err(|e| {
                        ExecutionError::SinkError(SinkError::CacheDeleteFailed(Box::new(e)))
                    })?;
            }
            Operation::Insert { mut new } => {
                new.schema_id = schema.identifier;
                self.cache
                    .insert_with_txn(txn, &new, schema, secondary_indexes)
                    .map_err(|e| {
                        ExecutionError::SinkError(SinkError::CacheInsertFailed(Box::new(e)))
                    })?;
            }
            Operation::Update { mut old, mut new } => {
                old.schema_id = schema.identifier;
                new.schema_id = schema.identifier;
                let key = get_primary_key(&schema.primary_index, &old.values);
                self.cache
                    .update_with_txn(txn, &key, &old, &new, schema, secondary_indexes)
                    .map_err(|e| {
                        ExecutionError::SinkError(SinkError::CacheUpdateFailed(Box::new(e)))
                    })?;
            }
        }

        Ok(())
    }
}
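The `'static` on `CacheSink::txn` is a deliberate lie, justified by the SAFETY comment in `process` above: the transaction really borrows from `cache`, and the field declaration order guarantees it is dropped first. A self-contained sketch of this lifetime-erasure pattern, using hypothetical stand-in types rather than the real dozer/lmdb APIs:

    use std::sync::Arc;

    struct Env; // stands in for the LMDB environment owned by the cache
    struct Txn<'a>(&'a Env); // stands in for lmdb_rs::RwTransaction<'a>

    struct Holder {
        // Declared before `env` so it is dropped first (struct fields drop
        // in declaration order), which keeps the erased lifetime sound.
        txn: Option<Txn<'static>>,
        env: Arc<Env>,
    }

    impl Holder {
        fn begin(&mut self) {
            let txn = Txn(&*self.env);
            // SAFETY: `txn` only borrows `*self.env`, which stays at a stable
            // heap address for as long as `self.env` does, and `txn` is always
            // dropped before `env`.
            let txn = unsafe { std::mem::transmute::<Txn<'_>, Txn<'static>>(txn) };
            self.txn = Some(txn);
        }
    }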

impl CacheSink {
    pub fn new(
        cache: Arc<LmdbCache>,
        api_endpoint: ApiEndpoint,
        input_schemas: HashMap<PortHandle, (Schema, Vec<IndexDefinition>)>,
        notifier: Option<Sender<PipelineResponse>>,
        multi_pb: Option<MultiProgress>,
    ) -> Self {
        let pb = attach_progress(multi_pb);
        Self {
            txn: None,
            cache,
            counter: 0,
            input_schemas,
            api_endpoint,
            pb,
            notifier,
        }
    }
}

#[cfg(test)]
mod tests {

    use crate::test_utils;
    use dozer_cache::cache::{index, Cache};

    use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
    use dozer_core::dag::node::{NodeHandle, Sink};
    use dozer_core::storage::lmdb_storage::LmdbEnvironmentManager;

    use dozer_types::types::{Field, IndexDefinition, Operation, Record, SchemaIdentifier};
    use std::collections::HashMap;
    use tempdir::TempDir;

    #[test]
    // This test covers updating a record whose primary key changes because a value in the primary key changed
    fn update_record_when_primary_changes() {
        let tmp_dir = TempDir::new("example").unwrap();
        let env = LmdbEnvironmentManager::create(tmp_dir.path(), "test").unwrap();
        let txn = env.create_txn().unwrap();

        let schema = test_utils::get_schema();
        let secondary_indexes: Vec<IndexDefinition> = schema
            .fields
            .iter()
            .enumerate()
            .map(|(idx, _f)| IndexDefinition::SortedInverted(vec![idx]))
            .collect();

        let (cache, mut sink) = test_utils::init_sink(&schema, secondary_indexes.clone());

        let mut input_schemas = HashMap::new();
        input_schemas.insert(DEFAULT_PORT_HANDLE, schema.clone());
        // sink.update_schema(&input_schemas).unwrap();

        // Initializing schemas
        cache
            .insert_schema("films", &schema, &secondary_indexes)
            .unwrap();

        let initial_values = vec![Field::Int(1), Field::String("Film name old".to_string())];

        let updated_values = vec![
            Field::Int(2),
            Field::String("Film name updated".to_string()),
        ];

        let insert_operation = Operation::Insert {
            new: Record {
                schema_id: Option::from(SchemaIdentifier { id: 1, version: 1 }),
                values: initial_values.clone(),
                version: None,
            },
        };

        let update_operation = Operation::Update {
            old: Record {
                schema_id: Option::from(SchemaIdentifier { id: 1, version: 1 }),
                values: initial_values.clone(),
                version: None,
            },
            new: Record {
                schema_id: Option::from(SchemaIdentifier { id: 1, version: 1 }),
                values: updated_values.clone(),
                version: None,
            },
        };

        sink.process(DEFAULT_PORT_HANDLE, insert_operation, &txn, &HashMap::new())
            .unwrap();
        sink.commit(
            &dozer_core::dag::epoch::Epoch::new(
                0,
                [(
                    NodeHandle::new(Some(DEFAULT_PORT_HANDLE), "".to_string()),
                    (0_u64, 0_u64),
                )]
                .into_iter()
                .collect(),
            ),
            &txn,
        )
        .unwrap();

        let key = index::get_primary_key(&schema.primary_index, &initial_values);
        let record = cache.get(&key).unwrap();

        assert_eq!(initial_values, record.values);

        sink.process(DEFAULT_PORT_HANDLE, update_operation, &txn, &HashMap::new())
            .unwrap();
        let epoch1 = dozer_core::dag::epoch::Epoch::new(
            0,
            [(
                NodeHandle::new(Some(DEFAULT_PORT_HANDLE), "".to_string()),
                (0_u64, 1_u64),
            )]
            .into_iter()
            .collect(),
        );
        sink.commit(&epoch1, &txn).unwrap();

        // Primary key with old values
        let key = index::get_primary_key(&schema.primary_index, &initial_values);

        let record = cache.get(&key);

        assert!(record.is_err());

        // Primary key with updated values
        let key = index::get_primary_key(&schema.primary_index, &updated_values);
        let record = cache.get(&key).unwrap();

        assert_eq!(updated_values, record.values);
    }
}
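This test can be run on its own with cargo's usual test filter, e.g. `cargo test -p dozer-orchestrator update_record_when_primary_changes` (the package name is assumed from the file path).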