• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 3965135367

pending completion
3965135367

Pull #680

github

GitHub
Merge 1add77327 into 56c0cf2b3
Pull Request #680: feat: Implement nested queries and CTE.

506 of 506 new or added lines in 18 files covered. (100.0%)

21999 of 33062 relevant lines covered (66.54%)

50489.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.2
/dozer-sql/src/pipeline/expression/scalar/tests/scalar_common.rs
1
use crate::pipeline::{projection::factory::ProjectionProcessorFactory, tests::utils::get_select};
2
use dozer_core::dag::channels::ProcessorChannelForwarder;
3
use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
4
use dozer_core::dag::node::ProcessorFactory;
5
use dozer_core::storage::lmdb_storage::LmdbEnvironmentManager;
6
use dozer_types::types::{Field, Operation, Record, Schema};
7
use std::collections::HashMap;
8
use tempdir::TempDir;
9

10
struct TestChannelForwarder {
11
    operations: Vec<Operation>,
12
}
13

14
impl ProcessorChannelForwarder for TestChannelForwarder {
15
    fn send(
48✔
16
        &mut self,
48✔
17
        op: dozer_types::types::Operation,
48✔
18
        _port: dozer_core::dag::node::PortHandle,
48✔
19
    ) -> Result<(), dozer_core::dag::errors::ExecutionError> {
48✔
20
        self.operations.push(op);
48✔
21
        Ok(())
48✔
22
    }
48✔
23
}
×
24

×
25
pub(crate) fn run_scalar_fct(sql: &str, schema: Schema, input: Vec<Field>) -> Field {
49✔
26
    let select = get_select(sql).unwrap();
49✔
27
    let processor_factory = ProjectionProcessorFactory::_new(select.projection);
49✔
28
    processor_factory
49✔
29
        .get_output_schema(
49✔
30
            &DEFAULT_PORT_HANDLE,
49✔
31
            &[(DEFAULT_PORT_HANDLE, schema.clone())]
49✔
32
                .into_iter()
49✔
33
                .collect(),
49✔
34
        )
49✔
35
        .unwrap();
49✔
36

49✔
37
    let mut processor = processor_factory
49✔
38
        .build(
49✔
39
            HashMap::from([(DEFAULT_PORT_HANDLE, schema)]),
49✔
40
            HashMap::new(),
49✔
41
        )
49✔
42
        .unwrap();
49✔
43

49✔
44
    let tmp_dir = TempDir::new("test").unwrap();
49✔
45
    let mut storage = LmdbEnvironmentManager::create(tmp_dir.path(), "projection_test").unwrap();
49✔
46

49✔
47
    processor.init(&mut storage).unwrap();
49✔
48

49✔
49
    let tx = storage.create_txn().unwrap();
49✔
50
    let mut fw = TestChannelForwarder { operations: vec![] };
49✔
51

49✔
52
    let op = Operation::Insert {
49✔
53
        new: Record::new(None, input, None),
49✔
54
    };
49✔
55

49✔
56
    processor
49✔
57
        .process(DEFAULT_PORT_HANDLE, op, &mut fw, &tx, &HashMap::new())
49✔
58
        .unwrap();
49✔
59

49✔
60
    match &fw.operations[0] {
49✔
61
        Operation::Insert { new } => new.values[0].clone(),
49✔
62
        _ => panic!("Unable to find result value"),
×
63
    }
×
64
}
49✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc