• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3559

18 Dec 2024 12:59AM UTC coverage: 59.805% (+0.03%) from 59.778%
#3559

push

travis-ci

web-flow
Merge pull request #29187 from taosdata/merge/mainto3.0

merge: main to 3.0 branch

132705 of 287544 branches covered (46.15%)

Branch coverage included in aggregate %.

87 of 95 new or added lines in 19 files covered. (91.58%)

1132 existing lines in 133 files now uncovered.

209591 of 284807 relevant lines covered (73.59%)

8125235.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.21
/source/libs/executor/src/aggregateoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "function.h"
18
#include "os.h"
19
#include "querynodes.h"
20
#include "tfill.h"
21
#include "tname.h"
22

23
#include "executorInt.h"
24
#include "index.h"
25
#include "operator.h"
26
#include "query.h"
27
#include "querytask.h"
28
#include "tcompare.h"
29
#include "tdatablock.h"
30
#include "tglobal.h"
31
#include "thash.h"
32
#include "ttypes.h"
33

34
typedef struct {
35
  bool    hasAgg;
36
  int32_t numOfRows;
37
  int32_t startOffset;
38
} SFunctionCtxStatus;
39

40
typedef struct SAggOperatorInfo {
41
  SOptrBasicInfo   binfo;
42
  SAggSupporter    aggSup;
43
  STableQueryInfo* current;
44
  uint64_t         groupId;
45
  SGroupResInfo    groupResInfo;
46
  SExprSupp        scalarExprSup;
47
  bool             groupKeyOptimized;
48
  bool             hasValidBlock;
49
  SSDataBlock*     pNewGroupBlock;
50
  bool             hasCountFunc;
51
  SOperatorInfo*   pOperator;
52
  bool             cleanGroupResInfo;
53
} SAggOperatorInfo;
54

55
static void destroyAggOperatorInfo(void* param);
56
static int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
57

58
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock);
59
static void    destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock);
60

61
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx);
62
static int32_t getAggregateResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
63
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
64
                                const char* pKey);
65

66
static int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size);
67

68
static int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
69

70
static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus);
71
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus);
72

73
int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo,
442,385✔
74
                                    SOperatorInfo** pOptrInfo) {
75
  QRY_PARAM_CHECK(pOptrInfo);
442,385!
76

77
  int32_t    lino = 0;
442,385✔
78
  int32_t    code = 0;
442,385✔
79
  int32_t    num = 0;
442,385✔
80
  SExprInfo* pExprInfo = NULL;
442,385✔
81
  int32_t    numOfScalarExpr = 0;
442,385✔
82
  SExprInfo* pScalarExprInfo = NULL;
442,385✔
83

84
  SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
442,385!
85
  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
442,804!
86
  if (pInfo == NULL || pOperator == NULL) {
442,905!
87
    code = terrno;
×
88
    goto _error;
×
89
  }
90

91
  pOperator->exprSupp.hasWindowOrGroup = false;
442,937✔
92

93
  SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
442,937✔
94
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
443,084!
95
  initBasicInfo(&pInfo->binfo, pResBlock);
443,084✔
96

97
  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
442,850✔
98
  initResultSizeInfo(&pOperator->resultInfo, 4096);
442,850✔
99

100
  code = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &pExprInfo, &num);
442,974✔
101
  TSDB_CHECK_CODE(code, lino, _error);
443,042!
102

103
  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
443,042✔
104
                               pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
443,042✔
105
  TSDB_CHECK_CODE(code, lino, _error);
442,964!
106

107
  if (pAggNode->pExprs != NULL) {
442,964✔
108
    code = createExprInfo(pAggNode->pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
94,140✔
109
    TSDB_CHECK_CODE(code, lino, _error);
94,142!
110
  }
111

112
  code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
442,966✔
113
  TSDB_CHECK_CODE(code, lino, _error);
442,765!
114

115
  code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
442,765✔
116
  TSDB_CHECK_CODE(code, lino, _error);
442,732!
117

118
  pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
442,732✔
119
  pInfo->groupKeyOptimized = pAggNode->groupKeyOptimized;
442,732✔
120
  pInfo->groupId = UINT64_MAX;
442,732✔
121
  pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder;
442,732✔
122
  pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder;
442,732✔
123
  pInfo->hasCountFunc = pAggNode->hasCountLikeFunc;
442,732✔
124
  pInfo->pOperator = pOperator;
442,732✔
125
  pInfo->cleanGroupResInfo = false;
442,732✔
126

127
  setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG,
442,732✔
128
                  !pAggNode->node.forceCreateNonBlockingOptr, OP_NOT_OPENED, pInfo, pTaskInfo);
442,732✔
129
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getAggregateResultNext, NULL, destroyAggOperatorInfo,
442,735✔
130
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
131

132
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
442,676✔
133
    STableScanInfo* pTableScanInfo = downstream->info;
266,368✔
134
    pTableScanInfo->base.pdInfo.pExprSup = &pOperator->exprSupp;
266,368✔
135
    pTableScanInfo->base.pdInfo.pAggSup = &pInfo->aggSup;
266,368✔
136
  }
137

138
  code = appendDownstream(pOperator, &downstream, 1);
442,676✔
139
  if (code != TSDB_CODE_SUCCESS) {
442,812!
140
    goto _error;
×
141
  }
142

143
  *pOptrInfo = pOperator;
442,812✔
144
  return TSDB_CODE_SUCCESS;
442,812✔
145

146
_error:
×
147
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
148
  if (pInfo != NULL) {
×
149
    destroyAggOperatorInfo(pInfo);
×
150
  }
151
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
152
  pTaskInfo->code = code;
×
153
  return code;
×
154
}
155

156
void destroyAggOperatorInfo(void* param) {
443,113✔
157
  if (param == NULL) {
443,113!
158
    return;
×
159
  }
160
  SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
443,113✔
161
  cleanupBasicInfo(&pInfo->binfo);
443,113✔
162

163
  if (pInfo->pOperator) {
443,154!
164
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
443,155✔
165
                      pInfo->cleanGroupResInfo);
443,155✔
166
    pInfo->pOperator = NULL;
443,148✔
167
  }
168
  cleanupAggSup(&pInfo->aggSup);
443,147✔
169
  cleanupExprSupp(&pInfo->scalarExprSup);
443,158✔
170
  cleanupGroupResInfo(&pInfo->groupResInfo);
443,154✔
171
  taosMemoryFreeClear(param);
443,156!
172
}
173

174
/**
175
 * @brief get blocks from downstream and fill results into groupedRes after aggragation
176
 * @retval false if no more groups
177
 * @retval true if there could have new groups coming
178
 * @note if pOperator.blocking is true, scan all blocks from downstream, all groups are handled
179
 *       if false, fill results of ONE GROUP
180
 * */
181
static bool nextGroupedResult(SOperatorInfo* pOperator) {
565,909✔
182
  int32_t           code = TSDB_CODE_SUCCESS;
565,909✔
183
  int32_t           lino = 0;
565,909✔
184
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
565,909✔
185
  SAggOperatorInfo* pAggInfo = pOperator->info;
565,909✔
186

187
  if(!pAggInfo) {
565,909!
188
    qError("function:%s, pAggInfo is NULL", __func__);
×
189
    return false;
×
190
  }
191
  if (pOperator->blocking && pAggInfo->hasValidBlock) {
565,909✔
192
    return false;
73,952✔
193
  }
194

195
  SExprSupp*   pSup = &pOperator->exprSupp;
491,957✔
196
  int64_t      st = taosGetTimestampUs();
492,898✔
197
  int32_t      order = pAggInfo->binfo.inputTsOrder;
492,898✔
198
  SSDataBlock* pBlock = pAggInfo->pNewGroupBlock;
492,898✔
199

200
  pAggInfo->cleanGroupResInfo = false;
492,898✔
201
  if (pBlock) {
492,898✔
202
    pAggInfo->pNewGroupBlock = NULL;
49,895✔
203
    tSimpleHashClear(pAggInfo->aggSup.pResultRowHashTable);
49,895✔
204
    code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
49,895✔
205
    QUERY_CHECK_CODE(code, lino, _end);
49,895!
206
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
49,895✔
207
    QUERY_CHECK_CODE(code, lino, _end);
49,895!
208

209
    code = doAggregateImpl(pOperator, pSup->pCtx);
49,895✔
210
    QUERY_CHECK_CODE(code, lino, _end);
49,895!
211
  }
212
  while (1) {
1,335,599✔
213
    bool blockAllocated = false;
1,828,497✔
214
    pBlock = getNextBlockFromDownstream(pOperator, 0);
1,828,497✔
215
    if (pBlock == NULL) {
1,828,088✔
216
      if (!pAggInfo->hasValidBlock) {
455,816✔
217
        code = createDataBlockForEmptyInput(pOperator, &pBlock);
93,175✔
218
        QUERY_CHECK_CODE(code, lino, _end);
93,174!
219

220
        if (pBlock == NULL) {
93,174✔
221
          break;
79,915✔
222
        }
223
        blockAllocated = true;
13,259✔
224
      } else {
225
        break;
362,641✔
226
      }
227
    }
228
    pAggInfo->hasValidBlock = true;
1,385,531✔
229
    pAggInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;
1,385,531✔
230

231
    // there is an scalar expression that needs to be calculated before apply the group aggregation.
232
    if (pAggInfo->scalarExprSup.pExprInfo != NULL && !blockAllocated) {
1,385,531✔
233
      SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
275,945✔
234
      code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
275,945✔
235
      if (code != TSDB_CODE_SUCCESS) {
275,865!
236
        destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
×
237
        T_LONG_JMP(pTaskInfo->env, code);
×
238
      }
239
    }
240
    // if non-blocking mode and new group arrived, save the block and break
241
    if (!pOperator->blocking && pAggInfo->groupId != UINT64_MAX && pBlock->info.id.groupId != pAggInfo->groupId) {
1,385,451✔
242
      pAggInfo->pNewGroupBlock = pBlock;
49,947✔
243
      break;
49,947✔
244
    }
245
    // the pDataBlock are always the same one, no need to call this again
246
    code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
1,335,504✔
247
    if (code != TSDB_CODE_SUCCESS) {
1,335,867✔
248
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
1✔
249
      T_LONG_JMP(pTaskInfo->env, code);
1!
250
    }
251
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
1,335,866✔
252
    if (code != TSDB_CODE_SUCCESS) {
1,335,753!
253
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
×
254
      T_LONG_JMP(pTaskInfo->env, code);
×
255
    }
256

257
    code = doAggregateImpl(pOperator, pSup->pCtx);
1,335,753✔
258
    if (code != TSDB_CODE_SUCCESS) {
1,335,595!
259
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
×
260
      T_LONG_JMP(pTaskInfo->env, code);
×
261
    }
262

263
    destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
1,335,595✔
264
  }
265

266
  // the downstream operator may return with error code, so let's check the code before generating results.
267
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
492,503!
UNCOV
268
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
269
  }
270

271
  code = initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
492,503✔
272
  QUERY_CHECK_CODE(code, lino, _end);
492,688!
273
  pAggInfo->cleanGroupResInfo = true;
492,688✔
274

275
_end:
492,688✔
276
  if (code != TSDB_CODE_SUCCESS) {
492,688!
277
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
278
    pTaskInfo->code = code;
×
279
    T_LONG_JMP(pTaskInfo->env, code);
×
280
  }
281
  return pBlock != NULL;
492,688✔
282
}
283

284
int32_t getAggregateResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
905,373✔
285
  int32_t           code = TSDB_CODE_SUCCESS;
905,373✔
286
  int32_t           lino = 0;
905,373✔
287
  SAggOperatorInfo* pAggInfo = pOperator->info;
905,373✔
288
  SOptrBasicInfo*   pInfo = &pAggInfo->binfo;
905,373✔
289

290
  if (pOperator->status == OP_EXEC_DONE) {
905,373✔
291
    (*ppRes) = NULL;
339,490✔
292
    return code;
339,490✔
293
  }
294

295
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
565,883✔
296
  bool           hasNewGroups = false;
565,883✔
297
  do {
298
    hasNewGroups = nextGroupedResult(pOperator);
565,911✔
299
    code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
566,494✔
300
    QUERY_CHECK_CODE(code, lino, _end);
566,602!
301

302
    while (1) {
303
      doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
566,602✔
304
      code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
566,541✔
305
      QUERY_CHECK_CODE(code, lino, _end);
566,493!
306

307
      if (!hasRemainResults(&pAggInfo->groupResInfo)) {
566,493✔
308
        if (!hasNewGroups) setOperatorCompleted(pOperator);
492,618✔
309
        break;
492,650✔
310
      }
311

312
      if (pInfo->pRes->info.rows > 0) {
73,922!
313
        break;
73,922✔
314
      }
315
    }
316
  } while (pInfo->pRes->info.rows == 0 && hasNewGroups);
566,572✔
317

318
  size_t rows = blockDataGetNumOfRows(pInfo->pRes);
566,544✔
319
  pOperator->resultInfo.totalRows += rows;
566,485✔
320

321
_end:
566,485✔
322
  if (code != TSDB_CODE_SUCCESS) {
566,485!
323
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
324
    pTaskInfo->code = code;
×
325
    T_LONG_JMP(pTaskInfo->env, code);
×
326
  }
327

328
  (*ppRes) = (rows == 0) ? NULL : pInfo->pRes;
566,485✔
329
  return code;
566,485✔
330
}
331

332
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
×
333
  SSDataBlock* pRes = NULL;
×
334
  int32_t code = getAggregateResultNext(pOperator, &pRes);
×
335
  return pRes;
×
336
}
337

338
int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
1,385,454✔
339
  int32_t code = TSDB_CODE_SUCCESS;
1,385,454✔
340
  if (!pOperator || (pOperator->exprSupp.numOfExprs > 0 && pCtx == NULL)) {
1,385,454!
341
    qError("%s failed at line %d since pCtx is NULL.", __func__, __LINE__);
×
342
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
343
  }
344
  for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
4,185,129✔
345
    if (functionNeedToExecute(&pCtx[k])) {
2,799,405✔
346
      // todo add a dummy function to avoid process check
347
      if (pCtx[k].fpSet.process == NULL) {
2,798,620✔
348
        continue;
29,837✔
349
      }
350

351
      if ((&pCtx[k])->input.pData[0] == NULL) {
2,768,783!
352
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
353
        qError("%s aggregate function error happens, input data is NULL.", GET_TASKID(pOperator->pTaskInfo));
×
354
      } else {
355
        code = pCtx[k].fpSet.process(&pCtx[k]);
2,768,783✔
356
      }
357

358
      if (code != TSDB_CODE_SUCCESS) {
2,768,850!
359
        if (pCtx[k].fpSet.cleanup != NULL) {
×
360
          pCtx[k].fpSet.cleanup(&pCtx[k]);
×
361
        }
362
        qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
×
363
        return code;
×
364
      }
365
    }
366
  }
367

368
  return TSDB_CODE_SUCCESS;
1,385,724✔
369
}
370

371
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
93,161✔
372
  int32_t code = TSDB_CODE_SUCCESS;
93,161✔
373
  int32_t lino = 0;
93,161✔
374
  SSDataBlock* pBlock = NULL;
93,161✔
375
  if (!tsCountAlwaysReturnValue) {
93,161✔
376
    return TSDB_CODE_SUCCESS;
48,365✔
377
  }
378

379
  SAggOperatorInfo* pAggInfo = pOperator->info;
44,796✔
380
  if (pAggInfo->groupKeyOptimized) {
44,796✔
381
    return TSDB_CODE_SUCCESS;
15,482✔
382
  }
383

384
  SOperatorInfo* downstream = pOperator->pDownstream[0];
29,314✔
385
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION ||
29,314✔
386
      downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_SORT ||
29,027✔
387
      (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN &&
28,802✔
388
       ((STableScanInfo*)downstream->info)->hasGroupByTag == true)) {
9,366✔
389
    return TSDB_CODE_SUCCESS;
521✔
390
  }
391

392
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
28,793✔
393

394
  if (!pAggInfo->hasCountFunc) {
28,793✔
395
    return TSDB_CODE_SUCCESS;
15,543✔
396
  }
397

398
  code = createDataBlock(&pBlock);
13,250✔
399
  if (code) {
13,261!
400
    return code;
×
401
  }
402

403
  pBlock->info.rows = 1;
13,261✔
404
  pBlock->info.capacity = 0;
13,261✔
405

406
  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
32,357✔
407
    SColumnInfoData colInfo = {0};
19,096✔
408
    colInfo.hasNull = true;
19,096✔
409
    colInfo.info.type = TSDB_DATA_TYPE_NULL;
19,096✔
410
    colInfo.info.bytes = 1;
19,096✔
411

412
    SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
19,096✔
413
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
38,620✔
414
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
19,524✔
415
      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
19,524✔
416
        int32_t slotId = pFuncParam->pCol->slotId;
19,463✔
417
        int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
19,463✔
418
        if (slotId >= numOfCols) {
19,464✔
419
          code = taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1);
14,175✔
420
          QUERY_CHECK_CODE(code, lino, _end);
14,173!
421

422
          for (int32_t k = numOfCols; k < slotId + 1; ++k) {
30,788✔
423
            void* tmp = taosArrayPush(pBlock->pDataBlock, &colInfo);
16,614✔
424
            QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
16,615!
425
          }
426
        }
427
      } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
61✔
428
        // do nothing
429
      }
430
    }
431
  }
432

433
  code = blockDataEnsureCapacity(pBlock, pBlock->info.rows);
13,261✔
434
  QUERY_CHECK_CODE(code, lino, _end);
13,262!
435

436
  for (int32_t i = 0; i < blockDataGetNumOfCols(pBlock); ++i) {
29,876✔
437
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
16,615✔
438
    QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
16,613!
439
    colDataSetNULL(pColInfoData, 0);
440
  }
441
  *ppBlock = pBlock;
13,261✔
442

443
_end:
13,261✔
444
  if (code != TSDB_CODE_SUCCESS) {
13,261!
445
    blockDataDestroy(pBlock);
×
446
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
447
  }
448
  return code;
13,260✔
449
}
450

451
void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock) {
1,335,550✔
452
  if (!blockAllocated) {
1,335,550✔
453
    return;
1,322,353✔
454
  }
455

456
  blockDataDestroy(*ppBlock);
13,197✔
457
  *ppBlock = NULL;
13,262✔
458
}
459

460
int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
1,385,541✔
461
  int32_t           code = TSDB_CODE_SUCCESS;
1,385,541✔
462
  SAggOperatorInfo* pAggInfo = pOperator->info;
1,385,541✔
463
  if (pAggInfo->groupId != UINT64_MAX && pAggInfo->groupId == groupId) {
1,385,541✔
464
    return code;
469,932✔
465
  }
466

467
  code = doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId);
915,609✔
468

469
  // record the current active group id
470
  pAggInfo->groupId = groupId;
915,861✔
471
  return code;
915,861✔
472
}
473

474
int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
915,902✔
475
  // for simple group by query without interval, all the tables belong to one group result.
476
  int32_t           code = TSDB_CODE_SUCCESS;
915,902✔
477
  int32_t           lino = 0;
915,902✔
478
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
915,902✔
479
  SAggOperatorInfo* pAggInfo = pOperator->info;
915,902✔
480

481
  SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
915,902✔
482
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
915,902✔
483
  int32_t*        rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;
915,902✔
484

485
  SResultRow* pResultRow =
486
      doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId, sizeof(groupId), true,
915,902✔
487
                             groupId, pTaskInfo, false, &pAggInfo->aggSup, true);
488
  if (pResultRow == NULL || pTaskInfo->code != 0) {
916,234!
489
    code = pTaskInfo->code;
×
490
    lino = __LINE__;
×
491
    goto _end;
×
492
  }
493
  /*
494
   * not assign result buffer yet, add new result buffer
495
   * all group belong to one result set, and each group result has different group id so set the id to be one
496
   */
497
  if (pResultRow->pageId == -1) {
916,239!
498
    code = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize);
×
499
    QUERY_CHECK_CODE(code, lino, _end);
×
500
  }
501

502
  code = setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
916,239✔
503
  QUERY_CHECK_CODE(code, lino, _end);
915,972!
504

505
_end:
915,972✔
506
  if (code != TSDB_CODE_SUCCESS) {
915,967✔
507
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
1!
508
  }
509
  return code;
915,878✔
510
}
511

512
// a new buffer page for each table. Needs to opt this design
513
int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size) {
×
514
  if (pWindowRes->pageId != -1) {
×
515
    return 0;
×
516
  }
517

518
  SFilePage* pData = NULL;
×
519

520
  // in the first scan, new space needed for results
521
  int32_t pageId = -1;
×
522
  SArray* list = getDataBufPagesIdList(pResultBuf);
×
523

524
  if (taosArrayGetSize(list) == 0) {
×
525
    pData = getNewBufPage(pResultBuf, &pageId);
×
526
    if (pData == NULL) {
×
527
      qError("failed to get buffer, code:%s", tstrerror(terrno));
×
528
      return terrno;
×
529
    }
530
    pData->num = sizeof(SFilePage);
×
531
  } else {
532
    SPageInfo* pi = getLastPageInfo(list);
×
533
    pData = getBufPage(pResultBuf, getPageId(pi));
×
534
    if (pData == NULL) {
×
535
      qError("failed to get buffer, code:%s", tstrerror(terrno));
×
536
      return terrno;
×
537
    }
538

539
    pageId = getPageId(pi);
×
540

541
    if (pData->num + size > getBufPageSize(pResultBuf)) {
×
542
      // release current page first, and prepare the next one
543
      releaseBufPageInfo(pResultBuf, pi);
×
544

545
      pData = getNewBufPage(pResultBuf, &pageId);
×
546
      if (pData == NULL) {
×
547
        qError("failed to get buffer, code:%s", tstrerror(terrno));
×
548
        return terrno;
×
549
      }
550
      pData->num = sizeof(SFilePage);
×
551
    }
552
  }
553

554
  if (pData == NULL) {
×
555
    return -1;
×
556
  }
557

558
  // set the number of rows in current disk page
559
  if (pWindowRes->pageId == -1) {  // not allocated yet, allocate new buffer
×
560
    pWindowRes->pageId = pageId;
×
561
    pWindowRes->offset = (int32_t)pData->num;
×
562

563
    pData->num += size;
×
564
  }
565

566
  return 0;
×
567
}
568

569
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
1,250,744✔
570
                         const char* pKey) {
571
  int32_t code = 0;
1,250,744✔
572
  //  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
573

574
  pAggSup->currentPageId = -1;
1,250,744✔
575
  pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
1,250,744✔
576
  pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
1,251,939!
577
  pAggSup->pResultRowHashTable = tSimpleHashInit(100, taosFastHash);
1,251,777✔
578

579
  if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
1,251,358!
580
    return terrno;
×
581
  }
582

583
  uint32_t defaultPgsz = 0;
1,251,501✔
584
  int64_t defaultBufsz = 0;
1,251,501✔
585
  code = getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz);
1,251,501✔
586
  if (code) {
1,251,914!
587
    qError("failed to get buff page size, rowSize:%d", pAggSup->resultRowSize);
×
588
    return code;
×
589
  }
590

591
  if (!osTempSpaceAvailable()) {
1,251,914!
592
    code = TSDB_CODE_NO_DISKSPACE;
×
593
    qError("Init stream agg supporter failed since %s, key:%s, tempDir:%s", tstrerror(code), pKey, tsTempDir);
×
594
    return code;
×
595
  }
596

597
  code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir);
1,251,309✔
598
  if (code != TSDB_CODE_SUCCESS) {
1,252,128!
599
    qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey);
×
600
    return code;
×
601
  }
602

603
  return code;
1,252,180✔
604
}
605

606
void cleanupResultInfoInStream(SExecTaskInfo* pTaskInfo, void* pState, SExprSupp* pSup, SGroupResInfo* pGroupResInfo) {
1,752✔
607
  int32_t         code = TSDB_CODE_SUCCESS;
1,752✔
608
  SStorageAPI*    pAPI = &pTaskInfo->storageAPI;
1,752✔
609
  int32_t         numOfExprs = pSup->numOfExprs;
1,752✔
610
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
1,752✔
611
  SqlFunctionCtx* pCtx = pSup->pCtx;
1,752✔
612
  int32_t         numOfRows = getNumOfTotalRes(pGroupResInfo);
1,752✔
613
  bool            needCleanup = false;
1,752✔
614

615
  for (int32_t j = 0; j < numOfExprs; ++j) {
28,350✔
616
    needCleanup |= pCtx[j].needCleanup;
26,598✔
617
  }
618
  if (!needCleanup) {
1,752!
619
    return;
1,752✔
620
  }
621
  
622
  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
×
623
    SResultWindowInfo* pWinInfo = taosArrayGet(pGroupResInfo->pRows, i);
×
624
    SRowBuffPos*       pPos = pWinInfo->pStatePos;
×
625
    SResultRow*        pRow = NULL;
×
626

627
    code = pAPI->stateStore.streamStateGetByPos(pState, pPos, (void**)&pRow);
×
628
    if (TSDB_CODE_SUCCESS != code) {
×
629
      qError("failed to get state by pos, code:%s, %s", tstrerror(code), GET_TASKID(pTaskInfo));
×
630
      continue;
×
631
    }
632

633
    for (int32_t j = 0; j < numOfExprs; ++j) {
×
634
      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
×
635
      if (pCtx[j].fpSet.cleanup) {
×
636
        pCtx[j].fpSet.cleanup(&pCtx[j]);
×
637
      }
638
    }
639
  }
640
}
641

642
void cleanupResultInfoInGroupResInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
516,895✔
643
                                  SGroupResInfo* pGroupResInfo) {
644
  int32_t         numOfExprs = pSup->numOfExprs;
516,895✔
645
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
516,895✔
646
  SqlFunctionCtx* pCtx = pSup->pCtx;
516,895✔
647
  int32_t         numOfRows = getNumOfTotalRes(pGroupResInfo);
516,895✔
648
  bool            needCleanup = false;
516,911✔
649

650
  for (int32_t j = 0; j < numOfExprs; ++j) {
1,792,564✔
651
    needCleanup |= pCtx[j].needCleanup;
1,275,653✔
652
  }
653
  if (!needCleanup) {
516,911✔
654
    return;
511,920✔
655
  }
656

657
  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
4,991!
658
    SResultRow*        pRow = NULL;
×
659
    SResKeyPos*        pPos = taosArrayGetP(pGroupResInfo->pRows, i);
×
660
    SFilePage*         page = getBufPage(pBuf, pPos->pos.pageId);
×
661
    if (page == NULL) {
×
662
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
663
      continue;
×
664
    }
665
    pRow = (SResultRow*)((char*)page + pPos->pos.offset);
×
666

667

668
    for (int32_t j = 0; j < numOfExprs; ++j) {
×
669
      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
×
670
      if (pCtx[j].fpSet.cleanup) {
×
671
        pCtx[j].fpSet.cleanup(&pCtx[j]);
×
672
      }
673
    }
674
    releaseBufPage(pBuf, page);
×
675
  }
676
}
677

678
void cleanupResultInfoInHashMap(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
192,837✔
679
                       SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap) {
680
  int32_t         numOfExprs = pSup->numOfExprs;
192,837✔
681
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
192,837✔
682
  SqlFunctionCtx* pCtx = pSup->pCtx;
192,837✔
683
  bool            needCleanup = false;
192,837✔
684
  for (int32_t j = 0; j < numOfExprs; ++j) {
719,355✔
685
    needCleanup |= pCtx[j].needCleanup;
526,518✔
686
  }
687
  if (!needCleanup) {
192,837✔
688
    return;
192,802✔
689
  }
690

691
  // begin from last iter
692
  void*   pData = pGroupResInfo->dataPos;
35✔
693
  int32_t iter = pGroupResInfo->iter;
35✔
694
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
35!
695
    SResultRowPosition* pos = pData;
×
696

697
    SFilePage* page = getBufPage(pBuf, pos->pageId);
×
698
    if (page == NULL) {
×
699
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
700
      continue;
×
701
    }
702

703
    SResultRow* pRow = (SResultRow*)((char*)page + pos->offset);
×
704

705
    for (int32_t j = 0; j < numOfExprs; ++j) {
×
706
      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
×
707
      if (pCtx[j].fpSet.cleanup) {
×
708
        pCtx[j].fpSet.cleanup(&pCtx[j]);
×
709
      }
710
    }
711

712
    releaseBufPage(pBuf, page);
×
713
  }
714
}
715

716
void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SGroupResInfo* pGroupResInfo,
709,718✔
717
                       SAggSupporter *pAggSup, bool cleanGroupResInfo) {
718
  if (cleanGroupResInfo) {
709,718✔
719
    cleanupResultInfoInGroupResInfo(pTaskInfo, pSup, pAggSup->pResultBuf, pGroupResInfo);
516,904✔
720
  } else {
721
    cleanupResultInfoInHashMap(pTaskInfo, pSup, pAggSup->pResultBuf, pGroupResInfo, pAggSup->pResultRowHashTable);
192,814✔
722
  }
723
}
709,752✔
724
void cleanupAggSup(SAggSupporter* pAggSup) {
1,252,250✔
725
  taosMemoryFreeClear(pAggSup->keyBuf);
1,252,250!
726
  tSimpleHashCleanup(pAggSup->pResultRowHashTable);
1,252,303✔
727
  destroyDiskbasedBuf(pAggSup->pResultBuf);
1,252,334✔
728
}
1,252,338✔
729

730
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
1,251,268✔
731
                   const char* pkey, void* pState, SFunctionStateStore* pStore) {
732
  int32_t code = initExprSupp(pSup, pExprInfo, numOfCols, pStore);
1,251,268✔
733
  if (code != TSDB_CODE_SUCCESS) {
1,251,562!
734
    return code;
×
735
  }
736

737
  code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
1,251,562✔
738
  if (code != TSDB_CODE_SUCCESS) {
1,252,195!
739
    return code;
×
740
  }
741

742
  for (int32_t i = 0; i < numOfCols; ++i) {
5,166,365✔
743
    pSup->pCtx[i].hasWindowOrGroup = pSup->hasWindowOrGroup;
3,914,170✔
744
    if (pState) {
3,914,170✔
745
      pSup->pCtx[i].saveHandle.pBuf = NULL;
252,999✔
746
      pSup->pCtx[i].saveHandle.pState = pState;
252,999✔
747
      pSup->pCtx[i].exprIdx = i;
252,999✔
748
    } else {
749
      pSup->pCtx[i].saveHandle.pBuf = pAggSup->pResultBuf;
3,661,171✔
750
    }
751
  }
752

753
  return TSDB_CODE_SUCCESS;
1,252,195✔
754
}
755

756
int32_t applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
119,929,217✔
757
                                        int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) {
758
  int32_t code = TSDB_CODE_SUCCESS;
119,929,217✔
759
  for (int32_t k = 0; k < numOfOutput; ++k) {
688,252,126✔
760
    // keep it temporarily
761
    SFunctionCtxStatus status = {0};
569,087,084✔
762
    functionCtxSave(&pCtx[k], &status);
569,087,084✔
763

764
    pCtx[k].input.startRowIndex = offset;
570,418,680✔
765
    pCtx[k].input.numOfRows = forwardStep;
570,418,680✔
766

767
    // not a whole block involved in query processing, statistics data can not be used
768
    // NOTE: the original value of isSet have been changed here
769
    if (pCtx[k].input.colDataSMAIsSet && forwardStep < numOfTotal) {
570,418,680!
770
      pCtx[k].input.colDataSMAIsSet = false;
×
771
    }
772

773
    if (pCtx[k].isPseudoFunc) {
570,418,680✔
774
      SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]);
165,722,798✔
775

776
      char* p = GET_ROWCELL_INTERBUF(pEntryInfo);
165,722,798✔
777

778
      SColumnInfoData idata = {0};
165,722,798✔
779
      idata.info.type = TSDB_DATA_TYPE_BIGINT;
165,722,798✔
780
      idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
165,722,798✔
781
      idata.pData = p;
165,722,798✔
782

783
      SScalarParam out = {.columnData = &idata};
165,722,798✔
784
      SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
165,722,798✔
785
      code = pCtx[k].sfp.process(&tw, 1, &out);
165,722,798✔
786
      if (code != TSDB_CODE_SUCCESS) {
168,506,525!
787
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
788
        taskInfo->code = code;
×
789
        return code;
×
790
      }
791
      pEntryInfo->numOfRes = 1;
168,750,308✔
792
    } else {
793
      if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) {
404,695,882✔
794
        if ((&pCtx[k])->input.pData[0] == NULL) {
388,550,943!
795
          code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
796
          qError("%s apply functions error, input data is NULL.", GET_TASKID(taskInfo));
×
797
        } else {
798
          code = pCtx[k].fpSet.process(&pCtx[k]);
388,550,943✔
799
        }
800

801
        if (code != TSDB_CODE_SUCCESS) {
388,726,245!
802
          if (pCtx[k].fpSet.cleanup != NULL) {
×
803
            pCtx[k].fpSet.cleanup(&pCtx[k]);
×
804
          }
805
          qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
×
806
          taskInfo->code = code;
×
807
          return code;
×
808
        }
809
      }
810

811
      // restore it
812
      functionCtxRestore(&pCtx[k], &status);
412,012,576✔
813
    }
814
  }
815
  return code;
119,165,042✔
816
}
817

818
void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
569,532,193✔
819
  pStatus->hasAgg = pCtx->input.colDataSMAIsSet;
569,532,193✔
820
  pStatus->numOfRows = pCtx->input.numOfRows;
569,532,193✔
821
  pStatus->startOffset = pCtx->input.startRowIndex;
569,532,193✔
822
}
569,532,193✔
823

824
void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
410,859,155✔
825
  pCtx->input.colDataSMAIsSet = pStatus->hasAgg;
410,859,155✔
826
  pCtx->input.numOfRows = pStatus->numOfRows;
410,859,155✔
827
  pCtx->input.startRowIndex = pStatus->startOffset;
410,859,155✔
828
}
410,859,155✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc