• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.0
/source/libs/function/src/detail/tavgfunction.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "builtinsimpl.h"
17
#include "function.h"
18
#include "tdatablock.h"
19
#include "tfunctionInt.h"
20
#include "tglobal.h"
21

22
/*
 * Set (_info)->numOfRes to res, but only when numOfElem is positive.
 * When numOfElem <= 0 the macro does nothing and numOfRes keeps its value.
 */
#define SET_VAL(_info, numOfElem, res) \
  do {                                 \
    if ((numOfElem) <= 0) {            \
      break;                           \
    }                                  \
    (_info)->numOfRes = (res);         \
  } while (0)
29

30
/*
 * Remove ("invert") the contribution of a row range from a running average.
 *
 * For every non-NULL row i in [start, numOfRows + pInput->startRowIndex) the value
 * plist[i] (column data viewed as T) is subtracted from sumT, pAvgRes->count is
 * decremented and numOfElem is incremented.
 *
 * The macro is not self-contained: it uses pCol, pInput, start, numOfRows,
 * numOfElem and pAvgRes from the scope in which it is expanded.
 * No overflow handling is performed on sumT.
 */
#define LIST_AVG_N(sumT, T)                                               \
  do {                                                                    \
    T* plist = (T*)pCol->pData;                                           \
    for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { \
      if (colDataIsNull_f(pCol->nullbitmap, i)) {                         \
        continue;                                                         \
      }                                                                   \
                                                                          \
      numOfElem += 1;                                                     \
      pAvgRes->count -= 1;                                                \
      sumT -= plist[i];                                                   \
    }                                                                     \
  } while (0)
43

44
/*
 * Add a signed value to the running sum of an SAvgRes, switching to a double
 * accumulator when the 64-bit integer sum would (or could) overflow.
 *
 *  - If the sum has already overflowed, val is added to sum.dsum.
 *  - Otherwise, if adding val would reach or pass INT64_MAX / INT64_MIN, the
 *    current isum is converted to double, sum.overflow is set and the result
 *    is stored in sum.dsum.
 *  - Otherwise val is added to sum.isum.
 *
 * pAvgRes is converted to SAvgRes* (callers pass void*). val is evaluated more
 * than once, so it must be free of side effects. Both arguments are
 * parenthesized so that expressions such as `c ? a : b` expand correctly.
 */
#define CHECK_OVERFLOW_SUM_SIGNED(pAvgRes, val)                                            \
  do {                                                                                     \
    SAvgRes* out = (pAvgRes);                                                              \
    if (out->sum.overflow) {                                                               \
      out->sum.dsum += (val);                                                              \
    } else if ((out->sum.isum > 0 && (val) > 0 && INT64_MAX - out->sum.isum <= (val)) ||   \
               (out->sum.isum < 0 && (val) < 0 && INT64_MIN - out->sum.isum >= (val))) {   \
      double dsum = (double)out->sum.isum;                                                 \
      out->sum.overflow = true;                                                            \
      out->sum.dsum = dsum + (val);                                                        \
    } else {                                                                               \
      out->sum.isum += (val);                                                              \
    }                                                                                      \
  } while (0)
59

60
/*
 * Merge variant of CHECK_OVERFLOW_SUM_SIGNED.
 *
 * `big` tells the macro that val itself comes from an already-overflowed
 * partial sum (i.e. it is a double that may exceed the int64 range). When
 * `big` is true the accumulator is switched to double unconditionally;
 * otherwise the same range checks as CHECK_OVERFLOW_SUM_SIGNED apply.
 *
 * val is evaluated more than once and must be free of side effects.
 */
#define CHECK_OVERFLOW_SUM_SIGNED_BIG(pAvgRes, val, big)                                   \
  do {                                                                                     \
    SAvgRes* out = (pAvgRes);                                                              \
    if (out->sum.overflow) {                                                               \
      out->sum.dsum += (val);                                                              \
    } else if ((out->sum.isum > 0 && (val) > 0 && INT64_MAX - out->sum.isum <= (val)) ||   \
               (out->sum.isum < 0 && (val) < 0 && INT64_MIN - out->sum.isum >= (val)) ||   \
               (big)) {                                                                    \
      double dsum = (double)out->sum.isum;                                                 \
      out->sum.overflow = true;                                                            \
      out->sum.dsum = dsum + (val);                                                        \
    } else {                                                                               \
      SUM_RES_INC_ISUM(&AVG_RES_GET_SUM(out), (val));                                      \
    }                                                                                      \
  } while (0)
75

76
/*
 * Add an unsigned value to the running sum of an SAvgRes, switching to a
 * double accumulator when the 64-bit unsigned sum would (or could) overflow.
 *
 *  - If the sum has already overflowed, val is added to sum.dsum.
 *  - Otherwise, if adding val would reach or pass UINT64_MAX, the current usum
 *    is converted to double, sum.overflow is set and the result goes to
 *    sum.dsum.
 *  - Otherwise val is added to sum.usum.
 *
 * val is evaluated more than once and must be free of side effects. It is
 * parenthesized so that expressions such as `c ? a : b` expand correctly.
 */
#define CHECK_OVERFLOW_SUM_UNSIGNED(pAvgRes, val)     \
  do {                                                \
    SAvgRes* out = (pAvgRes);                         \
    if (out->sum.overflow) {                          \
      out->sum.dsum += (val);                         \
    } else if (UINT64_MAX - out->sum.usum <= (val)) { \
      double dsum = (double)out->sum.usum;            \
      out->sum.overflow = true;                       \
      out->sum.dsum = dsum + (val);                   \
    } else {                                          \
      out->sum.usum += (val);                         \
    }                                                 \
  } while (0)
90

91
/*
 * Merge variant of CHECK_OVERFLOW_SUM_UNSIGNED.
 *
 * `big` tells the macro that val itself comes from an already-overflowed
 * partial sum (i.e. it is a double that may exceed the uint64 range). When
 * `big` is true the accumulator is switched to double unconditionally;
 * otherwise the same range check as CHECK_OVERFLOW_SUM_UNSIGNED applies.
 *
 * val is evaluated more than once and must be free of side effects.
 */
#define CHECK_OVERFLOW_SUM_UNSIGNED_BIG(pAvgRes, val, big)        \
  do {                                                            \
    SAvgRes* out = (pAvgRes);                                     \
    if (out->sum.overflow) {                                      \
      out->sum.dsum += (val);                                     \
    } else if (UINT64_MAX - out->sum.usum <= (val) || (big)) {    \
      double dsum = (double)out->sum.usum;                        \
      out->sum.overflow = true;                                   \
      out->sum.dsum = dsum + (val);                               \
    } else {                                                      \
      out->sum.usum += (val);                                     \
    }                                                             \
  } while (0)
105

106
/**
 * Return the size of the AVG intermediate-result buffer for a function node.
 *
 * The size is computed by AVG_RES_GET_SIZE from an input data type: the input
 * type of the referenced source function when pFunc->pSrcFuncRef is set,
 * otherwise pFunc's own srcFuncInputType.
 */
int32_t getAvgInfoSize(SFunctionNode* pFunc) {
  if (pFunc->pSrcFuncRef) {
    return AVG_RES_GET_SIZE(pFunc->pSrcFuncRef->srcFuncInputType.type);
  }
  return AVG_RES_GET_SIZE(pFunc->srcFuncInputType.type);
}
110

111
/**
 * Fill the execution environment for AVG: pEnv->calcMemSize receives the
 * intermediate-buffer size reported by getAvgInfoSize(pFunc).
 *
 * @return always true.
 */
bool getAvgFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
  pEnv->calcMemSize = getAvgInfoSize(pFunc);
  return true;
}
115

116
/**
 * Prepare a result-row entry for an AVG computation.
 *
 * Does nothing when the entry is already initialized. Otherwise runs the
 * generic functionSetup() and then zero-fills the intermediate buffer over
 * pCtx->resDataInfo.interBufSize bytes.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_FUNC_SETUP_ERROR when functionSetup() fails.
 */
int32_t avgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
  if (pResultInfo->initialized) {
    return TSDB_CODE_SUCCESS;
  }
  if (TSDB_CODE_SUCCESS != functionSetup(pCtx, pResultInfo)) {
    return TSDB_CODE_FUNC_SETUP_ERROR;
  }

  void* pRes = GET_ROWCELL_INTERBUF(pResultInfo);
  (void)memset(pRes, 0, pCtx->resDataInfo.interBufSize);
  return TSDB_CODE_SUCCESS;
}
128

129
static int32_t calculateAvgBySMAInfo(void* pRes, int32_t numOfRows, int32_t type, const SColumnDataAgg* pAgg, int32_t* pNumOfElem) {
201✔
130
  int32_t numOfElem = numOfRows - pAgg->numOfNull;
201✔
131

132
  AVG_RES_INC_COUNT(pRes, type, numOfElem);
201✔
133
  if (IS_SIGNED_NUMERIC_TYPE(type)) {
201!
134
    CHECK_OVERFLOW_SUM_SIGNED(pRes, pAgg->sum);
185!
135
  } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
16!
UNCOV
136
    CHECK_OVERFLOW_SUM_UNSIGNED(pRes, pAgg->sum);
×
137
  } else if (IS_FLOAT_TYPE(type)) {
16!
UNCOV
138
    SUM_RES_INC_DSUM(&AVG_RES_GET_SUM(pRes), GET_DOUBLE_VAL((const char*)&(pAgg->sum)));
×
139
  } else if (IS_DECIMAL_TYPE(type)) {
16!
140
    bool overflow = pAgg->overflow;
16✔
141
    if (overflow) return TSDB_CODE_DECIMAL_OVERFLOW;
16✔
142
    SUM_RES_INC_DECIMAL_SUM(&AVG_RES_GET_DECIMAL_SUM(pRes), &pAgg->decimal128Sum, TSDB_DATA_TYPE_DECIMAL);
12!
143
    if (overflow) return TSDB_CODE_DECIMAL_OVERFLOW;
12!
144
  }
145

146
  *pNumOfElem = numOfElem;
197✔
147
  return 0;
197✔
148
}
149

150
/**
 * Accumulate the non-NULL values of a column into the AVG intermediate result.
 *
 * Rows [pInput->startRowIndex, pInput->startRowIndex + pInput->numOfRows) of
 * pCol are visited; rows flagged in pCol->nullbitmap are skipped. For every
 * remaining row the result's count is incremented by one and the value is
 * added to the running sum with the overflow handling appropriate for `type`.
 * Types not listed in the switch contribute nothing.
 *
 * @param pCol        column holding the values (pData is viewed according to `type`)
 * @param type        TSDB data type of the column
 * @param pInput      supplies the row range
 * @param pRes        AVG intermediate result buffer
 * @param pNumOfElem  out: number of non-NULL rows added (written on success only)
 * @return 0 on success, TSDB_CODE_DECIMAL_OVERFLOW when a decimal addition overflows.
 */
static int32_t doAddNumericVector(SColumnInfoData* pCol, int32_t type, SInputColumnInfoData *pInput, void* pRes, int32_t* pNumOfElem) {
  int32_t start = pInput->startRowIndex;
  int32_t numOfRows = pInput->numOfRows;
  int32_t numOfElems = 0;

  switch (type) {
    case TSDB_DATA_TYPE_TINYINT: {
      int8_t* plist = (int8_t*)pCol->pData;
      for (int32_t i = start; i < numOfRows + start; ++i) {
        if (colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        numOfElems += 1;
        AVG_RES_INC_COUNT(pRes, TSDB_DATA_TYPE_TINYINT, 1);
        CHECK_OVERFLOW_SUM_SIGNED(pRes, plist[i]);
      }

      break;
    }

    case TSDB_DATA_TYPE_SMALLINT: {
      int16_t* plist = (int16_t*)pCol->pData;
      for (int32_t i = start; i < numOfRows + start; ++i) {
        if (colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        numOfElems += 1;
        AVG_RES_INC_COUNT(pRes, TSDB_DATA_TYPE_SMALLINT, 1);
        CHECK_OVERFLOW_SUM_SIGNED(pRes, plist[i]);
      }
      break;
    }

    case TSDB_DATA_TYPE_INT: {
      int32_t* plist = (int32_t*)pCol->pData;
      for (int32_t i = start; i < numOfRows + start; ++i) {
        if (colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        numOfElems += 1;
        AVG_RES_INC_COUNT(pRes, TSDB_DATA_TYPE_INT, 1);
        CHECK_OVERFLOW_SUM_SIGNED(pRes, plist[i]);
      }

      break;
    }

    case TSDB_DATA_TYPE_BIGINT: {
      int64_t* plist = (int64_t*)pCol->pData;
      for (int32_t i = start; i < numOfRows + start; ++i) {
        if (colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        numOfElems += 1;
        AVG_RES_INC_COUNT(pRes, TSDB_DATA_TYPE_BIGINT, 1);
        CHECK_OVERFLOW_SUM_SIGNED(pRes, plist[i]);
      }
      break;
    }

    case TSDB_DATA_TYPE_UTINYINT: {
      uint8_t* plist = (uint8_t*)pCol->pData;
      for (int32_t i = start; i < numOfRows + start; ++i) {
        if (colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        numOfElems += 1;
        AVG_RES_INC_COUNT(pRes, TSDB_DATA_TYPE_UTINYINT, 1);
        CHECK_OVERFLOW_SUM_UNSIGNED(pRes, plist[i]);
      }

      break;
    }

    case TSDB_DATA_TYPE_USMALLINT: {
      uint16_t* plist = (uint16_t*)pCol->pData;
      for (int32_t i = start; i < numOfRows + start; ++i) {
        if (colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        numOfElems += 1;
        AVG_RES_INC_COUNT(pRes, TSDB_DATA_TYPE_USMALLINT, 1);
        CHECK_OVERFLOW_SUM_UNSIGNED(pRes, plist[i]);
      }
      break;
    }

    case TSDB_DATA_TYPE_UINT: {
      uint32_t* plist = (uint32_t*)pCol->pData;
      for (int32_t i = start; i < numOfRows + start; ++i) {
        if (colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        numOfElems += 1;
        AVG_RES_INC_COUNT(pRes, TSDB_DATA_TYPE_UINT, 1);
        CHECK_OVERFLOW_SUM_UNSIGNED(pRes, plist[i]);
      }

      break;
    }

    case TSDB_DATA_TYPE_UBIGINT: {
      uint64_t* plist = (uint64_t*)pCol->pData;
      for (int32_t i = start; i < numOfRows + start; ++i) {
        if (colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        numOfElems += 1;
        AVG_RES_INC_COUNT(pRes, TSDB_DATA_TYPE_UBIGINT, 1);
        CHECK_OVERFLOW_SUM_UNSIGNED(pRes, plist[i]);
      }
      break;
    }

    case TSDB_DATA_TYPE_FLOAT: {
      float* plist = (float*)pCol->pData;
      for (int32_t i = start; i < numOfRows + start; ++i) {
        if (colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        numOfElems += 1;
        AVG_RES_INC_COUNT(pRes, TSDB_DATA_TYPE_FLOAT, 1);
        SUM_RES_INC_DSUM(&AVG_RES_GET_SUM(pRes), plist[i]);
      }
      break;
    }

    case TSDB_DATA_TYPE_DOUBLE: {
      double* plist = (double*)pCol->pData;
      for (int32_t i = start; i < numOfRows + start; ++i) {
        if (colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        numOfElems += 1;
        AVG_RES_INC_COUNT(pRes, TSDB_DATA_TYPE_DOUBLE, 1);
        SUM_RES_INC_DSUM(&AVG_RES_GET_SUM(pRes), plist[i]);
      }
      break;
    }
    case TSDB_DATA_TYPE_DECIMAL64:
    case TSDB_DATA_TYPE_DECIMAL: {
      // decimal values are addressed by byte offset using the type's fixed width
      const char* pDec = pCol->pData;
      for (int32_t i = start; i < numOfRows + start; ++i) {
        if (colDataIsNull_f(pCol->nullbitmap, i)) {
          continue;
        }

        numOfElems += 1;
        AVG_RES_INC_COUNT(pRes, type, 1);
        // NOTE(review): SUM_RES_INC_DECIMAL_SUM is defined elsewhere; it presumably
        // sets the local `overflow` when the decimal addition overflows.
        bool overflow = false;
        SUM_RES_INC_DECIMAL_SUM(&AVG_RES_GET_DECIMAL_SUM(pRes), (const void*)(pDec + i * tDataTypes[type].bytes), type);
        if (overflow) return TSDB_CODE_DECIMAL_OVERFLOW;
      }
    } break;
    default:
      break;
  }

  *pNumOfElem = numOfElems;
  return 0;
}
322

323
/**
 * Per-block step of AVG: fold the rows of the current input block into the
 * intermediate result stored in the result row of pCtx.
 *
 * Three paths are used, in this order:
 *   1. pInput->colDataSMAIsSet: use the pre-aggregated statistics (calculateAvgBySMAInfo).
 *   2. the column reports no NULLs (!pCol->hasNull): count all rows at once and
 *      sum them in a per-type loop without NULL checks.
 *   3. otherwise: row-by-row accumulation with NULL checks (doAddNumericVector).
 *
 * A NULL-typed input column contributes nothing. The entry's numOfRes is set to
 * 1 only when at least one element was accumulated (see SET_VAL).
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_DECIMAL_OVERFLOW on decimal overflow, or
 *         TSDB_CODE_FUNC_FUNTION_PARA_TYPE for an unsupported type on path 2.
 */
int32_t avgFunction(SqlFunctionCtx* pCtx) {
  int32_t numOfElem = 0;

  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnDataAgg*       pAgg = pInput->pColumnDataAgg[0];
  int32_t               type = pInput->pData[0]->info.type;
  pCtx->inputType = type;

  void* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  // computing based on the true data block
  SColumnInfoData* pCol = pInput->pData[0];

  int32_t numOfRows = pInput->numOfRows;

  if (IS_NULL_TYPE(type)) {
    goto _over;
  }

  AVG_RES_SET_TYPE(pAvgRes, pCtx->inputType, type);
  if (IS_DECIMAL_TYPE(type)) AVG_RES_SET_INPUT_SCALE(pAvgRes, pInput->pData[0]->info.scale);

  if (pInput->colDataSMAIsSet) {  // try to use SMA if available
    int32_t code = calculateAvgBySMAInfo(pAvgRes, numOfRows, type, pAgg, &numOfElem);
    if (code != 0) return code;
  } else if (!pCol->hasNull) {  // no NULLs: every row counts, so the loops below skip the NULL check
    numOfElem = pInput->numOfRows;
    AVG_RES_INC_COUNT(pAvgRes, pCtx->inputType, pInput->numOfRows);

    switch (type) {
      // Signed and unsigned variants of the same width share a loop; the data is
      // read through the signed pointer and cast back for the unsigned type.
      case TSDB_DATA_TYPE_UTINYINT:
      case TSDB_DATA_TYPE_TINYINT: {
        const int8_t* plist = (const int8_t*)pCol->pData;

        for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
          if (type == TSDB_DATA_TYPE_TINYINT) {
            CHECK_OVERFLOW_SUM_SIGNED(pAvgRes, plist[i]);
          } else {
            CHECK_OVERFLOW_SUM_UNSIGNED(pAvgRes, (uint8_t)plist[i]);
          }
        }
        break;
      }

      case TSDB_DATA_TYPE_USMALLINT:
      case TSDB_DATA_TYPE_SMALLINT: {
        const int16_t* plist = (const int16_t*)pCol->pData;

        for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
          if (type == TSDB_DATA_TYPE_SMALLINT) {
            CHECK_OVERFLOW_SUM_SIGNED(pAvgRes, plist[i]);
          } else {
            CHECK_OVERFLOW_SUM_UNSIGNED(pAvgRes, (uint16_t)plist[i]);
          }
        }
        break;
      }

      case TSDB_DATA_TYPE_UINT:
      case TSDB_DATA_TYPE_INT: {
        const int32_t* plist = (const int32_t*)pCol->pData;

        for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
          if (type == TSDB_DATA_TYPE_INT) {
            CHECK_OVERFLOW_SUM_SIGNED(pAvgRes, plist[i]);
          } else {
            CHECK_OVERFLOW_SUM_UNSIGNED(pAvgRes, (uint32_t)plist[i]);
          }
        }
        break;
      }

      case TSDB_DATA_TYPE_UBIGINT:
      case TSDB_DATA_TYPE_BIGINT: {
        const int64_t* plist = (const int64_t*)pCol->pData;

        for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
          if (type == TSDB_DATA_TYPE_BIGINT) {
            CHECK_OVERFLOW_SUM_SIGNED(pAvgRes, plist[i]);
          } else {
            CHECK_OVERFLOW_SUM_UNSIGNED(pAvgRes, (uint64_t)plist[i]);
          }
        }
        break;
      }

      case TSDB_DATA_TYPE_FLOAT: {
        const float* plist = (const float*)pCol->pData;

        for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
          SUM_RES_INC_DSUM(&AVG_RES_GET_SUM(pAvgRes), plist[i]);
        }
        break;
      }
      case TSDB_DATA_TYPE_DOUBLE: {
        const double* plist = (const double*)pCol->pData;

        for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
          SUM_RES_INC_DSUM(&AVG_RES_GET_SUM(pAvgRes), plist[i]);
        }
        break;
      }
      case TSDB_DATA_TYPE_DECIMAL:
      case TSDB_DATA_TYPE_DECIMAL64: {
        const char* pDec = pCol->pData;
        for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
          // NOTE(review): SUM_RES_INC_DECIMAL_SUM is defined elsewhere; it presumably
          // sets the local `overflow` when the decimal addition overflows.
          bool overflow = false;
          if (type == TSDB_DATA_TYPE_DECIMAL64) {
            SUM_RES_INC_DECIMAL_SUM(&AVG_RES_GET_DECIMAL_SUM(pAvgRes), (const void*)(pDec + i * tDataTypes[type].bytes),
                                    TSDB_DATA_TYPE_DECIMAL64);
          } else {
            SUM_RES_INC_DECIMAL_SUM(&AVG_RES_GET_DECIMAL_SUM(pAvgRes), (const void*)(pDec + i * tDataTypes[type].bytes),
                                    TSDB_DATA_TYPE_DECIMAL);
          }
          if (overflow) return TSDB_CODE_DECIMAL_OVERFLOW;
        }
      } break;
      default:
        return TSDB_CODE_FUNC_FUNTION_PARA_TYPE;
    }
  } else {
    int32_t code = doAddNumericVector(pCol, type, pInput, pAvgRes, &numOfElem);
    if (code) return code;
  }

_over:
  // when every value in the block is NULL nothing was accumulated and no result is flagged
  SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
  return TSDB_CODE_SUCCESS;
}
455

456
/**
 * Merge one serialized partial AVG result (pInput) into the running result
 * (pOutput).
 *
 * The value type is read from pInput; a NULL-typed partial is ignored. The
 * partial sum is added to pOutput's sum with overflow handling, and the
 * partial count is added to pOutput's count.
 *
 * For integer types the partial sum is held either as an integer or, once it
 * has overflowed, as a double. The two cases are handled in separate branches
 * on purpose: writing them as `overflow ? dsum : isum` would give the whole
 * expression type double, silently rounding exact 64-bit integer partial sums
 * through double before they are added.
 *
 * @return 0 on success, TSDB_CODE_DECIMAL_OVERFLOW when a decimal addition overflows.
 */
static int32_t avgTransferInfo(SqlFunctionCtx* pCtx, void* pInput, void* pOutput) {
  int32_t inputDT = pCtx->pExpr->pExpr->_function.pFunctNode->srcFuncInputType.type;
  int32_t type = AVG_RES_GET_TYPE(pInput, inputDT);
  pCtx->inputType = type;
  if (IS_NULL_TYPE(type)) {
    return 0;
  }

  AVG_RES_SET_TYPE(pOutput, inputDT, type);
  if (IS_SIGNED_NUMERIC_TYPE(type)) {
    bool overflow = AVG_RES_GET_SUM_OVERFLOW(pInput, false, 0);
    if (overflow) {
      // partial sum already lives in the double accumulator
      CHECK_OVERFLOW_SUM_SIGNED_BIG(pOutput, (SUM_RES_GET_DSUM(&AVG_RES_GET_SUM(pInput))), true);
    } else {
      // exact integer partial sum: keep integer arithmetic
      CHECK_OVERFLOW_SUM_SIGNED_BIG(pOutput, (SUM_RES_GET_ISUM(&AVG_RES_GET_SUM(pInput))), false);
    }
  } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
    bool overflow = AVG_RES_GET_SUM_OVERFLOW(pInput, false, 0);
    if (overflow) {
      CHECK_OVERFLOW_SUM_UNSIGNED_BIG(pOutput, (SUM_RES_GET_DSUM(&AVG_RES_GET_SUM(pInput))), true);
    } else {
      CHECK_OVERFLOW_SUM_UNSIGNED_BIG(pOutput, (SUM_RES_GET_USUM(&AVG_RES_GET_SUM(pInput))), false);
    }
  } else if (IS_DECIMAL_TYPE(type)) {
    AVG_RES_SET_INPUT_SCALE(pOutput, AVG_RES_GET_INPUT_SCALE(pInput));
    // NOTE(review): SUM_RES_INC_DECIMAL_SUM is defined elsewhere; it presumably
    // sets the local `overflow` when the decimal addition overflows.
    bool overflow = false;
    SUM_RES_INC_DECIMAL_SUM(&AVG_RES_GET_DECIMAL_SUM(pOutput), &AVG_RES_GET_DECIMAL_SUM(pInput), TSDB_DATA_TYPE_DECIMAL);
    if (overflow) return TSDB_CODE_DECIMAL_OVERFLOW;
  } else {
    SUM_RES_INC_DSUM(&AVG_RES_GET_SUM(pOutput), SUM_RES_GET_DSUM(&AVG_RES_GET_SUM(pInput)));
  }

  AVG_RES_INC_COUNT(pOutput, type, AVG_RES_GET_COUNT(pInput, true, type));
  return 0;
}
484

485
/**
 * Merge step of AVG: every non-NULL row of the input column carries a
 * serialized partial result (a binary var-length value) that is folded into
 * the result row of pCtx via avgTransferInfo().
 *
 * A NULL-typed input column is accepted and contributes nothing; any other
 * non-binary column type is rejected.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_FUNC_FUNTION_PARA_TYPE for a non-binary
 *         input column, or the error returned by avgTransferInfo().
 */
int32_t avgFunctionMerge(SqlFunctionCtx* pCtx) {
  SInputColumnInfoData* pInput = &pCtx->input;
  SColumnInfoData*      pCol = pInput->pData[0];

  if (IS_NULL_TYPE(pCol->info.type)) {
    // numOfElem is 0 here, so SET_VAL leaves numOfRes untouched
    SET_VAL(GET_RES_INFO(pCtx), 0, 1);
    return TSDB_CODE_SUCCESS;
  }

  if (pCol->info.type != TSDB_DATA_TYPE_BINARY) {
    return TSDB_CODE_FUNC_FUNTION_PARA_TYPE;
  }

  void* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  int32_t start = pInput->startRowIndex;

  for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
    if (colDataIsNull_s(pCol, i)) continue;
    char* data = colDataGetData(pCol, i);
    void* pInputInfo = varDataVal(data);  // payload after the var-length header
    int32_t code = avgTransferInfo(pCtx, pInputInfo, pInfo);
    if (code != 0) return code;
  }

  SET_VAL(GET_RES_INFO(pCtx), 1, 1);

  return TSDB_CODE_SUCCESS;
}
514

515
#ifdef BUILD_NO_CALL
516
/**
 * Inverse of the per-block AVG step: remove the non-NULL values of the current
 * input rows from the running sum and count (see LIST_AVG_N).
 *
 * Only compiled when BUILD_NO_CALL is defined. Sums are adjusted directly on
 * the isum/usum/dsum member chosen by the column type; the overflow flag of
 * the result is not consulted. Types not listed in the switch are ignored.
 *
 * @return always TSDB_CODE_SUCCESS.
 */
int32_t avgInvertFunction(SqlFunctionCtx* pCtx) {
  int32_t numOfElem = 0;

  SInputColumnInfoData* pInput = &pCtx->input;
  SAvgRes*              pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));

  // computing based on the true data block
  SColumnInfoData* pCol = pInput->pData[0];

  // start and numOfRows are consumed by LIST_AVG_N
  int32_t start = pInput->startRowIndex;
  int32_t numOfRows = pInput->numOfRows;

  switch (pCol->info.type) {
    case TSDB_DATA_TYPE_TINYINT: {
      LIST_AVG_N(pAvgRes->sum.isum, int8_t);
      break;
    }
    case TSDB_DATA_TYPE_SMALLINT: {
      LIST_AVG_N(pAvgRes->sum.isum, int16_t);
      break;
    }
    case TSDB_DATA_TYPE_INT: {
      LIST_AVG_N(pAvgRes->sum.isum, int32_t);
      break;
    }
    case TSDB_DATA_TYPE_BIGINT: {
      LIST_AVG_N(pAvgRes->sum.isum, int64_t);
      break;
    }
    case TSDB_DATA_TYPE_UTINYINT: {
      LIST_AVG_N(pAvgRes->sum.usum, uint8_t);
      break;
    }
    case TSDB_DATA_TYPE_USMALLINT: {
      LIST_AVG_N(pAvgRes->sum.usum, uint16_t);
      break;
    }
    case TSDB_DATA_TYPE_UINT: {
      LIST_AVG_N(pAvgRes->sum.usum, uint32_t);
      break;
    }
    case TSDB_DATA_TYPE_UBIGINT: {
      LIST_AVG_N(pAvgRes->sum.usum, uint64_t);
      break;
    }
    case TSDB_DATA_TYPE_FLOAT: {
      LIST_AVG_N(pAvgRes->sum.dsum, float);
      break;
    }
    case TSDB_DATA_TYPE_DOUBLE: {
      LIST_AVG_N(pAvgRes->sum.dsum, double);
      break;
    }
    default:
      break;
  }

  // when every value in the block is NULL nothing was removed and no result is flagged
  SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
  return TSDB_CODE_SUCCESS;
}
578
#endif
579

580
/**
 * Combine the intermediate AVG result of pSourceCtx into that of pDestCtx.
 *
 * The value type is taken from the destination buffer, or from the source
 * buffer when the destination is still NULL-typed. The source sum is added to
 * the destination sum and the source count to the destination count.
 *
 * For integer types the source sum may already have overflowed into its
 * double accumulator; in that case the double value is merged (as in
 * avgTransferInfo) instead of reading the integer member.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_DECIMAL_OVERFLOW when adding the
 *         decimal sums overflows.
 */
int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
  SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
  void*                pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
  int32_t              type = AVG_RES_GET_TYPE(pDBuf, pDestCtx->inputType);

  SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
  void*                pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
  type = (type == TSDB_DATA_TYPE_NULL) ? AVG_RES_GET_TYPE(pSBuf, pDestCtx->inputType) : type;

  if (IS_SIGNED_NUMERIC_TYPE(type)) {
    if (AVG_RES_GET_SUM_OVERFLOW(pSBuf, false, 0)) {
      // source sum is held as a double; force the destination to double as well
      CHECK_OVERFLOW_SUM_SIGNED_BIG(pDBuf, (SUM_RES_GET_DSUM(&AVG_RES_GET_SUM(pSBuf))), true);
    } else {
      CHECK_OVERFLOW_SUM_SIGNED(pDBuf, SUM_RES_GET_ISUM(&AVG_RES_GET_SUM(pSBuf)));
    }
  } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
    if (AVG_RES_GET_SUM_OVERFLOW(pSBuf, false, 0)) {
      CHECK_OVERFLOW_SUM_UNSIGNED_BIG(pDBuf, (SUM_RES_GET_DSUM(&AVG_RES_GET_SUM(pSBuf))), true);
    } else {
      CHECK_OVERFLOW_SUM_UNSIGNED(pDBuf, SUM_RES_GET_USUM(&AVG_RES_GET_SUM(pSBuf)));
    }
  } else if (IS_DECIMAL_TYPE(type)) {
    // NOTE(review): SUM_RES_INC_DECIMAL_SUM is defined elsewhere; it presumably
    // sets the local `overflow` when the decimal addition overflows.
    // NOTE(review): avgTransferInfo passes TSDB_DATA_TYPE_DECIMAL when adding two
    // accumulated sums, whereas `type` is passed here - confirm which is intended
    // for DECIMAL64 inputs.
    bool overflow = false;
    SUM_RES_INC_DECIMAL_SUM(&AVG_RES_GET_DECIMAL_SUM(pDBuf), &SUM_RES_GET_DECIMAL_SUM(&AVG_RES_GET_DECIMAL_SUM(pSBuf)), type);
    if (overflow) {
      return TSDB_CODE_DECIMAL_OVERFLOW;
    }
  } else {
    SUM_RES_INC_DSUM(&AVG_RES_GET_SUM(pDBuf), SUM_RES_GET_DSUM(&AVG_RES_GET_SUM(pSBuf)));
  }
  AVG_RES_INC_COUNT(pDBuf, pDestCtx->inputType, AVG_RES_GET_COUNT(pSBuf, true, pDestCtx->inputType));

  return TSDB_CODE_SUCCESS;
}
606

607
/**
 * Final step of AVG: turn the accumulated sum and count into the average and
 * hand the result row to functionFinalize().
 *
 * When the count is positive the average is computed as sum / count, reading
 * the sum from the member that matches the state: the double accumulator after
 * an overflow, otherwise the signed / unsigned / double member for the value
 * type. Decimal inputs are divided with decimalOp() using the result column's
 * precision and scale.
 *
 * numOfRes is set to 0 when the count is zero, or when a non-decimal average
 * is infinite or NaN; it is set to 1 for decimal results.
 *
 * @return the value of functionFinalize(), the error from decimalOp(), or
 *         TSDB_CODE_OUT_OF_RANGE when the result column cannot be found.
 */
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
  SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);

  void*   pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
  int32_t type = AVG_RES_GET_TYPE(pRes, pCtx->inputType);

  if (AVG_RES_GET_COUNT(pRes, true, pCtx->inputType) > 0) {
    if (AVG_RES_GET_SUM_OVERFLOW(pRes, true, pCtx->inputType)) {
      // integer sum overflowed earlier: the total is held in the double accumulator
      AVG_RES_GET_AVG(pRes) = SUM_RES_GET_DSUM(&AVG_RES_GET_SUM(pRes)) / ((double)AVG_RES_GET_COUNT(pRes, false, 0));
    } else if (IS_SIGNED_NUMERIC_TYPE(type)) {
      AVG_RES_GET_AVG(pRes) = SUM_RES_GET_ISUM(&AVG_RES_GET_SUM(pRes)) / ((double)AVG_RES_GET_COUNT(pRes, false, 0));
    } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
      AVG_RES_GET_AVG(pRes) = SUM_RES_GET_USUM(&AVG_RES_GET_SUM(pRes)) / ((double)AVG_RES_GET_COUNT(pRes, false, 0));
    } else if (IS_DECIMAL_TYPE(type)) {
      int32_t          slotId = pCtx->pExpr->base.resSchema.slotId;
      SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
      if (NULL == pCol) {
        // same handling as avgPartialFinalize for a missing result column
        return TSDB_CODE_OUT_OF_RANGE;
      }
      SDataType sumDt = {.type = TSDB_DATA_TYPE_DECIMAL,
                         .bytes = tDataTypes[TSDB_DATA_TYPE_DECIMAL].bytes,
                         .precision = pCol->info.precision,
                         .scale = AVG_RES_GET_INPUT_SCALE(pRes)};
      SDataType countDt = {
          .type = TSDB_DATA_TYPE_BIGINT, .bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .precision = 0, .scale = 0};
      SDataType avgDt = {.type = TSDB_DATA_TYPE_DECIMAL,
                         .bytes = tDataTypes[TSDB_DATA_TYPE_DECIMAL].bytes,
                         .precision = pCol->info.precision,
                         .scale = pCol->info.scale};
      int64_t   count = AVG_RES_GET_COUNT(pRes, true, type);
      int32_t   code =
          decimalOp(OP_TYPE_DIV, &sumDt, &countDt, &avgDt, &SUM_RES_GET_DECIMAL_SUM(&AVG_RES_GET_DECIMAL_SUM(pRes)),
                    &count, &AVG_RES_GET_DECIMAL_AVG(pRes));
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    } else {
      AVG_RES_GET_AVG(pRes) = SUM_RES_GET_DSUM(&AVG_RES_GET_SUM(pRes)) / ((double)AVG_RES_GET_COUNT(pRes, false, 0));
    }
  }
  if (AVG_RES_GET_COUNT(pRes, true, pCtx->inputType) == 0) {
    pEntryInfo->numOfRes = 0;
  } else if (!IS_DECIMAL_TYPE(pCtx->inputType)) {
    // an infinite or NaN average is reported as "no result"
    if (isinf(AVG_RES_GET_AVG(pRes)) || isnan(AVG_RES_GET_AVG(pRes))) pEntryInfo->numOfRes = 0;
  } else {
    pEntryInfo->numOfRes = 1;
  }

  return functionFinalize(pCtx, pBlock);
}
656

657
/**
 * Emit the intermediate AVG state as a partial result.
 *
 * The intermediate buffer (AVG_RES_GET_SIZE(pCtx->inputType) bytes) is copied
 * into a temporary var-length value (header + payload) and written to the
 * result column at row pBlock->info.rows with colDataSetVal(). The temporary
 * buffer is always released before returning.
 *
 * @return TSDB_CODE_SUCCESS, terrno when the temporary buffer cannot be
 *         allocated, TSDB_CODE_OUT_OF_RANGE when the result column is missing,
 *         or the error returned by colDataSetVal().
 */
int32_t avgPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
  void*   pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
  int32_t resultBytes = AVG_RES_GET_SIZE(pCtx->inputType);
  char*   res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
  int32_t code = TSDB_CODE_SUCCESS;
  if (NULL == res) {
    return terrno;
  }
  (void)memcpy(varDataVal(res), pInfo, resultBytes);
  varDataSetLen(res, resultBytes);

  int32_t          slotId = pCtx->pExpr->base.resSchema.slotId;
  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
  if (NULL == pCol) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _exit;
  }

  code = colDataSetVal(pCol, pBlock->info.rows, res, false);

_exit:
  taosMemoryFree(res);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc