• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3593

24 Jan 2025 08:57AM UTC coverage: 63.239% (-0.3%) from 63.546%
#3593

push

travis-ci

web-flow
Merge pull request #29638 from taosdata/docs/TS-5846-3.0

enh: TDengine modify taosBenchmark new query rule cases and add doc

140619 of 285630 branches covered (49.23%)

Branch coverage included in aggregate %.

218877 of 282844 relevant lines covered (77.38%)

19647377.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.0
/source/libs/executor/src/aggregateoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "function.h"
18
#include "os.h"
19
#include "querynodes.h"
20
#include "tfill.h"
21
#include "tname.h"
22

23
#include "executorInt.h"
24
#include "index.h"
25
#include "operator.h"
26
#include "query.h"
27
#include "querytask.h"
28
#include "tcompare.h"
29
#include "tdatablock.h"
30
#include "tglobal.h"
31
#include "thash.h"
32
#include "ttypes.h"
33

34
/*
 * Snapshot of part of a SqlFunctionCtx's state. This is the type taken by
 * functionCtxSave() and functionCtxRestore(), declared further down in this file.
 * NOTE(review): the fields presumably mirror the context's input window
 * (start row / row count) and an aggregate-present flag -- confirm against
 * functionCtxSave(), which is not visible here.
 */
typedef struct {
35
  bool    hasAgg;
36
  int32_t numOfRows;
37
  int32_t startOffset;
38
} SFunctionCtxStatus;
39

40
/* Private state of the hash-aggregate operator built by createAggregateOperatorInfo(). */
typedef struct SAggOperatorInfo {
41
  SOptrBasicInfo   binfo;  // result block (pRes), result-row info and input/output ts order
42
  SAggSupporter    aggSup;  // key buffer, result-row hash table and disk-based result buffer
43
  STableQueryInfo* current;
44
  uint64_t         groupId;  // group currently being aggregated; UINT64_MAX until the first group is set
45
  SGroupResInfo    groupResInfo;  // (re)initialised from the result-row hash table after each round of input
46
  SExprSupp        scalarExprSup;  // scalar expressions applied to every input block before aggregation
47
  bool             groupKeyOptimized;  // copied from the plan node; when set, no block is synthesised for empty input
48
  bool             hasValidBlock;  // set once at least one input block has been processed
49
  SSDataBlock*     pNewGroupBlock;  // non-blocking mode: first block of the next group, kept for the next call
50
  bool             hasCountFunc;  // copied from pAggNode->hasCountLikeFunc
51
  SOperatorInfo*   pOperator;  // back pointer to the owning operator, used while destroying this info
52
  bool             cleanGroupResInfo;  // selects the group-result-info vs. hash-map cleanup path in cleanupResultInfo()
53
} SAggOperatorInfo;
54

55
static void destroyAggOperatorInfo(void* param);
56
static int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
57

58
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock);
59
static void    destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock);
60

61
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx);
62
static int32_t getAggregateResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
63
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
64
                                const char* pKey);
65

66
static int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size);
67

68
static int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
69

70
static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus);
71
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus);
72

73
/**
 * @brief Build the "TableAggregate" (hash aggregate) operator on top of `downstream`.
 *
 * Allocates the operator and its SAggOperatorInfo, creates the result block from the plan
 * node's output descriptor, initialises the aggregate and scalar expression supporters and
 * the filter, and finally attaches `downstream` as the single child.
 *
 * @param downstream  child operator; when it is a table scan, its push-down info is pointed
 *                    at this operator's expression supporter and aggregate supporter
 * @param pAggNode    physical plan node describing the aggregation
 * @param pTaskInfo   owning task; on failure its `code` field is set to the returned error
 * @param pOptrInfo   [out] receives the new operator on success
 * @return TSDB_CODE_SUCCESS, or the error code of the step that failed
 */
int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo,
                                    SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t    lino = 0;
  int32_t    code = 0;
  int32_t    num = 0;
  SExprInfo* pExprInfo = NULL;
  int32_t    numOfScalarExpr = 0;
  SExprInfo* pScalarExprInfo = NULL;

  SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = terrno;
    // record the failing line; otherwise the error log below reports "line 0"
    lino = __LINE__;
    goto _error;
  }

  pOperator->exprSupp.hasWindowOrGroup = false;

  SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
  initBasicInfo(&pInfo->binfo, pResBlock);

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  initResultSizeInfo(&pOperator->resultInfo, 4096);

  code = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &pExprInfo, &num);
  TSDB_CHECK_CODE(code, lino, _error);

  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
  TSDB_CHECK_CODE(code, lino, _error);

  // optional scalar expressions that are evaluated on each input block before aggregating
  if (pAggNode->pExprs != NULL) {
    code = createExprInfo(pAggNode->pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
    TSDB_CHECK_CODE(code, lino, _error);
  }

  code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
  TSDB_CHECK_CODE(code, lino, _error);

  code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  TSDB_CHECK_CODE(code, lino, _error);

  pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
  pInfo->groupKeyOptimized = pAggNode->groupKeyOptimized;
  pInfo->groupId = UINT64_MAX;  // "no active group yet", see setExecutionContext()
  pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder;
  pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder;
  pInfo->hasCountFunc = pAggNode->hasCountLikeFunc;
  pInfo->pOperator = pOperator;
  pInfo->cleanGroupResInfo = false;

  setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG,
                  !pAggNode->node.forceCreateNonBlockingOptr, OP_NOT_OPENED, pInfo, pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getAggregateResultNext, NULL, destroyAggOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);

  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    STableScanInfo* pTableScanInfo = downstream->info;
    pTableScanInfo->base.pdInfo.pExprSup = &pOperator->exprSupp;
    pTableScanInfo->base.pdInfo.pAggSup = &pInfo->aggSup;
  }

  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    // record the failing line; otherwise the error log below reports "line 0"
    lino = __LINE__;
    goto _error;
  }

  *pOptrInfo = pOperator;
  return TSDB_CODE_SUCCESS;

_error:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pInfo != NULL) {
    destroyAggOperatorInfo(pInfo);
  }
  // NOTE(review): once setOperatorInfo() has run, pOperator presumably still refers to pInfo, which
  // was just freed above -- confirm destroyOperatorAndDownstreams() does not release it a second time.
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  return code;
}
155

156
void destroyAggOperatorInfo(void* param) {
1,942,520✔
157
  if (param == NULL) {
1,942,520!
158
    return;
×
159
  }
160
  SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
1,942,520✔
161
  cleanupBasicInfo(&pInfo->binfo);
1,942,520✔
162

163
  if (pInfo->pOperator) {
1,943,312!
164
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
1,943,312✔
165
                      pInfo->cleanGroupResInfo);
1,943,312✔
166
    pInfo->pOperator = NULL;
1,942,649✔
167
  }
168
  cleanupAggSup(&pInfo->aggSup);
1,942,649✔
169
  cleanupExprSupp(&pInfo->scalarExprSup);
1,943,342✔
170
  cleanupGroupResInfo(&pInfo->groupResInfo);
1,943,092✔
171
  taosMemoryFreeClear(param);
1,943,351!
172
}
173

174
/**
175
 * @brief get blocks from downstream and fill results into groupedRes after aggragation
176
 * @retval false if no more groups
177
 * @retval true if there could have new groups coming
178
 * @note if pOperator.blocking is true, scan all blocks from downstream, all groups are handled
179
 *       if false, fill results of ONE GROUP
180
 * */
181
static bool nextGroupedResult(SOperatorInfo* pOperator) {
2,060,519✔
182
  int32_t           code = TSDB_CODE_SUCCESS;
2,060,519✔
183
  int32_t           lino = 0;
2,060,519✔
184
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
2,060,519✔
185
  SAggOperatorInfo* pAggInfo = pOperator->info;
2,060,519✔
186

187
  if(!pAggInfo) {
2,060,519!
188
    qError("function:%s, pAggInfo is NULL", __func__);
×
189
    return false;
×
190
  }
191
  if (pOperator->blocking && pAggInfo->hasValidBlock) {
2,060,519✔
192
    return false;
74,425✔
193
  }
194

195
  SExprSupp*   pSup = &pOperator->exprSupp;
1,986,094✔
196
  int64_t      st = taosGetTimestampUs();
1,989,211✔
197
  int32_t      order = pAggInfo->binfo.inputTsOrder;
1,989,211✔
198
  SSDataBlock* pBlock = pAggInfo->pNewGroupBlock;
1,989,211✔
199

200
  pAggInfo->cleanGroupResInfo = false;
1,989,211✔
201
  if (pBlock) {
1,989,211✔
202
    pAggInfo->pNewGroupBlock = NULL;
49,927✔
203
    tSimpleHashClear(pAggInfo->aggSup.pResultRowHashTable);
49,927✔
204
    code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
49,927✔
205
    QUERY_CHECK_CODE(code, lino, _end);
49,927!
206
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
49,927✔
207
    QUERY_CHECK_CODE(code, lino, _end);
49,927!
208

209
    code = doAggregateImpl(pOperator, pSup->pCtx);
49,927✔
210
    QUERY_CHECK_CODE(code, lino, _end);
49,927!
211
  }
212
  while (1) {
10,668,587✔
213
    bool blockAllocated = false;
12,657,798✔
214
    pBlock = getNextBlockFromDownstream(pOperator, 0);
12,657,798✔
215
    if (pBlock == NULL) {
12,644,129✔
216
      if (!pAggInfo->hasValidBlock) {
2,067,043✔
217
        code = createDataBlockForEmptyInput(pOperator, &pBlock);
247,667✔
218
        QUERY_CHECK_CODE(code, lino, _end);
247,636!
219

220
        if (pBlock == NULL) {
247,636✔
221
          break;
108,305✔
222
        }
223
        blockAllocated = true;
139,331✔
224
      } else {
225
        break;
1,819,376✔
226
      }
227
    }
228
    pAggInfo->hasValidBlock = true;
10,716,417✔
229
    pAggInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;
10,716,417✔
230

231
    // there is an scalar expression that needs to be calculated before apply the group aggregation.
232
    if (pAggInfo->scalarExprSup.pExprInfo != NULL && !blockAllocated) {
10,716,417✔
233
      SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
1,478,392✔
234
      code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
1,478,392✔
235
      if (code != TSDB_CODE_SUCCESS) {
1,478,229!
236
        destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
×
237
        T_LONG_JMP(pTaskInfo->env, code);
×
238
      }
239
    }
240
    // if non-blocking mode and new group arrived, save the block and break
241
    if (!pOperator->blocking && pAggInfo->groupId != UINT64_MAX && pBlock->info.id.groupId != pAggInfo->groupId) {
10,716,254✔
242
      pAggInfo->pNewGroupBlock = pBlock;
49,979✔
243
      break;
49,979✔
244
    }
245
    // the pDataBlock are always the same one, no need to call this again
246
    code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
10,666,275✔
247
    if (code != TSDB_CODE_SUCCESS) {
10,668,060✔
248
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
26✔
249
      T_LONG_JMP(pTaskInfo->env, code);
26!
250
    }
251
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
10,668,034✔
252
    if (code != TSDB_CODE_SUCCESS) {
10,668,472!
253
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
×
254
      T_LONG_JMP(pTaskInfo->env, code);
×
255
    }
256

257
    code = doAggregateImpl(pOperator, pSup->pCtx);
10,668,472✔
258
    if (code != TSDB_CODE_SUCCESS) {
10,669,017!
259
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
×
260
      T_LONG_JMP(pTaskInfo->env, code);
×
261
    }
262

263
    destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
10,669,017✔
264
  }
265

266
  // the downstream operator may return with error code, so let's check the code before generating results.
267
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
1,977,660!
268
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
269
  }
270

271
  code = initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
1,977,660✔
272
  QUERY_CHECK_CODE(code, lino, _end);
1,977,395!
273
  pAggInfo->cleanGroupResInfo = true;
1,977,395✔
274

275
_end:
1,977,395✔
276
  if (code != TSDB_CODE_SUCCESS) {
1,977,395!
277
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
278
    pTaskInfo->code = code;
×
279
    T_LONG_JMP(pTaskInfo->env, code);
×
280
  }
281
  return pBlock != NULL;
1,977,395✔
282
}
283

284
/**
 * @brief Produce the next result block of the aggregate operator.
 *
 * Pulls grouped results via nextGroupedResult(), materialises them into the operator's
 * result block and applies the operator filter. Repeats while the filtered block is empty
 * and more groups may follow.
 *
 * @param pOperator  the aggregate operator
 * @param ppRes      [out] the result block, or NULL when no rows were produced or the
 *                   operator has already finished
 * @return TSDB_CODE_SUCCESS; failures are raised with T_LONG_JMP on pTaskInfo->env after
 *         pTaskInfo->code has been set
 */
int32_t getAggregateResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t           code = TSDB_CODE_SUCCESS;
  int32_t           lino = 0;
  // declared and zeroed up front so that no `goto _end` can jump over its initialisation
  size_t            rows = 0;
  SAggOperatorInfo* pAggInfo = pOperator->info;
  SOptrBasicInfo*   pInfo = &pAggInfo->binfo;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  bool           hasNewGroups = false;
  do {
    hasNewGroups = nextGroupedResult(pOperator);
    code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
    QUERY_CHECK_CODE(code, lino, _end);

    while (1) {
      doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
      code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
      QUERY_CHECK_CODE(code, lino, _end);

      if (!hasRemainResults(&pAggInfo->groupResInfo)) {
        // current groups are drained; the operator is finished unless more groups may arrive
        if (!hasNewGroups) {
          setOperatorCompleted(pOperator);
        }
        break;
      }

      if (pInfo->pRes->info.rows > 0) {
        break;
      }
    }
  } while (pInfo->pRes->info.rows == 0 && hasNewGroups);

  rows = blockDataGetNumOfRows(pInfo->pRes);
  pOperator->resultInfo.totalRows += rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  (*ppRes) = (rows == 0) ? NULL : pInfo->pRes;
  return code;
}
331

332
/**
 * Convenience wrapper around getAggregateResultNext() that returns the result block
 * directly. The status code of the wrapped call is discarded.
 *
 * @return the next result block, or NULL when there is none
 */
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
  SSDataBlock* pResult = NULL;
  (void)getAggregateResultNext(pOperator, &pResult);
  return pResult;
}
337

338
/**
 * Run every aggregate function of the operator over the input currently bound to its context.
 *
 * Functions that do not need to execute, or that have no `process` callback, are skipped.
 * On the first failure the failing function's `cleanup` callback (if any) is invoked and the
 * error is returned.
 *
 * @param pOperator  operator whose exprSupp.numOfExprs gives the number of contexts
 * @param pCtx       array of function contexts, one per expression
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR for invalid arguments or
 *         missing input data, or the error returned by a function's `process` callback
 */
int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
  if (pOperator == NULL || (pOperator->exprSupp.numOfExprs > 0 && pCtx == NULL)) {
    qError("%s failed at line %d since pCtx is NULL.", __func__, __LINE__);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  for (int32_t idx = 0; idx < pOperator->exprSupp.numOfExprs; ++idx) {
    SqlFunctionCtx* pFuncCtx = &pCtx[idx];

    if (!functionNeedToExecute(pFuncCtx)) {
      continue;
    }
    // todo add a dummy function to avoid process check
    if (pFuncCtx->fpSet.process == NULL) {
      continue;
    }

    int32_t code = TSDB_CODE_SUCCESS;
    if (pFuncCtx->input.pData[0] != NULL) {
      code = pFuncCtx->fpSet.process(pFuncCtx);
    } else {
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      qError("%s aggregate function error happens, input data is NULL.", GET_TASKID(pOperator->pTaskInfo));
    }

    if (code == TSDB_CODE_SUCCESS) {
      continue;
    }

    // failure: let the function release its intermediate state before bailing out
    if (pFuncCtx->fpSet.cleanup != NULL) {
      pFuncCtx->fpSet.cleanup(pFuncCtx);
    }
    qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
370

371
/**
 * @brief Synthesise a one-row, all-NULL input block for an aggregation that received no data.
 *
 * A block is only created when all of the following hold: tsCountAlwaysReturnValue is set,
 * the group key is not optimised, the downstream operator is not a partition, a sort, or a
 * table scan with group-by-tag, and the aggregation contains a count-like function.
 * In every other case the function succeeds and leaves *ppBlock untouched.
 *
 * The block gets one NULL-typed column for every slot id up to the largest one referenced
 * by a column parameter of the aggregate expressions.
 *
 * @param pOperator  the aggregate operator
 * @param ppBlock    [out] receives the new block; release it with destroyDataBlockForEmptyInput()
 * @return TSDB_CODE_SUCCESS or an error code; on error no block is handed out
 */
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSDataBlock* pBlock = NULL;
  if (!tsCountAlwaysReturnValue) {
    return TSDB_CODE_SUCCESS;
  }

  SAggOperatorInfo* pAggInfo = pOperator->info;
  if (pAggInfo->groupKeyOptimized) {
    return TSDB_CODE_SUCCESS;
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION ||
      downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_SORT ||
      (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN &&
       ((STableScanInfo*)downstream->info)->hasGroupByTag == true)) {
    return TSDB_CODE_SUCCESS;
  }

  if (!pAggInfo->hasCountFunc) {
    return TSDB_CODE_SUCCESS;
  }

  code = createDataBlock(&pBlock);
  if (code) {
    return code;
  }

  pBlock->info.rows = 1;
  pBlock->info.capacity = 0;

  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    // template for the placeholder columns: NULL type, one byte wide
    SColumnInfoData colInfo = {0};
    colInfo.hasNull = true;
    colInfo.info.type = TSDB_DATA_TYPE_NULL;
    colInfo.info.bytes = 1;

    SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
      // only column parameters reference a slot of the input block
      if (pFuncParam->type != FUNC_PARAM_TYPE_COLUMN) {
        continue;
      }

      int32_t slotId = pFuncParam->pCol->slotId;
      int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
      if (slotId < numOfCols) {
        continue;
      }

      // grow the column list so that index `slotId` exists
      code = taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1);
      QUERY_CHECK_CODE(code, lino, _end);

      for (int32_t k = numOfCols; k < slotId + 1; ++k) {
        void* tmp = taosArrayPush(pBlock->pDataBlock, &colInfo);
        QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
      }
    }
  }

  code = blockDataEnsureCapacity(pBlock, pBlock->info.rows);
  QUERY_CHECK_CODE(code, lino, _end);

  for (int32_t i = 0; i < blockDataGetNumOfCols(pBlock); ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
    colDataSetNULL(pColInfoData, 0);
  }
  *ppBlock = pBlock;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pBlock);
    // report the line recorded by the failing check, not the line of this log statement
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
450

451
/**
 * Counterpart of createDataBlockForEmptyInput(): destroy the block and reset the caller's
 * pointer, but only if the block was synthesised by this operator.
 *
 * @param blockAllocated  true when *ppBlock was created by createDataBlockForEmptyInput()
 * @param ppBlock         block pointer; set to NULL after the block has been destroyed
 */
void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock) {
  if (blockAllocated) {
    blockDataDestroy(*ppBlock);
    *ppBlock = NULL;
  }
}
459

460
/**
 * Make `groupId` the active group of the aggregate operator.
 *
 * Nothing is done when `groupId` is already the active group. Otherwise the output buffer
 * for the group is set up and `groupId` is recorded as active, even if that setup failed.
 *
 * @return TSDB_CODE_SUCCESS, or the error returned by doSetTableGroupOutputBuf()
 */
int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  SAggOperatorInfo* pAggInfo = pOperator->info;

  // UINT64_MAX marks "no active group yet", so it never counts as a match
  bool isActiveGroup = (pAggInfo->groupId != UINT64_MAX) && (pAggInfo->groupId == groupId);
  if (isActiveGroup) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId);

  // record the current active group id
  pAggInfo->groupId = groupId;
  return code;
}
473

474
/**
 * @brief Bind the function contexts of the operator to the result row of group `groupId`.
 *
 * Looks up (or creates) the result row keyed by the group id, assigns it space in the result
 * buffer if it has none yet, and points every function context at that row.
 *
 * @param pOperator    the aggregate operator
 * @param numOfOutput  number of function contexts to initialise
 * @param groupId      id of the group whose result row becomes the output target
 * @return TSDB_CODE_SUCCESS or an error code; never success when no result row was obtained
 */
int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  // for simple group by query without interval, all the tables belong to one group result.
  int32_t           code = TSDB_CODE_SUCCESS;
  int32_t           lino = 0;
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
  SAggOperatorInfo* pAggInfo = pOperator->info;

  SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
  int32_t*        rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;

  SResultRow* pResultRow =
      doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId, sizeof(groupId), true,
                             groupId, pTaskInfo, false, &pAggInfo->aggSup, true);
  if (pResultRow == NULL || pTaskInfo->code != 0) {
    code = pTaskInfo->code;
    if (code == TSDB_CODE_SUCCESS) {
      // no result row and no task error recorded: still a failure, never report success here
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }
    lino = __LINE__;
    goto _end;
  }
  /*
   * not assign result buffer yet, add new result buffer
   * all group belong to one result set, and each group result has different group id so set the id to be one
   */
  if (pResultRow->pageId == -1) {
    code = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  code = setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
511

512
// a new buffer page for each table. Needs to opt this design
513
int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size) {
×
514
  if (pWindowRes->pageId != -1) {
×
515
    return 0;
×
516
  }
517

518
  SFilePage* pData = NULL;
×
519

520
  // in the first scan, new space needed for results
521
  int32_t pageId = -1;
×
522
  SArray* list = getDataBufPagesIdList(pResultBuf);
×
523

524
  if (taosArrayGetSize(list) == 0) {
×
525
    pData = getNewBufPage(pResultBuf, &pageId);
×
526
    if (pData == NULL) {
×
527
      qError("failed to get buffer, code:%s", tstrerror(terrno));
×
528
      return terrno;
×
529
    }
530
    pData->num = sizeof(SFilePage);
×
531
  } else {
532
    SPageInfo* pi = getLastPageInfo(list);
×
533
    pData = getBufPage(pResultBuf, getPageId(pi));
×
534
    if (pData == NULL) {
×
535
      qError("failed to get buffer, code:%s", tstrerror(terrno));
×
536
      return terrno;
×
537
    }
538

539
    pageId = getPageId(pi);
×
540

541
    if (pData->num + size > getBufPageSize(pResultBuf)) {
×
542
      // release current page first, and prepare the next one
543
      releaseBufPageInfo(pResultBuf, pi);
×
544

545
      pData = getNewBufPage(pResultBuf, &pageId);
×
546
      if (pData == NULL) {
×
547
        qError("failed to get buffer, code:%s", tstrerror(terrno));
×
548
        return terrno;
×
549
      }
550
      pData->num = sizeof(SFilePage);
×
551
    }
552
  }
553

554
  if (pData == NULL) {
×
555
    return -1;
×
556
  }
557

558
  // set the number of rows in current disk page
559
  if (pWindowRes->pageId == -1) {  // not allocated yet, allocate new buffer
×
560
    pWindowRes->pageId = pageId;
×
561
    pWindowRes->offset = (int32_t)pData->num;
×
562

563
    pData->num += size;
×
564
  }
565

566
  return 0;
×
567
}
568

569
/**
 * Initialise an aggregate supporter: result row size, key buffer, result-row hash table and
 * the disk-based result buffer.
 *
 * @param pAggSup      supporter to initialise
 * @param pCtx         function contexts used to compute the result row size
 * @param numOfOutput  number of entries in pCtx
 * @param keyBufSize   base size of the key buffer (extra room for a pointer and an int64 is added)
 * @param pKey         identifier passed to the result buffer and used in log messages
 * @return 0 on success; terrno on allocation failure, TSDB_CODE_NO_DISKSPACE when the temp
 *         directory has no space left, or the error of the failing helper
 */
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                         const char* pKey) {
  pAggSup->currentPageId = -1;
  pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
  pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
  pAggSup->pResultRowHashTable = tSimpleHashInit(100, taosFastHash);

  // both allocations are attempted before checking, so the supporter is always in a
  // state that cleanupAggSup() can release
  if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
    return terrno;
  }

  uint32_t pageSize = 0;
  int64_t  bufSize = 0;
  int32_t  code = getBufferPgSize(pAggSup->resultRowSize, &pageSize, &bufSize);
  if (code != 0) {
    qError("failed to get buff page size, rowSize:%d", pAggSup->resultRowSize);
    return code;
  }

  if (!osTempSpaceAvailable()) {
    code = TSDB_CODE_NO_DISKSPACE;
    qError("Init stream agg supporter failed since %s, key:%s, tempDir:%s", tstrerror(code), pKey, tsTempDir);
    return code;
  }

  code = createDiskbasedBuf(&pAggSup->pResultBuf, pageSize, bufSize, pKey, tsTempDir);
  if (code != TSDB_CODE_SUCCESS) {
    qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey);
  }

  return code;
}
605

606
/**
 * @brief Invoke the `cleanup` callback of every function on the not-yet-consumed result rows
 *        of a stream aggregation.
 *
 * Returns immediately when no function context has `needCleanup` set. Rows are taken from
 * pGroupResInfo starting at its current index and are resolved through the stream state
 * store; rows that cannot be resolved are logged and skipped.
 *
 * @param pTaskInfo      task providing the storage API and the task id for logging
 * @param pState         stream state handle passed to streamStateGetByPos()
 * @param pSup           expression supporter holding the function contexts
 * @param pGroupResInfo  list of remaining result rows
 */
void cleanupResultInfoInStream(SExecTaskInfo* pTaskInfo, void* pState, SExprSupp* pSup, SGroupResInfo* pGroupResInfo) {
  int32_t         code = TSDB_CODE_SUCCESS;
  SStorageAPI*    pAPI = &pTaskInfo->storageAPI;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pCtx = pSup->pCtx;
  int32_t         numOfRows = getNumOfTotalRes(pGroupResInfo);
  bool            needCleanup = false;

  for (int32_t j = 0; j < numOfExprs; ++j) {
    needCleanup |= pCtx[j].needCleanup;
  }
  if (!needCleanup) {
    return;
  }

  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
    SResultWindowInfo* pWinInfo = taosArrayGet(pGroupResInfo->pRows, i);
    if (pWinInfo == NULL) {
      // defensive: never dereference a missing array element
      qError("failed to get result window info, index:%d, %s", i, GET_TASKID(pTaskInfo));
      continue;
    }

    SRowBuffPos* pPos = pWinInfo->pStatePos;
    SResultRow*  pRow = NULL;

    code = pAPI->stateStore.streamStateGetByPos(pState, pPos, (void**)&pRow);
    if (TSDB_CODE_SUCCESS != code) {
      qError("failed to get state by pos, code:%s, %s", tstrerror(code), GET_TASKID(pTaskInfo));
      continue;
    }

    // point each context at its entry in this row, then let the function release its state
    for (int32_t j = 0; j < numOfExprs; ++j) {
      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
      if (pCtx[j].fpSet.cleanup) {
        pCtx[j].fpSet.cleanup(&pCtx[j]);
      }
    }
  }
}
641

642
void cleanupResultInfoInGroupResInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
3,528,280✔
643
                                  SGroupResInfo* pGroupResInfo) {
644
  int32_t         numOfExprs = pSup->numOfExprs;
3,528,280✔
645
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
3,528,280✔
646
  SqlFunctionCtx* pCtx = pSup->pCtx;
3,528,280✔
647
  int32_t         numOfRows = getNumOfTotalRes(pGroupResInfo);
3,528,280✔
648
  bool            needCleanup = false;
3,528,547✔
649

650
  for (int32_t j = 0; j < numOfExprs; ++j) {
9,935,028✔
651
    needCleanup |= pCtx[j].needCleanup;
6,406,481✔
652
  }
653
  if (!needCleanup) {
3,528,547✔
654
    return;
3,514,359✔
655
  }
656

657
  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
14,188!
658
    SResultRow*        pRow = NULL;
×
659
    SResKeyPos*        pPos = taosArrayGetP(pGroupResInfo->pRows, i);
×
660
    SFilePage*         page = getBufPage(pBuf, pPos->pos.pageId);
×
661
    if (page == NULL) {
×
662
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
663
      continue;
×
664
    }
665
    pRow = (SResultRow*)((char*)page + pPos->pos.offset);
×
666

667

668
    for (int32_t j = 0; j < numOfExprs; ++j) {
×
669
      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
×
670
      if (pCtx[j].fpSet.cleanup) {
×
671
        pCtx[j].fpSet.cleanup(&pCtx[j]);
×
672
      }
673
    }
674
    releaseBufPage(pBuf, page);
×
675
  }
676
}
677

678
void cleanupResultInfoInHashMap(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
914,147✔
679
                       SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap) {
680
  int32_t         numOfExprs = pSup->numOfExprs;
914,147✔
681
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
914,147✔
682
  SqlFunctionCtx* pCtx = pSup->pCtx;
914,147✔
683
  bool            needCleanup = false;
914,147✔
684
  for (int32_t j = 0; j < numOfExprs; ++j) {
2,764,859✔
685
    needCleanup |= pCtx[j].needCleanup;
1,850,712✔
686
  }
687
  if (!needCleanup) {
914,147✔
688
    return;
912,742✔
689
  }
690

691
  // begin from last iter
692
  void*   pData = pGroupResInfo->dataPos;
1,405✔
693
  int32_t iter = pGroupResInfo->iter;
1,405✔
694
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
1,405!
695
    SResultRowPosition* pos = pData;
×
696

697
    SFilePage* page = getBufPage(pBuf, pos->pageId);
×
698
    if (page == NULL) {
×
699
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
700
      continue;
×
701
    }
702

703
    SResultRow* pRow = (SResultRow*)((char*)page + pos->offset);
×
704

705
    for (int32_t j = 0; j < numOfExprs; ++j) {
×
706
      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
×
707
      if (pCtx[j].fpSet.cleanup) {
×
708
        pCtx[j].fpSet.cleanup(&pCtx[j]);
×
709
      }
710
    }
711

712
    releaseBufPage(pBuf, page);
×
713
  }
714
}
715

716
void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SGroupResInfo* pGroupResInfo,
4,442,670✔
717
                       SAggSupporter *pAggSup, bool cleanGroupResInfo) {
718
  if (cleanGroupResInfo) {
4,442,670✔
719
    cleanupResultInfoInGroupResInfo(pTaskInfo, pSup, pAggSup->pResultBuf, pGroupResInfo);
3,528,765✔
720
  } else {
721
    cleanupResultInfoInHashMap(pTaskInfo, pSup, pAggSup->pResultBuf, pGroupResInfo, pAggSup->pResultRowHashTable);
913,905✔
722
  }
723
}
4,443,187✔
724
/*
 * Release the resources held by an aggregate supporter: the key buffer, the result row hash
 * table and the result buffer.
 *
 * After each resource is released the corresponding member is reset to NULL, so the structure
 * never keeps a dangling pointer once this function returns. A NULL pAggSup is ignored.
 *
 * NOTE(review): a second call on the same structure then passes NULL to tSimpleHashCleanup() and
 * destroyDiskbasedBuf(); both are presumed to accept NULL - confirm against their definitions.
 */
void cleanupAggSup(SAggSupporter* pAggSup) {
  if (pAggSup == NULL) {
    return;
  }

  // presumably frees keyBuf and clears the pointer, as the helper's name suggests
  taosMemoryFreeClear(pAggSup->keyBuf);

  tSimpleHashCleanup(pAggSup->pResultRowHashTable);
  pAggSup->pResultRowHashTable = NULL;

  destroyDiskbasedBuf(pAggSup->pResultBuf);
  pAggSup->pResultBuf = NULL;
}
6,603,182✔
729

730
/*
 * Initialise an expression supporter together with its aggregate supporter.
 *
 * initExprSupp() is run first, then doInitAggInfoSup(); the first failure is returned unchanged.
 * On success every function context of pSup receives the supporter's hasWindowOrGroup flag and a
 * save handle:
 *   - pState != NULL: the handle carries pState (pBuf is cleared) and exprIdx is set to the
 *     context's index;
 *   - pState == NULL: the handle points at the result buffer of pAggSup.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code reported by one of the two init helpers.
 */
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
                   const char* pkey, void* pState, SFunctionStateStore* pStore) {
  int32_t code = initExprSupp(pSup, pExprInfo, numOfCols, pStore);
  if (code == TSDB_CODE_SUCCESS) {
    code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
  }
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  const bool hasState = (pState != NULL);
  for (int32_t idx = 0; idx < numOfCols; ++idx) {
    SqlFunctionCtx* pFuncCtx = &pSup->pCtx[idx];
    pFuncCtx->hasWindowOrGroup = pSup->hasWindowOrGroup;

    if (!hasState) {
      // no external state: intermediate results are saved through the result buffer
      pFuncCtx->saveHandle.pBuf = pAggSup->pResultBuf;
      continue;
    }

    pFuncCtx->saveHandle.pBuf = NULL;
    pFuncCtx->saveHandle.pState = pState;
    pFuncCtx->exprIdx = idx;
  }

  return TSDB_CODE_SUCCESS;
}
755

756
/*
 * Run each of the numOfOutput function contexts in pCtx over a slice of the current input.
 *
 * For every context the input window is temporarily set to [offset, offset + forwardStep); the
 * previous window (start row, row count, colDataSMAIsSet flag) is saved first and put back after
 * a regular function has been handled.
 *
 *   - Pseudo functions (isPseudoFunc) are evaluated through sfp.process() with pTimeWindowData as
 *     the only input; the result is written to the entry's intermediate buffer as a BIGINT and
 *     numOfRes is set to 1. The saved input window is NOT restored on this path.
 *   - Regular functions run fpSet.process() when functionNeedToExecute() allows it and a process
 *     callback exists. A NULL first input column is reported as
 *     TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR.
 *
 * On any failure the error is logged, stored in taskInfo->code and returned immediately (for a
 * regular function its cleanup callback, if any, is called first); the remaining contexts are
 * not processed and the saved window of the failing context is not restored.
 *
 * Returns TSDB_CODE_SUCCESS when every context was processed, otherwise the first error code.
 */
int32_t applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
                                        int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) {
  int32_t code = TSDB_CODE_SUCCESS;

  for (int32_t idx = 0; idx < numOfOutput; ++idx) {
    SqlFunctionCtx* pFn = &pCtx[idx];

    // remember the current input window, it is only changed temporarily
    SFunctionCtxStatus saved = {0};
    functionCtxSave(pFn, &saved);

    pFn->input.startRowIndex = offset;
    pFn->input.numOfRows = forwardStep;

    // only a part of the block takes part in the processing, so the statistics data can not be used
    // NOTE: this overwrites the original value of the flag
    if (forwardStep < numOfTotal && pFn->input.colDataSMAIsSet) {
      pFn->input.colDataSMAIsSet = false;
    }

    if (pFn->isPseudoFunc) {
      SResultRowEntryInfo* pResInfo = GET_RES_INFO(pFn);

      // wrap the entry's intermediate buffer as a BIGINT output column
      SColumnInfoData outCol = {
          .info = {.type = TSDB_DATA_TYPE_BIGINT, .bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes},
          .pData = GET_ROWCELL_INTERBUF(pResInfo),
      };

      SScalarParam result = {.columnData = &outCol};
      // NOTE(review): the time window column is passed as a 5-row parameter; what the 5 rows
      // stand for is not visible in this function - confirm against the producer of pTimeWindowData
      SScalarParam window = {.numOfRows = 5, .columnData = pTimeWindowData};

      code = pFn->sfp.process(&window, 1, &result);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        taskInfo->code = code;
        return code;
      }

      pResInfo->numOfRes = 1;
      continue;  // the saved window is intentionally left unrestored here, as before
    }

    if (functionNeedToExecute(pFn) && pFn->fpSet.process != NULL) {
      if (pFn->input.pData[0] != NULL) {
        code = pFn->fpSet.process(pFn);
      } else {
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
        qError("%s apply functions error, input data is NULL.", GET_TASKID(taskInfo));
      }

      if (code != TSDB_CODE_SUCCESS) {
        if (pFn->fpSet.cleanup != NULL) {
          pFn->fpSet.cleanup(pFn);
        }
        qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
        taskInfo->code = code;
        return code;
      }
    }

    // put the original input window back
    functionCtxRestore(pFn, &saved);
  }

  return code;
}
817

818
/*
 * Capture the input-window state of pCtx into pStatus: the colDataSMAIsSet flag, the number of
 * input rows and the start row index. functionCtxRestore() writes the same three values back.
 */
void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  *pStatus = (SFunctionCtxStatus){
      .hasAgg = pCtx->input.colDataSMAIsSet,
      .numOfRows = pCtx->input.numOfRows,
      .startOffset = pCtx->input.startRowIndex,
  };
}
1,196,320,600✔
823

824
/*
 * Write a snapshot taken by functionCtxSave() back into pCtx: the start row index, the number of
 * input rows and the colDataSMAIsSet flag of the context's input.
 */
void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  const SFunctionCtxStatus snapshot = *pStatus;

  pCtx->input.startRowIndex = snapshot.startOffset;
  pCtx->input.numOfRows = snapshot.numOfRows;
  pCtx->input.colDataSMAIsSet = snapshot.hasAgg;
}
917,311,046✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc