• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 3983272635

pending completion
3983272635

push

github

GitHub
fix insert after delete problem in aggregation (#709)

52 of 52 new or added lines in 2 files covered. (100.0%)

22141 of 34542 relevant lines covered (64.1%)

46204.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.98
/dozer-sql/src/pipeline/aggregation/tests/aggregation_null.rs
1
use crate::output;
2
use crate::pipeline::aggregation::factory::AggregationProcessorFactory;
3
use crate::pipeline::aggregation::tests::aggregation_tests_utils::{
4
    delete_exp, delete_field, init_input_schema, init_processor, insert_exp, insert_field,
5
    FIELD_100_INT, FIELD_1_INT, ITALY,
6
};
7
use crate::pipeline::builder::SchemaSQLContext;
8
use crate::pipeline::tests::utils::get_select;
9
use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
10
use dozer_core::dag::node::ProcessorFactory;
11
use dozer_types::types::FieldType::Int;
12
use dozer_types::types::{Field, FieldDefinition, FieldType, Operation, Record, Schema};
13
use std::collections::HashMap;
×
14

×
15
#[test]
1✔
16
fn test_sum_aggregation_null() {
1✔
17
    let schema = init_input_schema(Int, "SUM");
1✔
18
    let (processor, tx) = init_processor(
1✔
19
        "SELECT Country, SUM(Salary) \
1✔
20
        FROM Users \
1✔
21
        WHERE Salary >= 1 GROUP BY Country",
1✔
22
        HashMap::from([(DEFAULT_PORT_HANDLE, schema)]),
1✔
23
    )
1✔
24
    .unwrap();
1✔
25

1✔
26
    // Insert 100 for segment Italy
1✔
27
    /*
1✔
28
        NULL, 100.0
1✔
29
        -------------
1✔
30
        SUM = 100.0
1✔
31
    */
1✔
32
    let inp = Operation::Insert {
1✔
33
        new: Record::new(
1✔
34
            None,
1✔
35
            vec![
1✔
36
                Field::Int(0),
1✔
37
                Field::Null,
1✔
38
                FIELD_100_INT.clone(),
1✔
39
                FIELD_100_INT.clone(),
1✔
40
            ],
1✔
41
            None,
1✔
42
        ),
1✔
43
    };
1✔
44
    let out = output!(processor, inp, tx);
1✔
45
    let exp = vec![Operation::Insert {
1✔
46
        new: Record::new(None, vec![Field::Null, FIELD_100_INT.clone()], None),
1✔
47
    }];
1✔
48
    assert_eq!(out, exp);
1✔
49
}
1✔
50

×
51
#[test]
1✔
52
fn test_sum_aggregation_del_and_insert() {
1✔
53
    let schema = init_input_schema(Int, "COUNT");
1✔
54
    let (processor, tx) = init_processor(
1✔
55
        "SELECT Country, COUNT(Salary) \
1✔
56
        FROM Users \
1✔
57
        WHERE Salary >= 1 GROUP BY Country",
1✔
58
        HashMap::from([(DEFAULT_PORT_HANDLE, schema)]),
1✔
59
    )
1✔
60
    .unwrap();
1✔
61

1✔
62
    // Insert 100 for segment Italy
1✔
63
    /*
1✔
64
        Italy, 100.0
1✔
65
        -------------
1✔
66
        COUNT = 1
1✔
67
    */
1✔
68
    let mut inp = insert_field(ITALY, FIELD_100_INT);
1✔
69
    let mut out = output!(processor, inp, tx);
1✔
70
    let mut exp = vec![insert_exp(ITALY, FIELD_1_INT)];
1✔
71
    assert_eq!(out, exp);
1✔
72

×
73
    // Delete last record
×
74
    /*
×
75
        -------------
×
76
        COUNT = 0
×
77
    */
×
78
    inp = delete_field(ITALY, FIELD_100_INT);
1✔
79
    out = output!(processor, inp, tx);
1✔
80
    exp = vec![delete_exp(ITALY, FIELD_1_INT)];
1✔
81
    assert_eq!(out, exp);
1✔
82

×
83
    // Insert 100 for segment Italy
×
84
    /*
×
85
        Italy, 100.0
×
86
        -------------
87
        COUNT = 1
88
    */
89
    let mut inp = insert_field(ITALY, FIELD_100_INT);
1✔
90
    let mut out = output!(processor, inp, tx);
1✔
91
    let mut exp = vec![insert_exp(ITALY, FIELD_1_INT)];
1✔
92
    assert_eq!(out, exp);
1✔
93
}
1✔
94

95
#[test]
1✔
96
fn test_aggregation_alias() {
1✔
97
    let schema = Schema::empty()
1✔
98
        .field(
1✔
99
            FieldDefinition::new(String::from("ID"), FieldType::Int, false),
1✔
100
            false,
1✔
101
        )
1✔
102
        .field(
1✔
103
            FieldDefinition::new(String::from("Salary"), FieldType::Int, false),
1✔
104
            false,
1✔
105
        )
1✔
106
        .clone();
1✔
107

1✔
108
    let select = get_select("SELECT ID, SUM(Salary) as Salaries FROM Users GROUP BY ID").unwrap();
1✔
109

1✔
110
    let factory = AggregationProcessorFactory::new(select.projection, select.group_by, false);
1✔
111
    let out_schema = factory
1✔
112
        .get_output_schema(
1✔
113
            &DEFAULT_PORT_HANDLE,
1✔
114
            &[(DEFAULT_PORT_HANDLE, (schema, SchemaSQLContext {}))]
1✔
115
                .into_iter()
1✔
116
                .collect(),
1✔
117
        )
1✔
118
        .unwrap()
1✔
119
        .0;
1✔
120

1✔
121
    assert_eq!(
1✔
122
        out_schema,
1✔
123
        Schema::empty()
1✔
124
            .field(
1✔
125
                FieldDefinition::new(String::from("ID"), FieldType::Int, false),
1✔
126
                true,
1✔
127
            )
1✔
128
            .field(
1✔
129
                FieldDefinition::new(String::from("Salaries"), FieldType::Int, false),
1✔
130
                false,
1✔
131
            )
1✔
132
            .clone()
1✔
133
    );
1✔
134
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc