• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3558

17 Dec 2024 06:05AM UTC coverage: 59.778% (+1.6%) from 58.204%
#3558

push

travis-ci

web-flow
Merge pull request #29179 from taosdata/merge/mainto3.0

merge: form main to 3.0 branch

132787 of 287595 branches covered (46.17%)

Branch coverage included in aggregate %.

104 of 191 new or added lines in 5 files covered. (54.45%)

6085 existing lines in 168 files now uncovered.

209348 of 284746 relevant lines covered (73.52%)

8164844.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.66
/source/libs/executor/src/aggregateoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "function.h"
18
#include "os.h"
19
#include "querynodes.h"
20
#include "tfill.h"
21
#include "tname.h"
22

23
#include "executorInt.h"
24
#include "index.h"
25
#include "operator.h"
26
#include "query.h"
27
#include "querytask.h"
28
#include "tcompare.h"
29
#include "tdatablock.h"
30
#include "tglobal.h"
31
#include "thash.h"
32
#include "ttypes.h"
33

34
/*
 * Small snapshot of per-function context state. It is the second argument of
 * functionCtxSave()/functionCtxRestore(), and applyAggFunctionOnPartialTuples()
 * saves one per function context before it overwrites the input row range.
 * NOTE(review): the fields presumably mirror the hasAgg/numOfRows/start-row
 * values of SqlFunctionCtx input state -- confirm against functionCtxSave().
 */
typedef struct {
35
  bool    hasAgg;
36
  int32_t numOfRows;
37
  int32_t startOffset;
38
} SFunctionCtxStatus;
39

40
/*
 * Private state of one aggregate operator instance. It is allocated in
 * createAggregateOperatorInfo() and released by destroyAggOperatorInfo().
 */
typedef struct SAggOperatorInfo {
41
  // result block (pRes), result-row info, input/output ts order, mergeResultBlock flag
  SOptrBasicInfo   binfo;
42
  // result-row hash table, key buffer and disk-based result buffer
  SAggSupporter    aggSup;
43
  // not referenced by the functions visible in this part of the file
  STableQueryInfo* current;
44
  // id of the group whose output buffer is currently active; UINT64_MAX means "none yet"
  uint64_t         groupId;
45
  // iteration state over the finished group results (filled by initGroupedResultInfo)
  SGroupResInfo    groupResInfo;
46
  // scalar expressions applied to each input block before the aggregation runs
  SExprSupp        scalarExprSup;
47
  // copied from the plan node; when set, no placeholder block is built for empty input
  bool             groupKeyOptimized;
48
  // set once at least one block (real or placeholder) has been aggregated
  bool             hasValidBlock;
49
  // non-blocking mode: first block of the next group, kept for the following call
  SSDataBlock*     pNewGroupBlock;
50
  // copied from pAggNode->hasCountLikeFunc; required for the empty-input placeholder block
  bool             hasCountFunc;
51
  // back pointer to the owning operator, used while cleaning up result rows
  SOperatorInfo*   pOperator;
52
  // selects the cleanup path in cleanupResultInfo(): group-result list (true) or hash table (false)
  bool             cleanGroupResInfo;
53
} SAggOperatorInfo;
54

55
/*
 * Forward declarations of the file-local helpers. They are declared static
 * here, so the later definitions keep internal linkage even where the
 * definition itself omits the keyword.
 */
static void destroyAggOperatorInfo(void* param);
56
static int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
57

58
// placeholder block handling for aggregation over empty input
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock);
59
static void    destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock);
60

61
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx);
62
static int32_t getAggregateResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
63
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
64
                                const char* pKey);
65

66
static int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size);
67

68
static int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
69

70
// save/restore of the per-function state captured in SFunctionCtxStatus
static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus);
71
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus);
72

73
/**
 * @brief Build the "TableAggregate" (hash aggregate) operator on top of a single downstream operator.
 *
 * @param downstream  operator that produces the input blocks; attached as the only downstream
 * @param pAggNode    physical plan node providing the output block description, aggregate functions,
 *                    group keys, optional scalar pre-expressions (pExprs) and filter conditions
 * @param pTaskInfo   owning task; on failure its code field is set to the returned error
 * @param pOptrInfo   [out] receives the new operator on success
 * @return TSDB_CODE_SUCCESS, or the error code of the first step that failed. On failure the partially
 *         built operator info is released and the operator is handed, together with the downstream,
 *         to destroyOperatorAndDownstreams().
 */
int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo,
                                    SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t    lino = 0;
  int32_t    code = 0;
  int32_t    num = 0;
  SExprInfo* pExprInfo = NULL;
  int32_t    numOfScalarExpr = 0;
  SExprInfo* pScalarExprInfo = NULL;

  SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    // record the failing line so the error log does not report "line 0"
    lino = __LINE__;
    code = terrno;
    goto _error;
  }

  pOperator->exprSupp.hasWindowOrGroup = false;

  SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
  initBasicInfo(&pInfo->binfo, pResBlock);

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  initResultSizeInfo(&pOperator->resultInfo, 4096);

  // aggregate functions and group keys
  code = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &pExprInfo, &num);
  TSDB_CHECK_CODE(code, lino, _error);

  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
  TSDB_CHECK_CODE(code, lino, _error);

  // optional scalar expressions that are evaluated on each block before it is aggregated
  if (pAggNode->pExprs != NULL) {
    code = createExprInfo(pAggNode->pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
    TSDB_CHECK_CODE(code, lino, _error);
  }

  code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
  TSDB_CHECK_CODE(code, lino, _error);

  code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  TSDB_CHECK_CODE(code, lino, _error);

  pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
  pInfo->groupKeyOptimized = pAggNode->groupKeyOptimized;
  pInfo->groupId = UINT64_MAX;  // no active group yet
  pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder;
  pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder;
  pInfo->hasCountFunc = pAggNode->hasCountLikeFunc;
  pInfo->pOperator = pOperator;
  pInfo->cleanGroupResInfo = false;

  setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG,
                  !pAggNode->node.forceCreateNonBlockingOptr, OP_NOT_OPENED, pInfo, pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getAggregateResultNext, NULL, destroyAggOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);

  // let a table scan directly below see the aggregate's expression and result supporters
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    STableScanInfo* pTableScanInfo = downstream->info;
    pTableScanInfo->base.pdInfo.pExprSup = &pOperator->exprSupp;
    pTableScanInfo->base.pdInfo.pAggSup = &pInfo->aggSup;
  }

  code = appendDownstream(pOperator, &downstream, 1);
  TSDB_CHECK_CODE(code, lino, _error);

  *pOptrInfo = pOperator;
  return TSDB_CODE_SUCCESS;

_error:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pInfo != NULL) {
    destroyAggOperatorInfo(pInfo);
  }
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  return code;
}
155

156
void destroyAggOperatorInfo(void* param) {
948,094✔
157
  if (param == NULL) {
948,094!
158
    return;
×
159
  }
160
  SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
948,094✔
161
  cleanupBasicInfo(&pInfo->binfo);
948,094✔
162

163
  if (pInfo->pOperator) {
949,649!
164
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
949,655✔
165
                      pInfo->cleanGroupResInfo);
949,655✔
166
    pInfo->pOperator = NULL;
949,726✔
167
  }
168
  cleanupAggSup(&pInfo->aggSup);
949,720✔
169
  cleanupExprSupp(&pInfo->scalarExprSup);
949,561✔
170
  cleanupGroupResInfo(&pInfo->groupResInfo);
948,813✔
171
  taosMemoryFreeClear(param);
949,712!
172
}
173

174
/**
175
 * @brief get blocks from downstream and fill results into groupedRes after aggragation
176
 * @retval false if no more groups
177
 * @retval true if there could have new groups coming
178
 * @note if pOperator.blocking is true, scan all blocks from downstream, all groups are handled
179
 *       if false, fill results of ONE GROUP
180
 * */
181
static bool nextGroupedResult(SOperatorInfo* pOperator) {
1,067,083✔
182
  int32_t           code = TSDB_CODE_SUCCESS;
1,067,083✔
183
  int32_t           lino = 0;
1,067,083✔
184
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
1,067,083✔
185
  SAggOperatorInfo* pAggInfo = pOperator->info;
1,067,083✔
186

187
  if(!pAggInfo) {
1,067,083!
188
    qError("function:%s, pAggInfo is NULL", __func__);
×
189
    return false;
×
190
  }
191
  if (pOperator->blocking && pAggInfo->hasValidBlock) {
1,067,083✔
192
    return false;
73,683✔
193
  }
194

195
  SExprSupp*   pSup = &pOperator->exprSupp;
993,400✔
196
  int64_t      st = taosGetTimestampUs();
998,188✔
197
  int32_t      order = pAggInfo->binfo.inputTsOrder;
998,188✔
198
  SSDataBlock* pBlock = pAggInfo->pNewGroupBlock;
998,188✔
199

200
  pAggInfo->cleanGroupResInfo = false;
998,188✔
201
  if (pBlock) {
998,188✔
202
    pAggInfo->pNewGroupBlock = NULL;
49,913✔
203
    tSimpleHashClear(pAggInfo->aggSup.pResultRowHashTable);
49,913✔
204
    code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
49,913✔
205
    QUERY_CHECK_CODE(code, lino, _end);
49,913!
206
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
49,913✔
207
    QUERY_CHECK_CODE(code, lino, _end);
49,913!
208

209
    code = doAggregateImpl(pOperator, pSup->pCtx);
49,913✔
210
    QUERY_CHECK_CODE(code, lino, _end);
49,913!
211
  }
212
  while (1) {
1,978,528✔
213
    bool blockAllocated = false;
2,976,716✔
214
    pBlock = getNextBlockFromDownstream(pOperator, 0);
2,976,716✔
215
    if (pBlock == NULL) {
2,975,813✔
216
      if (!pAggInfo->hasValidBlock) {
1,062,770✔
217
        code = createDataBlockForEmptyInput(pOperator, &pBlock);
193,899✔
218
        QUERY_CHECK_CODE(code, lino, _end);
193,872!
219

220
        if (pBlock == NULL) {
193,872✔
221
          break;
80,043✔
222
        }
223
        blockAllocated = true;
113,829✔
224
      } else {
225
        break;
868,871✔
226
      }
227
    }
228
    pAggInfo->hasValidBlock = true;
2,026,872✔
229
    pAggInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;
2,026,872✔
230

231
    // there is an scalar expression that needs to be calculated before apply the group aggregation.
232
    if (pAggInfo->scalarExprSup.pExprInfo != NULL && !blockAllocated) {
2,026,872✔
233
      SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
361,001✔
234
      code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
361,001✔
235
      if (code != TSDB_CODE_SUCCESS) {
360,910!
236
        destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
×
237
        T_LONG_JMP(pTaskInfo->env, code);
×
238
      }
239
    }
240
    // if non-blocking mode and new group arrived, save the block and break
241
    if (!pOperator->blocking && pAggInfo->groupId != UINT64_MAX && pBlock->info.id.groupId != pAggInfo->groupId) {
2,026,781✔
242
      pAggInfo->pNewGroupBlock = pBlock;
49,965✔
243
      break;
49,965✔
244
    }
245
    // the pDataBlock are always the same one, no need to call this again
246
    code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
1,976,816✔
247
    if (code != TSDB_CODE_SUCCESS) {
1,977,924✔
248
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
1✔
249
      T_LONG_JMP(pTaskInfo->env, code);
1!
250
    }
251
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
1,977,923✔
252
    if (code != TSDB_CODE_SUCCESS) {
1,978,309!
253
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
×
254
      T_LONG_JMP(pTaskInfo->env, code);
×
255
    }
256

257
    code = doAggregateImpl(pOperator, pSup->pCtx);
1,978,309✔
258
    if (code != TSDB_CODE_SUCCESS) {
1,978,045!
259
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
×
260
      T_LONG_JMP(pTaskInfo->env, code);
×
261
    }
262

263
    destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
1,978,045✔
264
  }
265

266
  // the downstream operator may return with error code, so let's check the code before generating results.
267
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
998,879✔
268
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
1!
269
  }
270

271
  code = initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
998,878✔
272
  QUERY_CHECK_CODE(code, lino, _end);
999,120!
273
  pAggInfo->cleanGroupResInfo = true;
999,120✔
274

275
_end:
999,120✔
276
  if (code != TSDB_CODE_SUCCESS) {
999,120!
277
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
278
    pTaskInfo->code = code;
×
279
    T_LONG_JMP(pTaskInfo->env, code);
×
280
  }
281
  return pBlock != NULL;
999,120✔
282
}
283

284
/**
 * @brief Produce the next result block of the aggregate operator.
 *
 * Aggregates one round of input through nextGroupedResult(), then copies finished group results
 * into the operator's result block and applies the operator filter. Rounds are repeated while the
 * filtered block is empty and further groups may follow.
 *
 * @param pOperator the aggregate operator
 * @param ppRes     [out] the result block, or NULL when no rows were produced / the operator is done
 * @return TSDB_CODE_SUCCESS; failures leave through T_LONG_JMP after storing the code in the task
 */
int32_t getAggregateResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t           code = TSDB_CODE_SUCCESS;
  int32_t           lino = 0;
  size_t            rows = 0;
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
  SAggOperatorInfo* pAggInfo = pOperator->info;
  SOptrBasicInfo*   pBasic = &pAggInfo->binfo;
  bool              moreGroups = false;

  do {
    moreGroups = nextGroupedResult(pOperator);

    code = blockDataEnsureCapacity(pBasic->pRes, pOperator->resultInfo.capacity);
    QUERY_CHECK_CODE(code, lino, _end);

    for (;;) {
      doBuildResultDatablock(pOperator, pBasic, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);

      code = doFilter(pBasic->pRes, pOperator->exprSupp.pFilterInfo, NULL);
      QUERY_CHECK_CODE(code, lino, _end);

      if (!hasRemainResults(&pAggInfo->groupResInfo)) {
        // all results of this round are emitted; finished for good unless more groups follow
        if (!moreGroups) {
          setOperatorCompleted(pOperator);
        }
        break;
      }

      // results remain: hand out the block as soon as the filter left any row in it
      if (pBasic->pRes->info.rows > 0) {
        break;
      }
    }
  } while (moreGroups && pBasic->pRes->info.rows == 0);

  rows = blockDataGetNumOfRows(pBasic->pRes);
  pOperator->resultInfo.totalRows += rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  (*ppRes) = (rows == 0) ? NULL : pBasic->pRes;
  return code;
}
331

332
/**
 * @brief Convenience wrapper around getAggregateResultNext() that returns the block directly.
 * @return the next result block, or NULL when there is none; the status code is discarded
 */
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
  SSDataBlock* pResult = NULL;
  (void)getAggregateResultNext(pOperator, &pResult);
  return pResult;
}
337

338
/**
 * @brief Run every aggregate function of the operator over the currently bound input block.
 *
 * @param pOperator operator whose exprSupp.numOfExprs gives the number of function contexts
 * @param pCtx      array of function contexts, one per expression
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR for invalid arguments or missing
 *         input data, or the code returned by a function's process callback. On a failing function
 *         its cleanup callback (if any) is invoked before returning.
 */
int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
  if (pOperator == NULL || (pOperator->exprSupp.numOfExprs > 0 && pCtx == NULL)) {
    qError("%s failed at line %d since pCtx is NULL.", __func__, __LINE__);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
    SqlFunctionCtx* pFunc = &pCtx[k];

    // skip functions that need not run, and those without a process callback
    // todo add a dummy function to avoid process check
    if (!functionNeedToExecute(pFunc) || pFunc->fpSet.process == NULL) {
      continue;
    }

    int32_t code = TSDB_CODE_SUCCESS;
    if (pFunc->input.pData[0] != NULL) {
      code = pFunc->fpSet.process(pFunc);
    } else {
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      qError("%s aggregate function error happens, input data is NULL.", GET_TASKID(pOperator->pTaskInfo));
    }

    if (code == TSDB_CODE_SUCCESS) {
      continue;
    }

    if (pFunc->fpSet.cleanup != NULL) {
      pFunc->fpSet.cleanup(pFunc);
    }
    qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
370

371
/**
 * @brief Build a one-row, all-NULL placeholder block so aggregation can still run when the
 *        downstream produced no data.
 *
 * No block is created (and *ppBlock is left untouched) when any of the following holds:
 *   - tsCountAlwaysReturnValue is off;
 *   - the operator is group-key optimized;
 *   - the downstream is a partition or sort operator, or a table scan with hasGroupByTag set;
 *   - the plan contains no count-like function.
 *
 * @param pOperator the aggregate operator
 * @param ppBlock   [out] receives the placeholder block on success; the caller releases it with
 *                  destroyDataBlockForEmptyInput()
 * @return TSDB_CODE_SUCCESS (with or without a block), or an error code; on error the partially
 *         built block is destroyed here.
 */
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSDataBlock* pBlock = NULL;
  if (!tsCountAlwaysReturnValue) {
    return TSDB_CODE_SUCCESS;
  }

  SAggOperatorInfo* pAggInfo = pOperator->info;
  if (pAggInfo->groupKeyOptimized) {
    return TSDB_CODE_SUCCESS;
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION ||
      downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_SORT ||
      (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN &&
       ((STableScanInfo*)downstream->info)->hasGroupByTag == true)) {
    return TSDB_CODE_SUCCESS;
  }

  if (!pAggInfo->hasCountFunc) {
    return TSDB_CODE_SUCCESS;
  }

  code = createDataBlock(&pBlock);
  if (code) {
    return code;
  }

  pBlock->info.rows = 1;
  pBlock->info.capacity = 0;

  // add NULL-typed columns up to the highest slot id referenced by any column parameter
  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    SColumnInfoData colInfo = {0};
    colInfo.hasNull = true;
    colInfo.info.type = TSDB_DATA_TYPE_NULL;
    colInfo.info.bytes = 1;

    SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
      if (pFuncParam->type != FUNC_PARAM_TYPE_COLUMN) {
        continue;  // value parameters need no column
      }

      int32_t slotId = pFuncParam->pCol->slotId;
      int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
      if (slotId >= numOfCols) {
        code = taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1);
        QUERY_CHECK_CODE(code, lino, _end);

        for (int32_t k = numOfCols; k < slotId + 1; ++k) {
          void* tmp = taosArrayPush(pBlock->pDataBlock, &colInfo);
          QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
        }
      }
    }
  }

  code = blockDataEnsureCapacity(pBlock, pBlock->info.rows);
  QUERY_CHECK_CODE(code, lino, _end);

  // the single row is NULL in every column
  for (int32_t i = 0; i < blockDataGetNumOfCols(pBlock); ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
    colDataSetNULL(pColInfoData, 0);
  }
  *ppBlock = pBlock;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pBlock);
    // report the line recorded by the failing check, not the line of this log statement
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
450

451
/**
 * @brief Free the placeholder block made by createDataBlockForEmptyInput().
 * @param blockAllocated true only if *ppBlock is such a placeholder; otherwise nothing is done
 * @param ppBlock        block pointer; reset to NULL after the block has been destroyed
 */
void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock) {
  if (blockAllocated) {
    blockDataDestroy(*ppBlock);
    *ppBlock = NULL;
  }
}
459

460
/**
 * @brief Make the output buffer of the given group the active one, unless it already is.
 *
 * @param pOperator   the aggregate operator
 * @param numOfOutput number of function contexts to bind
 * @param groupId     group the next input block belongs to
 * @return TSDB_CODE_SUCCESS, or the code from doSetTableGroupOutputBuf(). The group id is recorded
 *         as active even when that call fails.
 */
int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  SAggOperatorInfo* pAggInfo = pOperator->info;

  // UINT64_MAX marks "no group set yet", so it never counts as a match
  bool alreadyActive = (pAggInfo->groupId != UINT64_MAX) && (pAggInfo->groupId == groupId);
  if (alreadyActive) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId);

  // record the current active group id
  pAggInfo->groupId = groupId;
  return code;
}
473

474
/**
 * @brief Look up (or create) the result row of a group and bind the function contexts to it.
 *
 * For a simple group-by query without interval, all tables belong to one group result, keyed here
 * by the group id.
 *
 * @param pOperator   the aggregate operator
 * @param numOfOutput number of function contexts to initialise on the row
 * @param groupId     group whose result row becomes the output target
 * @return TSDB_CODE_SUCCESS, the task's error code, the code of a failing helper, or
 *         TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when no result row was obtained although the task
 *         carries no error code.
 */
int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  int32_t           code = TSDB_CODE_SUCCESS;
  int32_t           lino = 0;
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
  SAggOperatorInfo* pAggInfo = pOperator->info;

  SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
  int32_t*        rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;

  SResultRow* pResultRow =
      doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId, sizeof(groupId), true,
                             groupId, pTaskInfo, false, &pAggInfo->aggSup, true);
  if (pResultRow == NULL || pTaskInfo->code != 0) {
    code = pTaskInfo->code;
    if (code == TSDB_CODE_SUCCESS) {
      // a missing row without a task error must not be reported as success: the function
      // contexts would stay bound to whatever row was active before
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }
    lino = __LINE__;
    goto _end;
  }

  /*
   * not assign result buffer yet, add new result buffer
   * all group belong to one result set, and each group result has different group id so set the id to be one
   */
  if (pResultRow->pageId == -1) {
    code = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  code = setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
511

512
// a new buffer page for each table. Needs to opt this design
513
/**
 * @brief Reserve `size` bytes for a result row inside the paged result buffer.
 *
 * Uses the last page of the buffer when the row still fits, otherwise a fresh page. A row that
 * already has a page (pageId != -1) is left untouched.
 *
 * @param pWindowRes result row that receives pageId/offset
 * @param pResultBuf paged buffer to allocate from
 * @param size       number of bytes to reserve
 * @return 0 on success, terrno when a page cannot be obtained
 */
int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size) {
  if (pWindowRes->pageId != -1) {
    return 0;
  }

  SFilePage* pPage = NULL;
  int32_t    pageId = -1;
  SArray*    pPageList = getDataBufPagesIdList(pResultBuf);

  if (taosArrayGetSize(pPageList) != 0) {
    // try the last existing page first
    SPageInfo* pLast = getLastPageInfo(pPageList);
    pPage = getBufPage(pResultBuf, getPageId(pLast));
    if (pPage == NULL) {
      qError("failed to get buffer, code:%s", tstrerror(terrno));
      return terrno;
    }

    pageId = getPageId(pLast);

    if (pPage->num + size > getBufPageSize(pResultBuf)) {
      // release current page first, and prepare the next one
      releaseBufPageInfo(pResultBuf, pLast);

      pPage = getNewBufPage(pResultBuf, &pageId);
      if (pPage == NULL) {
        qError("failed to get buffer, code:%s", tstrerror(terrno));
        return terrno;
      }
      pPage->num = sizeof(SFilePage);
    }
  } else {
    // the buffer has no page yet
    pPage = getNewBufPage(pResultBuf, &pageId);
    if (pPage == NULL) {
      qError("failed to get buffer, code:%s", tstrerror(terrno));
      return terrno;
    }
    pPage->num = sizeof(SFilePage);
  }

  // defensive: every path above already returned on a NULL page
  if (pPage == NULL) {
    return -1;
  }

  // place the row at the current end of the page and advance the page's fill mark
  if (pWindowRes->pageId == -1) {
    pWindowRes->pageId = pageId;
    pWindowRes->offset = (int32_t)pPage->num;

    pPage->num += size;
  }

  return 0;
}
568

569
/**
 * @brief Initialise an SAggSupporter: key buffer, result-row hash table and paged result buffer.
 *
 * @param pAggSup     supporter to initialise
 * @param pCtx        function contexts, used to compute the result row size
 * @param numOfOutput number of function contexts
 * @param keyBufSize  base size of the key buffer (extra room for a pointer and an int64 is added)
 * @param pKey        identifier passed to the result buffer and used in log messages
 * @return TSDB_CODE_SUCCESS, terrno on allocation failure, TSDB_CODE_NO_DISKSPACE when no temp
 *         space is available, or the code of the failing helper. Members set before a failure are
 *         not released here.
 */
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                         const char* pKey) {
  pAggSup->currentPageId = -1;
  pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);

  pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
  pAggSup->pResultRowHashTable = tSimpleHashInit(100, taosFastHash);
  if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
    return terrno;
  }

  // page and total size of the result buffer are derived from the row size
  uint32_t pageSize = 0;
  int64_t  bufSize = 0;
  int32_t  code = getBufferPgSize(pAggSup->resultRowSize, &pageSize, &bufSize);
  if (code != 0) {
    qError("failed to get buff page size, rowSize:%d", pAggSup->resultRowSize);
    return code;
  }

  if (!osTempSpaceAvailable()) {
    code = TSDB_CODE_NO_DISKSPACE;
    qError("Init stream agg supporter failed since %s, key:%s, tempDir:%s", tstrerror(code), pKey, tsTempDir);
    return code;
  }

  code = createDiskbasedBuf(&pAggSup->pResultBuf, pageSize, bufSize, pKey, tsTempDir);
  if (code != TSDB_CODE_SUCCESS) {
    qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey);
  }

  return code;
}
605

606
/**
 * @brief Run the per-function cleanup callbacks on the not-yet-consumed result rows of a stream
 *        operator, fetching each row from the stream state store by its position.
 *
 * Nothing is done unless at least one function context has needCleanup set. Rows that cannot be
 * fetched are logged and skipped.
 *
 * @param pTaskInfo     task providing the storage API and the task id for logging
 * @param pState        stream state handle passed to streamStateGetByPos
 * @param pSup          expression supporter holding the function contexts
 * @param pGroupResInfo result list; processing starts at its current index
 */
void cleanupResultInfoInStream(SExecTaskInfo* pTaskInfo, void* pState, SExprSupp* pSup, SGroupResInfo* pGroupResInfo) {
  SStorageAPI*    pAPI = &pTaskInfo->storageAPI;
  SqlFunctionCtx* pCtx = pSup->pCtx;
  int32_t         exprCount = pSup->numOfExprs;
  int32_t*        entryOffsets = pSup->rowEntryInfoOffset;
  int32_t         totalRows = getNumOfTotalRes(pGroupResInfo);

  bool anyCleanup = false;
  for (int32_t j = 0; j < exprCount; ++j) {
    if (pCtx[j].needCleanup) {
      anyCleanup = true;
      break;
    }
  }
  if (!anyCleanup) {
    return;
  }

  for (int32_t i = pGroupResInfo->index; i < totalRows; ++i) {
    SResultWindowInfo* pWinInfo = taosArrayGet(pGroupResInfo->pRows, i);
    SRowBuffPos*       pPos = pWinInfo->pStatePos;
    SResultRow*        pRow = NULL;

    int32_t code = pAPI->stateStore.streamStateGetByPos(pState, pPos, (void**)&pRow);
    if (code != TSDB_CODE_SUCCESS) {
      qError("failed to get state by pos, code:%s, %s", tstrerror(code), GET_TASKID(pTaskInfo));
      continue;
    }

    // point each context at its entry in this row, then let the function clean it up
    for (int32_t j = 0; j < exprCount; ++j) {
      SqlFunctionCtx* pFunc = &pCtx[j];
      pFunc->resultInfo = getResultEntryInfo(pRow, j, entryOffsets);
      if (pFunc->fpSet.cleanup) {
        pFunc->fpSet.cleanup(pFunc);
      }
    }
  }
}
641

642
void cleanupResultInfoInGroupResInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
1,022,418✔
643
                                  SGroupResInfo* pGroupResInfo) {
644
  int32_t         numOfExprs = pSup->numOfExprs;
1,022,418✔
645
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
1,022,418✔
646
  SqlFunctionCtx* pCtx = pSup->pCtx;
1,022,418✔
647
  int32_t         numOfRows = getNumOfTotalRes(pGroupResInfo);
1,022,418✔
648
  bool            needCleanup = false;
1,022,782✔
649

650
  for (int32_t j = 0; j < numOfExprs; ++j) {
2,855,230✔
651
    needCleanup |= pCtx[j].needCleanup;
1,832,448✔
652
  }
653
  if (!needCleanup) {
1,022,782✔
654
    return;
1,018,034✔
655
  }
656

657
  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
4,748!
658
    SResultRow*        pRow = NULL;
×
659
    SResKeyPos*        pPos = taosArrayGetP(pGroupResInfo->pRows, i);
×
660
    SFilePage*         page = getBufPage(pBuf, pPos->pos.pageId);
×
661
    if (page == NULL) {
×
662
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
663
      continue;
×
664
    }
665
    pRow = (SResultRow*)((char*)page + pPos->pos.offset);
×
666

667

668
    for (int32_t j = 0; j < numOfExprs; ++j) {
×
669
      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
×
670
      if (pCtx[j].fpSet.cleanup) {
×
671
        pCtx[j].fpSet.cleanup(&pCtx[j]);
×
672
      }
673
    }
674
    releaseBufPage(pBuf, page);
×
675
  }
676
}
677

678
void cleanupResultInfoInHashMap(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
306,894✔
679
                       SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap) {
680
  int32_t         numOfExprs = pSup->numOfExprs;
306,894✔
681
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
306,894✔
682
  SqlFunctionCtx* pCtx = pSup->pCtx;
306,894✔
683
  bool            needCleanup = false;
306,894✔
684
  for (int32_t j = 0; j < numOfExprs; ++j) {
944,125✔
685
    needCleanup |= pCtx[j].needCleanup;
637,231✔
686
  }
687
  if (!needCleanup) {
306,894✔
688
    return;
306,825✔
689
  }
690

691
  // begin from last iter
692
  void*   pData = pGroupResInfo->dataPos;
69✔
693
  int32_t iter = pGroupResInfo->iter;
69✔
694
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
69!
695
    SResultRowPosition* pos = pData;
×
696

697
    SFilePage* page = getBufPage(pBuf, pos->pageId);
×
698
    if (page == NULL) {
×
699
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
700
      continue;
×
701
    }
702

703
    SResultRow* pRow = (SResultRow*)((char*)page + pos->offset);
×
704

705
    for (int32_t j = 0; j < numOfExprs; ++j) {
×
706
      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
×
707
      if (pCtx[j].fpSet.cleanup) {
×
708
        pCtx[j].fpSet.cleanup(&pCtx[j]);
×
709
      }
710
    }
711

712
    releaseBufPage(pBuf, page);
×
713
  }
714
}
715

716
void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SGroupResInfo* pGroupResInfo,
1,329,152✔
717
                       SAggSupporter *pAggSup, bool cleanGroupResInfo) {
718
  if (cleanGroupResInfo) {
1,329,152✔
719
    cleanupResultInfoInGroupResInfo(pTaskInfo, pSup, pAggSup->pResultBuf, pGroupResInfo);
1,022,544✔
720
  } else {
721
    cleanupResultInfoInHashMap(pTaskInfo, pSup, pAggSup->pResultBuf, pGroupResInfo, pAggSup->pResultRowHashTable);
306,608✔
722
  }
723
}
1,329,917✔
724
// Release the resources owned by an aggregate supporter: the key buffer, the result-row hash
// table and the result buffer.
//
// Every pointer is reset once its resource has been released, so the struct does not keep
// dangling references that a later access or a repeated cleanup could free again.
// NOTE(review): a repeated call is only harmless if tSimpleHashCleanup() and
// destroyDiskbasedBuf() accept NULL - confirm against their implementations.
void cleanupAggSup(SAggSupporter* pAggSup) {
  taosMemoryFreeClear(pAggSup->keyBuf);

  tSimpleHashCleanup(pAggSup->pResultRowHashTable);
  pAggSup->pResultRowHashTable = NULL;

  destroyDiskbasedBuf(pAggSup->pResultBuf);
  pAggSup->pResultBuf = NULL;
}
2,066,955✔
729

730
// Initialize the expression supporter and the aggregate supporter, then wire every function
// context to where it saves its state.
//
// Steps:
//  1. initExprSupp() sets up pSup from pExprInfo/numOfCols (pStore is passed through to it).
//  2. doInitAggInfoSup() sets up pAggSup using keyBufSize and pkey.
//  3. Each context inherits pSup->hasWindowOrGroup. When pState is given, the context records
//     pState and its own expression index and gets no buffer; otherwise it saves into
//     pAggSup->pResultBuf.
//
// Returns TSDB_CODE_SUCCESS, or the error code of the first initialization step that failed.
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
                   const char* pkey, void* pState, SFunctionStateStore* pStore) {
  int32_t code = initExprSupp(pSup, pExprInfo, numOfCols, pStore);
  if (code == TSDB_CODE_SUCCESS) {
    code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
  }

  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  for (int32_t idx = 0; idx < numOfCols; ++idx) {
    SqlFunctionCtx* pOne = &pSup->pCtx[idx];

    pOne->hasWindowOrGroup = pSup->hasWindowOrGroup;

    if (pState == NULL) {
      // no external state handle: save into the aggregate result buffer
      pOne->saveHandle.pBuf = pAggSup->pResultBuf;
      continue;
    }

    pOne->saveHandle.pBuf = NULL;
    pOne->saveHandle.pState = pState;
    pOne->exprIdx = idx;
  }

  return TSDB_CODE_SUCCESS;
}
755

756
// Apply the first numOfOutput function contexts in pCtx to the row range
// [offset, offset + forwardStep) of the current input.
//
// For each context the input window (start row, row count, SMA flag) is saved, narrowed to the
// requested range, and restored after a regular function has been processed. Pseudo functions
// are evaluated through their scalar callback on pTimeWindowData instead and write a single
// BIGINT result into the context's result entry buffer.
//
// Returns TSDB_CODE_SUCCESS, or the first error code reported by a callback; on error the code
// is also stored in taskInfo->code and processing stops immediately.
int32_t applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
                                        int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) {
  int32_t code = TSDB_CODE_SUCCESS;

  for (int32_t idx = 0; idx < numOfOutput; ++idx) {
    SqlFunctionCtx* pFunc = &pCtx[idx];

    // remember the current input window so it can be put back afterwards
    SFunctionCtxStatus saved = {0};
    functionCtxSave(pFunc, &saved);

    pFunc->input.startRowIndex = offset;
    pFunc->input.numOfRows = forwardStep;

    // only part of the block takes part in the computation, so the block-level
    // statistics cannot be used; the flag is switched off here
    if (forwardStep < numOfTotal && pFunc->input.colDataSMAIsSet) {
      pFunc->input.colDataSMAIsSet = false;
    }

    if (pFunc->isPseudoFunc) {
      SResultRowEntryInfo* pResInfo = GET_RES_INFO(pFunc);

      // the scalar result goes straight into the entry's buffer as one BIGINT value
      SColumnInfoData resCol = {0};
      resCol.info.type = TSDB_DATA_TYPE_BIGINT;
      resCol.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
      resCol.pData = GET_ROWCELL_INTERBUF(pResInfo);

      SScalarParam result = {.columnData = &resCol};
      // NOTE(review): 5 presumably matches the number of rows in pTimeWindowData - confirm
      SScalarParam window = {.numOfRows = 5, .columnData = pTimeWindowData};

      code = pFunc->sfp.process(&window, 1, &result);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        taskInfo->code = code;
        return code;
      }

      pResInfo->numOfRes = 1;
      // the saved input window is not restored on this path
      continue;
    }

    if (functionNeedToExecute(pFunc) && pFunc->fpSet.process != NULL) {
      if (pFunc->input.pData[0] != NULL) {
        code = pFunc->fpSet.process(pFunc);
      } else {
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
        qError("%s apply functions error, input data is NULL.", GET_TASKID(taskInfo));
      }

      if (code != TSDB_CODE_SUCCESS) {
        // give the function a chance to release what it holds before bailing out
        if (pFunc->fpSet.cleanup != NULL) {
          pFunc->fpSet.cleanup(pFunc);
        }
        qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
        taskInfo->code = code;
        return code;
      }
    }

    // put the original input window back
    functionCtxRestore(pFunc, &saved);
  }

  return code;
}
817

818
// Snapshot the input window of pCtx (SMA flag, row count, start row index) into pStatus so
// that functionCtxRestore() can put it back later.
void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  *pStatus = (SFunctionCtxStatus){
      .hasAgg = pCtx->input.colDataSMAIsSet,
      .numOfRows = pCtx->input.numOfRows,
      .startOffset = pCtx->input.startRowIndex,
  };
}
563,420,137✔
823

824
// Write the input window captured by functionCtxSave() (start row index, row count, SMA flag)
// back into pCtx.
void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  const SFunctionCtxStatus saved = *pStatus;

  pCtx->input.startRowIndex = saved.startOffset;
  pCtx->input.numOfRows = saved.numOfRows;
  pCtx->input.colDataSMAIsSet = saved.hasAgg;
}
407,485,317✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc