• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4060889261

pending completion
4060889261

Pull #760

github

GitHub
Merge 2e12e4b69 into de98caa91
Pull Request #760: chore: improve coverage

245 of 245 new or added lines in 6 files covered. (100.0%)

25295 of 39298 relevant lines covered (64.37%)

37157.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.88
/dozer-sql/src/pipeline/aggregation/tests/aggregation_sum_tests.rs
1
use crate::output;
2
use crate::pipeline::aggregation::aggregator::Aggregator;
3
use crate::pipeline::aggregation::tests::aggregation_tests_utils::{
4
    delete_exp, delete_field, get_decimal_field, init_input_schema, init_processor, insert_exp,
5
    insert_field, update_exp, update_field, FIELD_0_FLOAT, FIELD_0_INT, FIELD_100_FLOAT,
6
    FIELD_100_INT, FIELD_100_UINT, FIELD_150_FLOAT, FIELD_150_INT, FIELD_150_UINT, FIELD_200_FLOAT,
7
    FIELD_200_INT, FIELD_200_UINT, FIELD_250_FLOAT, FIELD_250_INT, FIELD_250_UINT, FIELD_350_FLOAT,
8
    FIELD_350_INT, FIELD_350_UINT, FIELD_50_FLOAT, FIELD_50_INT, FIELD_50_UINT, FIELD_NULL, ITALY,
9
    SINGAPORE,
10
};
11
use crate::pipeline::errors::PipelineError::InvalidOperandType;
12
use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
13
use dozer_types::log::debug;
14
use dozer_types::types::Field;
15
use dozer_types::types::FieldType::{Decimal, Float, Int, Text, UInt};
×
16
use std::any::Any;
×
17
use std::collections::HashMap;
×
18

×
19
/// Checks that `Aggregator::Sum` reports the same return type as the field
/// type it is given, for each of Decimal, Float, Int and UInt, and that the
/// aggregator can be formatted with `{}`.
#[test]
fn test_sum_aggregator() {
    let aggregator = Aggregator::Sum;

    // One assertion per supported numeric field type: input type == return type.
    assert_eq!(Decimal, aggregator.get_return_type(Decimal));
    assert_eq!(Float, aggregator.get_return_type(Float));
    assert_eq!(Int, aggregator.get_return_type(Int));
    assert_eq!(UInt, aggregator.get_return_type(UInt));

    // Exercises the aggregator's `Display` formatting through the logger.
    debug!("{}", aggregator);
}
1✔
28

×
29
/// Feeds text values into a `SUM(Salary)` aggregation and checks that insert,
/// delete and update operations are all rejected with an error.
///
/// NOTE(review): the function name says `min`, but the schema, query and
/// expected error all refer to SUM — possibly a leftover from a sibling test
/// file; confirm before renaming.
#[test]
fn failure_min_aggregator() {
    let schema = init_input_schema(Text, "SUM");
    let (processor, tx) = init_processor(
        "SELECT Country, SUM(Salary) \
        FROM Users \
        WHERE Salary >= 1 GROUP BY Country",
        HashMap::from([(DEFAULT_PORT_HANDLE, schema)]),
    )
    .unwrap();

    // NOTE(review): `Any::type_id` identifies the concrete type of the value,
    // not the enum variant, so the assertions below only prove that the error
    // has the same type as `InvalidOperandType(..)`.
    let expected_type = InvalidOperandType("SUM".to_string()).type_id();
    let text_value = Field::Text("test".to_string());

    // Insert of a text value must fail.
    let insert_op = insert_field(ITALY, &text_value);
    let insert_err = processor
        .aggregate(&mut tx.write(), processor.db.unwrap(), insert_op)
        .unwrap_err();
    assert_eq!(expected_type, insert_err.type_id());

    // Delete of a text value must fail.
    let delete_op = delete_field(ITALY, &text_value);
    let delete_err = processor
        .aggregate(&mut tx.write(), processor.db.unwrap(), delete_op)
        .unwrap_err();
    assert_eq!(expected_type, delete_err.type_id());

    // Update from one text value to another must fail.
    let update_op = update_field(ITALY, ITALY, &text_value, &text_value);
    let update_err = processor
        .aggregate(&mut tx.write(), processor.db.unwrap(), update_op)
        .unwrap_err();
    assert_eq!(expected_type, update_err.type_id());
}
1✔
72

73
/// Drives a `SUM(Salary) ... GROUP BY Country` aggregation over float values
/// through inserts, updates (including a change of group) and deletes, and
/// checks the operations emitted after every step.
#[test]
fn test_sum_aggregation_float() {
    let schema = init_input_schema(Float, "SUM");
    let (processor, tx) = init_processor(
        "SELECT Country, SUM(Salary) \
        FROM Users \
        WHERE Salary >= 1 GROUP BY Country",
        HashMap::from([(DEFAULT_PORT_HANDLE, schema)]),
    )
    .unwrap();

    // Step 1 — first record for Italy (100.0).
    //   Italy: [100.0]            => expect insert of 100.0
    let op = insert_field(ITALY, FIELD_100_FLOAT);
    let actual = output!(processor, op, tx);
    assert_eq!(actual, vec![insert_exp(ITALY, FIELD_100_FLOAT)]);

    // Step 2 — second record for Italy (100.0).
    //   Italy: [100.0, 100.0]     => expect update 100.0 -> 200.0
    let op = insert_field(ITALY, FIELD_100_FLOAT);
    let actual = output!(processor, op, tx);
    assert_eq!(
        actual,
        vec![update_exp(ITALY, ITALY, FIELD_100_FLOAT, FIELD_200_FLOAT)]
    );

    // Step 3 — first record for Singapore (50.0).
    //   Italy: [100.0, 100.0], Singapore: [50.0]
    //                             => expect insert of 50.0 for Singapore
    let op = insert_field(SINGAPORE, FIELD_50_FLOAT);
    let actual = output!(processor, op, tx);
    assert_eq!(actual, vec![insert_exp(SINGAPORE, FIELD_50_FLOAT)]);

    // Step 4 — move the Singapore record (50.0) to Italy.
    //   Italy: [100.0, 100.0, 50.0]
    //                             => expect delete for Singapore, then
    //                                update 200.0 -> 250.0 for Italy
    let op = update_field(SINGAPORE, ITALY, FIELD_50_FLOAT, FIELD_50_FLOAT);
    let actual = output!(processor, op, tx);
    assert_eq!(
        actual,
        vec![
            delete_exp(SINGAPORE, FIELD_50_FLOAT),
            update_exp(ITALY, ITALY, FIELD_200_FLOAT, FIELD_250_FLOAT),
        ]
    );

    // Step 5 — change one Italy record from 100.0 to 200.0.
    //   Italy: [200.0, 100.0, 50.0]
    //                             => expect update 250.0 -> 350.0
    let op = update_field(ITALY, ITALY, FIELD_100_FLOAT, FIELD_200_FLOAT);
    let actual = output!(processor, op, tx);
    assert_eq!(
        actual,
        vec![update_exp(ITALY, ITALY, FIELD_250_FLOAT, FIELD_350_FLOAT)]
    );

    // Step 6 — delete the 200.0 record.
    //   Italy: [100.0, 50.0]      => expect update 350.0 -> 150.0
    let op = delete_field(ITALY, FIELD_200_FLOAT);
    let actual = output!(processor, op, tx);
    assert_eq!(
        actual,
        vec![update_exp(ITALY, ITALY, FIELD_350_FLOAT, FIELD_150_FLOAT)]
    );

    // Step 7 — delete the 50.0 record.
    //   Italy: [100.0]            => expect update 150.0 -> 100.0
    let op = delete_field(ITALY, FIELD_50_FLOAT);
    let actual = output!(processor, op, tx);
    assert_eq!(
        actual,
        vec![update_exp(ITALY, ITALY, FIELD_150_FLOAT, FIELD_100_FLOAT)]
    );

    // Step 8 — delete the last remaining record.
    //   Italy: []                 => expect delete carrying 100.0
    let op = delete_field(ITALY, FIELD_100_FLOAT);
    let actual = output!(processor, op, tx);
    assert_eq!(actual, vec![delete_exp(ITALY, FIELD_100_FLOAT)]);
}
1✔
185

186
/// Drives a `SUM(Salary) ... GROUP BY Country` aggregation over signed
/// integer values through inserts, updates (including a change of group) and
/// deletes, and checks the operations emitted after every step.
#[test]
fn test_sum_aggregation_int() {
    let schema = init_input_schema(Int, "SUM");
    let (processor, tx) = init_processor(
        "SELECT Country, SUM(Salary) \
        FROM Users \
        WHERE Salary >= 1 GROUP BY Country",
        HashMap::from([(DEFAULT_PORT_HANDLE, schema)]),
    )
    .unwrap();

    // Step 1 — first record for Italy (100).
    //   Italy: [100]              => expect insert of 100
    let op = insert_field(ITALY, FIELD_100_INT);
    let actual = output!(processor, op, tx);
    assert_eq!(actual, vec![insert_exp(ITALY, FIELD_100_INT)]);

    // Step 2 — second record for Italy (100).
    //   Italy: [100, 100]         => expect update 100 -> 200
    let op = insert_field(ITALY, FIELD_100_INT);
    let actual = output!(processor, op, tx);
    assert_eq!(
        actual,
        vec![update_exp(ITALY, ITALY, FIELD_100_INT, FIELD_200_INT)]
    );

    // Step 3 — first record for Singapore (50).
    //   Italy: [100, 100], Singapore: [50]
    //                             => expect insert of 50 for Singapore
    let op = insert_field(SINGAPORE, FIELD_50_INT);
    let actual = output!(processor, op, tx);
    assert_eq!(actual, vec![insert_exp(SINGAPORE, FIELD_50_INT)]);

    // Step 4 — move the Singapore record (50) to Italy.
    //   Italy: [100, 100, 50]     => expect delete for Singapore, then
    //                                update 200 -> 250 for Italy
    let op = update_field(SINGAPORE, ITALY, FIELD_50_INT, FIELD_50_INT);
    let actual = output!(processor, op, tx);
    assert_eq!(
        actual,
        vec![
            delete_exp(SINGAPORE, FIELD_50_INT),
            update_exp(ITALY, ITALY, FIELD_200_INT, FIELD_250_INT),
        ]
    );

    // Step 5 — change one Italy record from 100 to 200.
    //   Italy: [200, 100, 50]     => expect update 250 -> 350
    let op = update_field(ITALY, ITALY, FIELD_100_INT, FIELD_200_INT);
    let actual = output!(processor, op, tx);
    assert_eq!(
        actual,
        vec![update_exp(ITALY, ITALY, FIELD_250_INT, FIELD_350_INT)]
    );

    // Step 6 — delete the 200 record.
    //   Italy: [100, 50]          => expect update 350 -> 150
    let op = delete_field(ITALY, FIELD_200_INT);
    let actual = output!(processor, op, tx);
    assert_eq!(
        actual,
        vec![update_exp(ITALY, ITALY, FIELD_350_INT, FIELD_150_INT)]
    );

    // Step 7 — delete the 50 record.
    //   Italy: [100]              => expect update 150 -> 100
    let op = delete_field(ITALY, FIELD_50_INT);
    let actual = output!(processor, op, tx);
    assert_eq!(
        actual,
        vec![update_exp(ITALY, ITALY, FIELD_150_INT, FIELD_100_INT)]
    );

    // Step 8 — delete the last remaining record.
    //   Italy: []                 => expect delete carrying 100
    let op = delete_field(ITALY, FIELD_100_INT);
    let actual = output!(processor, op, tx);
    assert_eq!(actual, vec![delete_exp(ITALY, FIELD_100_INT)]);
}
1✔
298

299
/// Drives a `SUM(Salary) ... GROUP BY Country` aggregation over unsigned
/// integer values through inserts, updates (including a change of group) and
/// deletes, and checks the operations emitted after every step.
#[test]
fn test_sum_aggregation_uint() {
    let schema = init_input_schema(UInt, "SUM");
    let (processor, tx) = init_processor(
        "SELECT Country, SUM(Salary) \
        FROM Users \
        WHERE Salary >= 1 GROUP BY Country",
        HashMap::from([(DEFAULT_PORT_HANDLE, schema)]),
    )
    .unwrap();

    // Step 1 — first record for Italy (100).
    //   Italy: [100]              => expect insert of 100
    let op = insert_field(ITALY, FIELD_100_UINT);
    let actual = output!(processor, op, tx);
    assert_eq!(actual, vec![insert_exp(ITALY, FIELD_100_UINT)]);

    // Step 2 — second record for Italy (100).
    //   Italy: [100, 100]         => expect update 100 -> 200
    let op = insert_field(ITALY, FIELD_100_UINT);
    let actual = output!(processor, op, tx);
    assert_eq!(
        actual,
        vec![update_exp(ITALY, ITALY, FIELD_100_UINT, FIELD_200_UINT)]
    );

    // Step 3 — first record for Singapore (50).
    //   Italy: [100, 100], Singapore: [50]
    //                             => expect insert of 50 for Singapore
    let op = insert_field(SINGAPORE, FIELD_50_UINT);
    let actual = output!(processor, op, tx);
    assert_eq!(actual, vec![insert_exp(SINGAPORE, FIELD_50_UINT)]);

    // Step 4 — move the Singapore record (50) to Italy.
    //   Italy: [100, 100, 50]     => expect delete for Singapore, then
    //                                update 200 -> 250 for Italy
    let op = update_field(SINGAPORE, ITALY, FIELD_50_UINT, FIELD_50_UINT);
    let actual = output!(processor, op, tx);
    assert_eq!(
        actual,
        vec![
            delete_exp(SINGAPORE, FIELD_50_UINT),
            update_exp(ITALY, ITALY, FIELD_200_UINT, FIELD_250_UINT),
        ]
    );

    // Step 5 — change one Italy record from 100 to 200.
    //   Italy: [200, 100, 50]     => expect update 250 -> 350
    let op = update_field(ITALY, ITALY, FIELD_100_UINT, FIELD_200_UINT);
    let actual = output!(processor, op, tx);
    assert_eq!(
        actual,
        vec![update_exp(ITALY, ITALY, FIELD_250_UINT, FIELD_350_UINT)]
    );

    // Step 6 — delete the 200 record.
    //   Italy: [100, 50]          => expect update 350 -> 150
    let op = delete_field(ITALY, FIELD_200_UINT);
    let actual = output!(processor, op, tx);
    assert_eq!(
        actual,
        vec![update_exp(ITALY, ITALY, FIELD_350_UINT, FIELD_150_UINT)]
    );

    // Step 7 — delete the 50 record.
    //   Italy: [100]              => expect update 150 -> 100
    let op = delete_field(ITALY, FIELD_50_UINT);
    let actual = output!(processor, op, tx);
    assert_eq!(
        actual,
        vec![update_exp(ITALY, ITALY, FIELD_150_UINT, FIELD_100_UINT)]
    );

    // Step 8 — delete the last remaining record.
    //   Italy: []                 => expect delete carrying 100
    let op = delete_field(ITALY, FIELD_100_UINT);
    let actual = output!(processor, op, tx);
    assert_eq!(actual, vec![delete_exp(ITALY, FIELD_100_UINT)]);
}
1✔
411

412
/// Drives a `SUM(Salary) ... GROUP BY Country` aggregation over decimal
/// values through inserts, updates (including a change of group) and deletes,
/// and checks the operations emitted after every step.
///
/// Decimal fields are built on the fly with `get_decimal_field`.
#[test]
fn test_sum_aggregation_decimal() {
    let schema = init_input_schema(Decimal, "SUM");
    let (processor, tx) = init_processor(
        "SELECT Country, SUM(Salary) \
        FROM Users \
        WHERE Salary >= 1 GROUP BY Country",
        HashMap::from([(DEFAULT_PORT_HANDLE, schema)]),
    )
    .unwrap();

    // Step 1 — first record for Italy (100).
    //   Italy: [100]              => expect insert of 100
    let op = insert_field(ITALY, &get_decimal_field(100));
    let actual = output!(processor, op, tx);
    assert_eq!(actual, vec![insert_exp(ITALY, &get_decimal_field(100))]);

    // Step 2 — second record for Italy (100).
    //   Italy: [100, 100]         => expect update 100 -> 200
    let op = insert_field(ITALY, &get_decimal_field(100));
    let actual = output!(processor, op, tx);
    assert_eq!(
        actual,
        vec![update_exp(
            ITALY,
            ITALY,
            &get_decimal_field(100),
            &get_decimal_field(200),
        )]
    );

    // Step 3 — first record for Singapore (50).
    //   Italy: [100, 100], Singapore: [50]
    //                             => expect insert of 50 for Singapore
    let op = insert_field(SINGAPORE, &get_decimal_field(50));
    let actual = output!(processor, op, tx);
    assert_eq!(actual, vec![insert_exp(SINGAPORE, &get_decimal_field(50))]);

    // Step 4 — move the Singapore record (50) to Italy.
    //   Italy: [100, 100, 50]     => expect delete for Singapore, then
    //                                update 200 -> 250 for Italy
    let op = update_field(
        SINGAPORE,
        ITALY,
        &get_decimal_field(50),
        &get_decimal_field(50),
    );
    let actual = output!(processor, op, tx);
    assert_eq!(
        actual,
        vec![
            delete_exp(SINGAPORE, &get_decimal_field(50)),
            update_exp(
                ITALY,
                ITALY,
                &get_decimal_field(200),
                &get_decimal_field(250),
            ),
        ]
    );

    // Step 5 — change one Italy record from 100 to 200.
    //   Italy: [200, 100, 50]     => expect update 250 -> 350
    let op = update_field(
        ITALY,
        ITALY,
        &get_decimal_field(100),
        &get_decimal_field(200),
    );
    let actual = output!(processor, op, tx);
    assert_eq!(
        actual,
        vec![update_exp(
            ITALY,
            ITALY,
            &get_decimal_field(250),
            &get_decimal_field(350),
        )]
    );

    // Step 6 — delete the 200 record.
    //   Italy: [100, 50]          => expect update 350 -> 150
    let op = delete_field(ITALY, &get_decimal_field(200));
    let actual = output!(processor, op, tx);
    assert_eq!(
        actual,
        vec![update_exp(
            ITALY,
            ITALY,
            &get_decimal_field(350),
            &get_decimal_field(150),
        )]
    );

    // Step 7 — delete the 50 record.
    //   Italy: [100]              => expect update 150 -> 100
    let op = delete_field(ITALY, &get_decimal_field(50));
    let actual = output!(processor, op, tx);
    assert_eq!(
        actual,
        vec![update_exp(
            ITALY,
            ITALY,
            &get_decimal_field(150),
            &get_decimal_field(100),
        )]
    );

    // Step 8 — delete the last remaining record.
    //   Italy: []                 => expect delete carrying 100
    let op = delete_field(ITALY, &get_decimal_field(100));
    let actual = output!(processor, op, tx);
    assert_eq!(actual, vec![delete_exp(ITALY, &get_decimal_field(100))]);
}
1✔
559

560
/// Checks how a `SUM(Salary) ... GROUP BY Country` aggregation over integer
/// values reacts to NULL inputs: every expectation below treats a NULL record
/// as contributing 0 to the sum.
#[test]
fn test_sum_aggregation_int_null() {
    let schema = init_input_schema(Int, "SUM");
    let (processor, tx) = init_processor(
        "SELECT Country, SUM(Salary) \
        FROM Users \
        WHERE Salary >= 1 GROUP BY Country",
        HashMap::from([(DEFAULT_PORT_HANDLE, schema)]),
    )
    .unwrap();

    // Step 1 — first record for Italy is NULL.
    //   Italy: [NULL]             => expect insert of 0
    let op = insert_field(ITALY, FIELD_NULL);
    let actual = output!(processor, op, tx);
    assert_eq!(actual, vec![insert_exp(ITALY, FIELD_0_INT)]);

    // Step 2 — add a 100 record for Italy.
    //   Italy: [NULL, 100]        => expect update 0 -> 100
    let op = insert_field(ITALY, FIELD_100_INT);
    let actual = output!(processor, op, tx);
    assert_eq!(
        actual,
        vec![update_exp(ITALY, ITALY, FIELD_0_INT, FIELD_100_INT)]
    );

    // Step 3 — change the 100 record to NULL.
    //   Italy: [NULL, NULL]       => expect update 100 -> 0
    let op = update_field(ITALY, ITALY, FIELD_100_INT, FIELD_NULL);
    let actual = output!(processor, op, tx);
    assert_eq!(
        actual,
        vec![update_exp(ITALY, ITALY, FIELD_100_INT, FIELD_0_INT)]
    );

    // Step 4 — delete one of the NULL records.
    //   Italy: [NULL]             => expect update 0 -> 0
    let op = delete_field(ITALY, FIELD_NULL);
    let actual = output!(processor, op, tx);
    assert_eq!(
        actual,
        vec![update_exp(ITALY, ITALY, FIELD_0_INT, FIELD_0_INT)]
    );

    // Step 5 — delete the last remaining record.
    //   Italy: []                 => expect delete carrying 0
    let op = delete_field(ITALY, FIELD_NULL);
    let actual = output!(processor, op, tx);
    assert_eq!(actual, vec![delete_exp(ITALY, FIELD_0_INT)]);
}
1✔
627

628
/// Checks how a `SUM(Salary) ... GROUP BY Country` aggregation over float
/// values reacts to NULL inputs: every expectation below treats a NULL record
/// as contributing 0.0 to the sum.
#[test]
fn test_sum_aggregation_float_null() {
    let schema = init_input_schema(Float, "SUM");
    let (processor, tx) = init_processor(
        "SELECT Country, SUM(Salary) \
        FROM Users \
        WHERE Salary >= 1 GROUP BY Country",
        HashMap::from([(DEFAULT_PORT_HANDLE, schema)]),
    )
    .unwrap();

    // Step 1 — first record for Italy is NULL.
    //   Italy: [NULL]             => expect insert of 0.0
    let op = insert_field(ITALY, FIELD_NULL);
    let actual = output!(processor, op, tx);
    assert_eq!(actual, vec![insert_exp(ITALY, FIELD_0_FLOAT)]);

    // Step 2 — add a 100.0 record for Italy.
    //   Italy: [NULL, 100.0]      => expect update 0.0 -> 100.0
    let op = insert_field(ITALY, FIELD_100_FLOAT);
    let actual = output!(processor, op, tx);
    assert_eq!(
        actual,
        vec![update_exp(ITALY, ITALY, FIELD_0_FLOAT, FIELD_100_FLOAT)]
    );

    // Step 3 — change the 100.0 record to NULL.
    //   Italy: [NULL, NULL]       => expect update 100.0 -> 0.0
    let op = update_field(ITALY, ITALY, FIELD_100_FLOAT, FIELD_NULL);
    let actual = output!(processor, op, tx);
    assert_eq!(
        actual,
        vec![update_exp(ITALY, ITALY, FIELD_100_FLOAT, FIELD_0_FLOAT)]
    );

    // Step 4 — delete one of the NULL records.
    //   Italy: [NULL]             => expect update 0.0 -> 0.0
    let op = delete_field(ITALY, FIELD_NULL);
    let actual = output!(processor, op, tx);
    assert_eq!(
        actual,
        vec![update_exp(ITALY, ITALY, FIELD_0_FLOAT, FIELD_0_FLOAT)]
    );

    // Step 5 — delete the last remaining record.
    //   Italy: []                 => expect delete carrying 0.0
    let op = delete_field(ITALY, FIELD_NULL);
    let actual = output!(processor, op, tx);
    assert_eq!(actual, vec![delete_exp(ITALY, FIELD_0_FLOAT)]);
}
1✔
695

696
/// Checks how a `SUM(Salary) ... GROUP BY Country` aggregation over decimal
/// values reacts to NULL inputs: every expectation below treats a NULL record
/// as contributing a decimal 0 to the sum.
///
/// Decimal fields are built on the fly with `get_decimal_field`.
#[test]
fn test_sum_aggregation_decimal_null() {
    let schema = init_input_schema(Decimal, "SUM");
    let (processor, tx) = init_processor(
        "SELECT Country, SUM(Salary) \
        FROM Users \
        WHERE Salary >= 1 GROUP BY Country",
        HashMap::from([(DEFAULT_PORT_HANDLE, schema)]),
    )
    .unwrap();

    // Step 1 — first record for Italy is NULL.
    //   Italy: [NULL]             => expect insert of 0
    let op = insert_field(ITALY, FIELD_NULL);
    let actual = output!(processor, op, tx);
    assert_eq!(actual, vec![insert_exp(ITALY, &get_decimal_field(0))]);

    // Step 2 — add a 100 record for Italy.
    //   Italy: [NULL, 100]        => expect update 0 -> 100
    let op = insert_field(ITALY, &get_decimal_field(100));
    let actual = output!(processor, op, tx);
    assert_eq!(
        actual,
        vec![update_exp(
            ITALY,
            ITALY,
            &get_decimal_field(0),
            &get_decimal_field(100),
        )]
    );

    // Step 3 — change the 100 record to NULL.
    //   Italy: [NULL, NULL]       => expect update 100 -> 0
    let op = update_field(ITALY, ITALY, &get_decimal_field(100), FIELD_NULL);
    let actual = output!(processor, op, tx);
    assert_eq!(
        actual,
        vec![update_exp(
            ITALY,
            ITALY,
            &get_decimal_field(100),
            &get_decimal_field(0),
        )]
    );

    // Step 4 — delete one of the NULL records.
    //   Italy: [NULL]             => expect update 0 -> 0
    let op = delete_field(ITALY, FIELD_NULL);
    let actual = output!(processor, op, tx);
    assert_eq!(
        actual,
        vec![update_exp(
            ITALY,
            ITALY,
            &get_decimal_field(0),
            &get_decimal_field(0),
        )]
    );

    // Step 5 — delete the last remaining record.
    //   Italy: []                 => expect delete carrying 0
    let op = delete_field(ITALY, FIELD_NULL);
    let actual = output!(processor, op, tx);
    assert_eq!(actual, vec![delete_exp(ITALY, &get_decimal_field(0))]);
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc