• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 3978628498

pending completion
3978628498

Pull #705

github

GitHub
Merge 8775fcda7 into e2f9ad287
Pull Request #705: chore: support for generic schema context in `Sink`, `Processor` and `Source` factories

572 of 572 new or added lines in 35 files covered. (100.0%)

22294 of 34850 relevant lines covered (63.97%)

40332.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.46
/dozer-sql/src/pipeline/expression/scalar/tests/scalar_common.rs
1
use crate::pipeline::builder::SchemaSQLContext;
2
use crate::pipeline::{projection::factory::ProjectionProcessorFactory, tests::utils::get_select};
3
use dozer_core::dag::channels::ProcessorChannelForwarder;
4
use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
5
use dozer_core::dag::node::ProcessorFactory;
6
use dozer_core::storage::lmdb_storage::LmdbEnvironmentManager;
7
use dozer_types::types::{Field, Operation, Record, Schema};
8
use std::collections::HashMap;
9
use tempdir::TempDir;
10

11
struct TestChannelForwarder {
12
    operations: Vec<Operation>,
13
}
14

15
impl ProcessorChannelForwarder for TestChannelForwarder {
×
16
    fn send(
49✔
17
        &mut self,
49✔
18
        op: dozer_types::types::Operation,
49✔
19
        _port: dozer_core::dag::node::PortHandle,
49✔
20
    ) -> Result<(), dozer_core::dag::errors::ExecutionError> {
49✔
21
        self.operations.push(op);
49✔
22
        Ok(())
49✔
23
    }
49✔
24
}
×
25

×
26
pub(crate) fn run_scalar_fct(sql: &str, schema: Schema, input: Vec<Field>) -> Field {
50✔
27
    let select = get_select(sql).unwrap();
50✔
28
    let processor_factory = ProjectionProcessorFactory::_new(select.projection);
50✔
29
    processor_factory
50✔
30
        .get_output_schema(
50✔
31
            &DEFAULT_PORT_HANDLE,
50✔
32
            &[(DEFAULT_PORT_HANDLE, (schema.clone(), SchemaSQLContext {}))]
50✔
33
                .into_iter()
50✔
34
                .collect(),
50✔
35
        )
50✔
36
        .unwrap();
50✔
37

50✔
38
    let mut processor = processor_factory
50✔
39
        .build(
50✔
40
            HashMap::from([(DEFAULT_PORT_HANDLE, schema)]),
50✔
41
            HashMap::new(),
50✔
42
        )
50✔
43
        .unwrap();
50✔
44

50✔
45
    let tmp_dir = TempDir::new("test").unwrap();
50✔
46
    let mut storage = LmdbEnvironmentManager::create(tmp_dir.path(), "projection_test").unwrap();
50✔
47

50✔
48
    processor.init(&mut storage).unwrap();
50✔
49

50✔
50
    let tx = storage.create_txn().unwrap();
50✔
51
    let mut fw = TestChannelForwarder { operations: vec![] };
50✔
52

50✔
53
    let op = Operation::Insert {
50✔
54
        new: Record::new(None, input, None),
50✔
55
    };
50✔
56

50✔
57
    processor
50✔
58
        .process(DEFAULT_PORT_HANDLE, op, &mut fw, &tx, &HashMap::new())
50✔
59
        .unwrap();
50✔
60

50✔
61
    match &fw.operations[0] {
50✔
62
        Operation::Insert { new } => new.values[0].clone(),
50✔
63
        _ => panic!("Unable to find result value"),
×
64
    }
×
65
}
50✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc