• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3544

30 Nov 2024 03:06AM UTC coverage: 60.88% (+0.04%) from 60.842%
#3544

push

travis-ci

web-flow
Merge pull request #28988 from taosdata/main

merge: from main to 3.0 branch

120724 of 253479 branches covered (47.63%)

Branch coverage included in aggregate %.

407 of 489 new or added lines in 21 files covered. (83.23%)

1148 existing lines in 113 files now uncovered.

201919 of 276488 relevant lines covered (73.03%)

18898587.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.44
/source/libs/executor/src/aggregateoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "function.h"
18
#include "os.h"
19
#include "querynodes.h"
20
#include "tfill.h"
21
#include "tname.h"
22

23
#include "executorInt.h"
24
#include "index.h"
25
#include "operator.h"
26
#include "query.h"
27
#include "querytask.h"
28
#include "tcompare.h"
29
#include "tdatablock.h"
30
#include "tglobal.h"
31
#include "thash.h"
32
#include "ttypes.h"
33

34
/*
 * Scratch copy of a few SqlFunctionCtx fields, passed to functionCtxSave() and
 * functionCtxRestore() so a caller can keep the state temporarily while it
 * applies a function to only part of a block.
 * NOTE(review): which context fields each member mirrors is inferred from the
 * member names — confirm against functionCtxSave()/functionCtxRestore().
 */
typedef struct {
  bool    hasAgg;
  int32_t numOfRows;
  int32_t startOffset;
} SFunctionCtxStatus;
39

40
typedef struct SAggOperatorInfo {
41
  SOptrBasicInfo   binfo;
42
  SAggSupporter    aggSup;
43
  STableQueryInfo* current;
44
  uint64_t         groupId;
45
  SGroupResInfo    groupResInfo;
46
  SExprSupp        scalarExprSup;
47
  bool             groupKeyOptimized;
48
  bool             hasValidBlock;
49
  SSDataBlock*     pNewGroupBlock;
50
  bool             hasCountFunc;
51
  SOperatorInfo*   pOperator;
52
  bool             cleanGroupResInfo;
53
} SAggOperatorInfo;
54

55
static void destroyAggOperatorInfo(void* param);
56
static int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
57

58
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock);
59
static void    destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock);
60

61
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx);
62
static int32_t getAggregateResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes);
63
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
64
                                const char* pKey);
65

66
static int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size);
67

68
static int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
69

70
static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus);
71
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus);
72

73
/**
 * @brief Create the "TableAggregate" hash-aggregate operator for a physical aggregate plan node.
 *
 * @param downstream  upstream operator feeding this aggregate; appended as the only downstream.
 * @param pAggNode    plan node supplying the aggregate functions, group keys, optional scalar
 *                    expressions, filter conditions and ordering flags.
 * @param pTaskInfo   owning task; on failure its `code` field is set to the returned error.
 * @param pOptrInfo   [out] receives the new operator on success.
 * @return TSDB_CODE_SUCCESS, or an error code. On failure the partially built operator info is
 *         destroyed and both the new operator and `downstream` are handed to
 *         destroyOperatorAndDownstreams().
 */
int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo,
                                    SOperatorInfo** pOptrInfo) {
  QRY_PARAM_CHECK(pOptrInfo);

  int32_t    lino = 0;
  int32_t    code = 0;
  int32_t    num = 0;
  SExprInfo* pExprInfo = NULL;
  int32_t    numOfScalarExpr = 0;
  SExprInfo* pScalarExprInfo = NULL;

  SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
  SOperatorInfo*    pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = terrno;
    lino = __LINE__;  // record the failing line so the error log below is meaningful
    goto _error;
  }

  pOperator->exprSupp.hasWindowOrGroup = false;

  SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc);
  QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
  initBasicInfo(&pInfo->binfo, pResBlock);

  size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
  initResultSizeInfo(&pOperator->resultInfo, 4096);

  // aggregate functions + group keys
  code = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &pExprInfo, &num);
  TSDB_CHECK_CODE(code, lino, _error);

  code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
                    pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
  TSDB_CHECK_CODE(code, lino, _error);

  // optional scalar expressions that are evaluated on every input block before aggregation
  if (pAggNode->pExprs != NULL) {
    code = createExprInfo(pAggNode->pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr);
    TSDB_CHECK_CODE(code, lino, _error);
  }

  code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
  TSDB_CHECK_CODE(code, lino, _error);

  code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  TSDB_CHECK_CODE(code, lino, _error);

  pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
  pInfo->groupKeyOptimized = pAggNode->groupKeyOptimized;
  pInfo->groupId = UINT64_MAX;  // "no group seen yet"
  pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder;
  pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder;
  pInfo->hasCountFunc = pAggNode->hasCountLikeFunc;
  pInfo->pOperator = pOperator;
  pInfo->cleanGroupResInfo = false;

  setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG,
                  !pAggNode->node.forceCreateNonBlockingOptr, OP_NOT_OPENED, pInfo, pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getAggregateResultNext, NULL, destroyAggOperatorInfo,
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);

  // let a table-scan downstream see this operator's expression and aggregate supporters
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    STableScanInfo* pTableScanInfo = downstream->info;
    pTableScanInfo->base.pdInfo.pExprSup = &pOperator->exprSupp;
    pTableScanInfo->base.pdInfo.pAggSup = &pInfo->aggSup;
  }

  code = appendDownstream(pOperator, &downstream, 1);
  TSDB_CHECK_CODE(code, lino, _error);

  *pOptrInfo = pOperator;
  return TSDB_CODE_SUCCESS;

_error:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pInfo != NULL) {
    destroyAggOperatorInfo(pInfo);
  }
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
  pTaskInfo->code = code;
  return code;
}
155

156
void destroyAggOperatorInfo(void* param) {
1,711,656✔
157
  if (param == NULL) {
1,711,656!
158
    return;
×
159
  }
160
  SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
1,711,656✔
161
  cleanupBasicInfo(&pInfo->binfo);
1,711,656✔
162

163
  if (pInfo->pOperator) {
1,711,722!
164
    cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup,
1,711,810✔
165
                      pInfo->cleanGroupResInfo);
1,711,810✔
166
    pInfo->pOperator = NULL;
1,710,683✔
167
  }
168
  cleanupAggSup(&pInfo->aggSup);
1,710,595✔
169
  cleanupExprSupp(&pInfo->scalarExprSup);
1,712,069✔
170
  cleanupGroupResInfo(&pInfo->groupResInfo);
1,708,949✔
171
  taosMemoryFreeClear(param);
1,711,770✔
172
}
173

174
/**
175
 * @brief get blocks from downstream and fill results into groupedRes after aggregation
176
 * @retval false if no more groups
177
 * @retval true if there could have new groups coming
178
 * @note if pOperator.blocking is true, scan all blocks from downstream, all groups are handled
179
 *       if false, fill results of ONE GROUP
180
 * */
181
static bool nextGroupedResult(SOperatorInfo* pOperator) {
  int32_t           code = TSDB_CODE_SUCCESS;
  int32_t           lino = 0;
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
  SAggOperatorInfo* pAggInfo = pOperator->info;

  if (!pAggInfo) {
    qError("function:%s, pAggInfo is NULL", __func__);
    return false;
  }
  // blocking mode consumes the whole downstream in one call, so a second call has nothing left to do
  if (pOperator->blocking && pAggInfo->hasValidBlock) {
    return false;
  }

  SExprSupp*   pSup = &pOperator->exprSupp;
  int32_t      order = pAggInfo->binfo.inputTsOrder;
  SSDataBlock* pBlock = pAggInfo->pNewGroupBlock;

  pAggInfo->cleanGroupResInfo = false;

  // non-blocking mode: the previous call stopped at the first block of a new group; aggregate it first
  if (pBlock) {
    pAggInfo->pNewGroupBlock = NULL;
    tSimpleHashClear(pAggInfo->aggSup.pResultRowHashTable);
    code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
    QUERY_CHECK_CODE(code, lino, _end);
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
    QUERY_CHECK_CODE(code, lino, _end);

    code = doAggregateImpl(pOperator, pSup->pCtx);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  while (1) {
    bool blockAllocated = false;
    pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      if (!pAggInfo->hasValidBlock) {
        // nothing was ever received: optionally aggregate one synthetic all-NULL row
        code = createDataBlockForEmptyInput(pOperator, &pBlock);
        QUERY_CHECK_CODE(code, lino, _end);

        if (pBlock == NULL) {
          break;
        }
        blockAllocated = true;
      } else {
        break;
      }
    }
    pAggInfo->hasValidBlock = true;
    pAggInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;

    // there is a scalar expression that needs to be calculated before applying the group aggregation.
    if (pAggInfo->scalarExprSup.pExprInfo != NULL && !blockAllocated) {
      SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
      code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
      if (code != TSDB_CODE_SUCCESS) {
        destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
        T_LONG_JMP(pTaskInfo->env, code);
      }
    }

    // if non-blocking mode and new group arrived, save the block and break
    if (!pOperator->blocking && pAggInfo->groupId != UINT64_MAX && pBlock->info.id.groupId != pAggInfo->groupId) {
      pAggInfo->pNewGroupBlock = pBlock;
      break;
    }

    // binds the function contexts to the group's result row; a no-op when the group id is unchanged
    code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
    if (code != TSDB_CODE_SUCCESS) {
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
      T_LONG_JMP(pTaskInfo->env, code);
    }
    code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
    if (code != TSDB_CODE_SUCCESS) {
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
      T_LONG_JMP(pTaskInfo->env, code);
    }

    code = doAggregateImpl(pOperator, pSup->pCtx);
    if (code != TSDB_CODE_SUCCESS) {
      destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // frees the synthetic block (and NULLs pBlock); blocks owned by downstream are left alone
    destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
  }

  // the downstream operator may return with error code, so let's check the code before generating results.
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
  }

  code = initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
  QUERY_CHECK_CODE(code, lino, _end);
  pAggInfo->cleanGroupResInfo = true;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  // pBlock is non-NULL only when the loop stopped at a saved new-group block, i.e. more groups follow
  return pBlock != NULL;
}
283

284
/**
 * @brief Produce the next non-empty result block of the aggregate operator.
 *
 * Aggregates input via nextGroupedResult(), copies finished groups into the operator's result
 * block, applies the operator's filter, and repeats while the filtered block is empty and more
 * groups may follow.
 *
 * @param pOperator  the aggregate operator.
 * @param ppRes      [out] the result block, or NULL when there are no rows or the operator is done.
 * @return TSDB_CODE_SUCCESS; failures are recorded in the task and reported via T_LONG_JMP.
 */
int32_t getAggregateResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t           code = TSDB_CODE_SUCCESS;
  int32_t           lino = 0;
  size_t            rows = 0;  // declared up front: the `goto _end` jumps below must not bypass its initializer
  SAggOperatorInfo* pAggInfo = pOperator->info;
  SOptrBasicInfo*   pInfo = &pAggInfo->binfo;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  bool           hasNewGroups = false;
  do {
    hasNewGroups = nextGroupedResult(pOperator);
    code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
    QUERY_CHECK_CODE(code, lino, _end);

    while (1) {
      doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
      code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
      QUERY_CHECK_CODE(code, lino, _end);

      if (!hasRemainResults(&pAggInfo->groupResInfo)) {
        // all buffered groups emitted; finished for good only if no further group is pending
        if (!hasNewGroups) setOperatorCompleted(pOperator);
        break;
      }

      if (pInfo->pRes->info.rows > 0) {
        break;
      }
    }
  } while (pInfo->pRes->info.rows == 0 && hasNewGroups);

  rows = blockDataGetNumOfRows(pInfo->pRes);
  pOperator->resultInfo.totalRows += rows;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  (*ppRes) = (rows == 0) ? NULL : pInfo->pRes;
  return code;
}
331

332
/* Pointer-returning wrapper around getAggregateResultNext(); its status code is discarded. */
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
  SSDataBlock* pResult = NULL;
  (void)getAggregateResultNext(pOperator, &pResult);
  return pResult;
}
337

338
/**
 * @brief Run every aggregate function of the operator once over the currently bound input block.
 *
 * @param pOperator  operator whose exprSupp.numOfExprs gives the number of function contexts.
 * @param pCtx       array of function contexts, one per expression.
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR for invalid arguments or missing
 *         input data, or the first error returned by a function's process callback.
 */
int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
  if (pOperator == NULL || (pOperator->exprSupp.numOfExprs > 0 && pCtx == NULL)) {
    qError("%s failed at line %d since pCtx is NULL.", __func__, __LINE__);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  for (int32_t idx = 0; idx < pOperator->exprSupp.numOfExprs; ++idx) {
    SqlFunctionCtx* pFunc = &pCtx[idx];

    if (!functionNeedToExecute(pFunc)) {
      continue;
    }
    // todo add a dummy function to avoid process check
    if (pFunc->fpSet.process == NULL) {
      continue;
    }

    int32_t code = TSDB_CODE_SUCCESS;
    if (pFunc->input.pData[0] != NULL) {
      code = pFunc->fpSet.process(pFunc);
    } else {
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      qError("%s aggregate function error happens, input data is NULL.", GET_TASKID(pOperator->pTaskInfo));
    }

    if (code == TSDB_CODE_SUCCESS) {
      continue;
    }

    // first failure wins: give the function a chance to release its state, then stop
    if (pFunc->fpSet.cleanup != NULL) {
      pFunc->fpSet.cleanup(pFunc);
    }
    qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
370

371
/**
 * @brief Build a one-row, all-NULL input block so that the aggregate can still produce a row when
 *        downstream delivered no data at all.
 *
 * No block is created (success is returned and *ppBlock is left untouched) when any of these hold:
 * tsCountAlwaysReturnValue is off; the group key is optimized; the downstream is a partition or
 * sort operator, or a table scan with group-by-tag; or the aggregate has no count-like function.
 *
 * @param pOperator  the aggregate operator.
 * @param ppBlock    [out] receives the new block; the caller releases it with
 *                   destroyDataBlockForEmptyInput().
 * @return TSDB_CODE_SUCCESS or an error code; on error the partially built block is destroyed.
 */
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSDataBlock* pBlock = NULL;
  if (!tsCountAlwaysReturnValue) {
    return TSDB_CODE_SUCCESS;
  }

  SAggOperatorInfo* pAggInfo = pOperator->info;
  if (pAggInfo->groupKeyOptimized) {
    return TSDB_CODE_SUCCESS;
  }

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION ||
      downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_SORT ||
      (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN &&
       ((STableScanInfo*)downstream->info)->hasGroupByTag == true)) {
    return TSDB_CODE_SUCCESS;
  }

  if (!pAggInfo->hasCountFunc) {
    return TSDB_CODE_SUCCESS;
  }

  code = createDataBlock(&pBlock);
  if (code) {
    return code;
  }

  pBlock->info.rows = 1;
  pBlock->info.capacity = 0;

  // make sure the block has a NULL-typed column for every slot referenced by a column parameter
  for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
    SColumnInfoData colInfo = {0};
    colInfo.hasNull = true;
    colInfo.info.type = TSDB_DATA_TYPE_NULL;
    colInfo.info.bytes = 1;

    SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
    for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
      SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
      if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
        int32_t slotId = pFuncParam->pCol->slotId;
        int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
        if (slotId >= numOfCols) {
          code = taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1);
          QUERY_CHECK_CODE(code, lino, _end);

          for (int32_t k = numOfCols; k < slotId + 1; ++k) {
            void* tmp = taosArrayPush(pBlock->pDataBlock, &colInfo);
            QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
          }
        }
      } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
        // do nothing
      }
    }
  }

  code = blockDataEnsureCapacity(pBlock, pBlock->info.rows);
  QUERY_CHECK_CODE(code, lino, _end);

  // the single row is NULL in every column
  for (int32_t i = 0; i < blockDataGetNumOfCols(pBlock); ++i) {
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
    QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);
    colDataSetNULL(pColInfoData, 0);
  }
  *ppBlock = pBlock;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pBlock);
    // report the line recorded by the failing check, not the line of this log statement
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
450

451
/**
 * @brief Free a block produced by createDataBlockForEmptyInput() and clear the caller's pointer.
 * @param blockAllocated  false means the block is not ours to free and nothing is done.
 * @param ppBlock         address of the block pointer; set to NULL after destruction.
 */
void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock) {
  if (blockAllocated) {
    blockDataDestroy(*ppBlock);
    *ppBlock = NULL;
  }
}
459

460
/**
 * @brief Bind the function contexts to the result row of `groupId`, unless that group is already active.
 *
 * @param pOperator    the aggregate operator.
 * @param numOfOutput  number of output expressions.
 * @param groupId      group the next input block belongs to.
 * @return result of doSetTableGroupOutputBuf(), or TSDB_CODE_SUCCESS when nothing had to change.
 * @note the active group id is updated even if binding fails.
 */
int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  SAggOperatorInfo* pAgg = pOperator->info;

  const bool alreadyActive = (pAgg->groupId != UINT64_MAX) && (pAgg->groupId == groupId);
  if (alreadyActive) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t rc = doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId);

  // remember which group the contexts now point at
  pAgg->groupId = groupId;
  return rc;
}
473

474
/**
 * @brief Look up (or create) the result row keyed by `groupId` and point all function contexts at it.
 *
 * @param pOperator    the aggregate operator.
 * @param numOfOutput  number of output expressions whose contexts are initialised.
 * @param groupId      group id, used both as the lookup key and as the row's group id.
 * @return TSDB_CODE_SUCCESS, or an error code if the row could not be obtained or initialised.
 */
int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
  // for simple group by query without interval, all the tables belong to one group result.
  int32_t           code = TSDB_CODE_SUCCESS;
  int32_t           lino = 0;
  SExecTaskInfo*    pTaskInfo = pOperator->pTaskInfo;
  SAggOperatorInfo* pAggInfo = pOperator->info;

  SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
  SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
  int32_t*        rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;

  SResultRow* pResultRow =
      doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId, sizeof(groupId), true,
                             groupId, pTaskInfo, false, &pAggInfo->aggSup, true);
  if (pResultRow == NULL || pTaskInfo->code != 0) {
    code = pTaskInfo->code;
    if (code == TSDB_CODE_SUCCESS) {
      // a missing row without a recorded task error must not be reported as success:
      // the caller would go on to aggregate with no result row bound to the contexts
      code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }
    lino = __LINE__;
    goto _end;
  }

  // the row exists but has no page in the result buffer yet: reserve space for it
  if (pResultRow->pageId == -1) {
    code = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  code = setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
511

512
// Allocates a new buffer page for each table. TODO: this design needs to be optimized.
513
/**
 * @brief Reserve `size` bytes for a result row inside the disk-based result buffer.
 *
 * Rows are appended to the last page of the buffer; a fresh page is started when the buffer has
 * no page yet or the last page cannot hold `size` more bytes.
 *
 * @param pWindowRes  result row; its pageId/offset are filled in. Nothing is done if pageId != -1.
 * @param pResultBuf  the disk-based buffer.
 * @param size        bytes to reserve.
 * @return 0 on success, terrno if a page could not be obtained.
 */
int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size) {
  if (pWindowRes->pageId != -1) {
    return 0;  // already has a slot
  }

  SFilePage* pPage = NULL;
  int32_t    pageId = -1;
  SArray*    pageList = getDataBufPagesIdList(pResultBuf);
  bool       needFreshPage = (taosArrayGetSize(pageList) == 0);

  if (!needFreshPage) {
    // try to append to the most recent page
    SPageInfo* pLast = getLastPageInfo(pageList);
    pPage = getBufPage(pResultBuf, getPageId(pLast));
    if (pPage == NULL) {
      qError("failed to get buffer, code:%s", tstrerror(terrno));
      return terrno;
    }

    pageId = getPageId(pLast);

    if (pPage->num + size > getBufPageSize(pResultBuf)) {
      // release current page first, and prepare the next one
      releaseBufPageInfo(pResultBuf, pLast);
      needFreshPage = true;
    }
  }

  if (needFreshPage) {
    pPage = getNewBufPage(pResultBuf, &pageId);
    if (pPage == NULL) {
      qError("failed to get buffer, code:%s", tstrerror(terrno));
      return terrno;
    }
    pPage->num = sizeof(SFilePage);  // a new page starts right after its header
  }

  // record where the row lives and advance the page's fill mark
  pWindowRes->pageId = pageId;
  pWindowRes->offset = (int32_t)pPage->num;
  pPage->num += size;

  return 0;
}
568

569
/**
 * @brief Initialise an SAggSupporter: key buffer, result-row hash table and disk-based result buffer.
 *
 * @param pAggSup      supporter to initialise.
 * @param pCtx         function contexts, used to compute the result row size.
 * @param numOfOutput  number of contexts in pCtx.
 * @param keyBufSize   base size of the key buffer (extra room is added here).
 * @param pKey         identifier passed to the disk-based buffer and used in log messages.
 * @return 0 on success; terrno, TSDB_CODE_NO_DISKSPACE, or the failing callee's code otherwise.
 *         Anything allocated before a failure is left in pAggSup for the caller to clean up.
 */
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
                         const char* pKey) {
  pAggSup->currentPageId = -1;
  pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
  pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
  pAggSup->pResultRowHashTable = tSimpleHashInit(100, taosFastHash);

  if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
    return terrno;
  }

  uint32_t pageSize = 0;
  uint32_t bufSize = 0;
  int32_t  rc = getBufferPgSize(pAggSup->resultRowSize, &pageSize, &bufSize);
  if (rc) {
    qError("failed to get buff page size, rowSize:%d", pAggSup->resultRowSize);
    return rc;
  }

  // the result buffer may spill to the temp directory, so refuse to start without space there
  if (!osTempSpaceAvailable()) {
    rc = TSDB_CODE_NO_DISKSPACE;
    qError("Init stream agg supporter failed since %s, key:%s, tempDir:%s", tstrerror(rc), pKey, tsTempDir);
    return rc;
  }

  rc = createDiskbasedBuf(&pAggSup->pResultBuf, pageSize, bufSize, pKey, tsTempDir);
  if (rc != TSDB_CODE_SUCCESS) {
    qError("Create agg result buf failed since %s, %s", tstrerror(rc), pKey);
  }
  return rc;
}
605

606
/**
 * @brief Invoke each function's cleanup callback on the not-yet-emitted result rows kept in stream state.
 *
 * Rows are taken from pGroupResInfo->pRows starting at pGroupResInfo->index and resolved through
 * the state store; rows that cannot be resolved are logged and skipped. Nothing is done unless at
 * least one function context has needCleanup set.
 */
void cleanupResultInfoInStream(SExecTaskInfo* pTaskInfo, void* pState, SExprSupp* pSup, SGroupResInfo* pGroupResInfo) {
  SStorageAPI*    pStoreApi = &pTaskInfo->storageAPI;
  const int32_t   exprCnt = pSup->numOfExprs;
  int32_t*        entryOffsets = pSup->rowEntryInfoOffset;
  SqlFunctionCtx* pFuncCtx = pSup->pCtx;
  const int32_t   totalRows = getNumOfTotalRes(pGroupResInfo);

  bool anyNeedsCleanup = false;
  for (int32_t j = 0; j < exprCnt && !anyNeedsCleanup; ++j) {
    anyNeedsCleanup = pFuncCtx[j].needCleanup;
  }
  if (!anyNeedsCleanup) {
    return;
  }

  for (int32_t rowIdx = pGroupResInfo->index; rowIdx < totalRows; ++rowIdx) {
    SResultWindowInfo* pWin = taosArrayGet(pGroupResInfo->pRows, rowIdx);
    SRowBuffPos*       pStatePos = pWin->pStatePos;
    SResultRow*        pResRow = NULL;

    int32_t rc = pStoreApi->stateStore.streamStateGetByPos(pState, pStatePos, (void**)&pResRow);
    if (rc != TSDB_CODE_SUCCESS) {
      qError("failed to get state by pos, code:%s, %s", tstrerror(rc), GET_TASKID(pTaskInfo));
      continue;
    }

    for (int32_t j = 0; j < exprCnt; ++j) {
      SqlFunctionCtx* pOne = &pFuncCtx[j];
      pOne->resultInfo = getResultEntryInfo(pResRow, j, entryOffsets);
      if (pOne->fpSet.cleanup) {
        pOne->fpSet.cleanup(pOne);
      }
    }
  }
}
641

642
void cleanupResultInfoInGroupResInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
3,364,001✔
643
                                  SGroupResInfo* pGroupResInfo) {
644
  int32_t         numOfExprs = pSup->numOfExprs;
3,364,001✔
645
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
3,364,001✔
646
  SqlFunctionCtx* pCtx = pSup->pCtx;
3,364,001✔
647
  int32_t         numOfRows = getNumOfTotalRes(pGroupResInfo);
3,364,001✔
648
  bool            needCleanup = false;
3,363,308✔
649

650
  for (int32_t j = 0; j < numOfExprs; ++j) {
10,100,750✔
651
    needCleanup |= pCtx[j].needCleanup;
6,737,442✔
652
  }
653
  if (!needCleanup) {
3,363,308✔
654
    return;
3,342,858✔
655
  }
656

657
  for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
20,450!
658
    SResultRow*        pRow = NULL;
×
659
    SResKeyPos*        pPos = taosArrayGetP(pGroupResInfo->pRows, i);
×
660
    SFilePage*         page = getBufPage(pBuf, pPos->pos.pageId);
×
661
    if (page == NULL) {
×
662
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
663
      continue;
×
664
    }
665
    pRow = (SResultRow*)((char*)page + pPos->pos.offset);
×
666

667

668
    for (int32_t j = 0; j < numOfExprs; ++j) {
×
669
      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
×
670
      if (pCtx[j].fpSet.cleanup) {
×
671
        pCtx[j].fpSet.cleanup(&pCtx[j]);
×
672
      }
673
    }
674
    releaseBufPage(pBuf, page);
×
675
  }
676
}
677

678
void cleanupResultInfoInHashMap(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
958,412✔
679
                       SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap) {
680
  int32_t         numOfExprs = pSup->numOfExprs;
958,412✔
681
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
958,412✔
682
  SqlFunctionCtx* pCtx = pSup->pCtx;
958,412✔
683
  bool            needCleanup = false;
958,412✔
684
  for (int32_t j = 0; j < numOfExprs; ++j) {
3,040,504✔
685
    needCleanup |= pCtx[j].needCleanup;
2,082,092✔
686
  }
687
  if (!needCleanup) {
958,412✔
688
    return;
957,034✔
689
  }
690

691
  // begin from last iter
692
  void*   pData = pGroupResInfo->dataPos;
1,378✔
693
  int32_t iter = pGroupResInfo->iter;
1,378✔
694
  while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
1,378!
695
    SResultRowPosition* pos = pData;
×
696

697
    SFilePage* page = getBufPage(pBuf, pos->pageId);
×
698
    if (page == NULL) {
×
699
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
×
700
      continue;
×
701
    }
702

703
    SResultRow* pRow = (SResultRow*)((char*)page + pos->offset);
×
704

705
    for (int32_t j = 0; j < numOfExprs; ++j) {
×
706
      pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
×
707
      if (pCtx[j].fpSet.cleanup) {
×
708
        pCtx[j].fpSet.cleanup(&pCtx[j]);
×
709
      }
710
    }
711

712
    releaseBufPage(pBuf, page);
×
713
  }
714
}
715

716
void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SGroupResInfo* pGroupResInfo,
4,322,414✔
717
                       SAggSupporter *pAggSup, bool cleanGroupResInfo) {
718
  if (cleanGroupResInfo) {
4,322,414✔
719
    cleanupResultInfoInGroupResInfo(pTaskInfo, pSup, pAggSup->pResultBuf, pGroupResInfo);
3,364,214✔
720
  } else {
721
    cleanupResultInfoInHashMap(pTaskInfo, pSup, pAggSup->pResultBuf, pGroupResInfo, pAggSup->pResultRowHashTable);
958,200✔
722
  }
723
}
4,321,835✔
724
// Release the resources owned by an aggregate supporter: the key buffer, the result-row hash
// table and the paged result buffer.
//
// Every released pointer is reset to NULL afterwards. keyBuf was already cleared by
// taosMemoryFreeClear(); the hash table and result buffer pointers are now cleared as well so
// that no stale pointer remains in the structure once its target has been destroyed.
//
// pAggSup  supporter to clean up; a NULL pointer is ignored
void cleanupAggSup(SAggSupporter* pAggSup) {
  if (pAggSup == NULL) {
    return;
  }

  taosMemoryFreeClear(pAggSup->keyBuf);

  tSimpleHashCleanup(pAggSup->pResultRowHashTable);
  pAggSup->pResultRowHashTable = NULL;

  destroyDiskbasedBuf(pAggSup->pResultBuf);
  pAggSup->pResultBuf = NULL;
}
6,607,829✔
729

730
// Initialize the expression supporter and the aggregate supporter, then wire every function
// context to the place where it keeps its state.
//
// Steps:
//  1. initExprSupp() builds pSup from pExprInfo/numOfCols/pStore.
//  2. doInitAggInfoSup() sets up pAggSup using the freshly created contexts.
//  3. Each context inherits pSup->hasWindowOrGroup and gets its saveHandle filled in:
//     with pState when one is supplied (pBuf is set to NULL and exprIdx records the
//     context's index), otherwise with pAggSup->pResultBuf.
//
// Returns TSDB_CODE_SUCCESS, or the error code of the first init step that failed.
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
                   const char* pkey, void* pState, SFunctionStateStore* pStore) {
  int32_t code = initExprSupp(pSup, pExprInfo, numOfCols, pStore);
  if (code == TSDB_CODE_SUCCESS) {
    code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
  }
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  for (int32_t idx = 0; idx < numOfCols; ++idx) {
    SqlFunctionCtx* pFunc = &pSup->pCtx[idx];
    pFunc->hasWindowOrGroup = pSup->hasWindowOrGroup;

    if (pState == NULL) {
      // no external state handle: results live in the aggregate result buffer
      pFunc->saveHandle.pBuf = pAggSup->pResultBuf;
      continue;
    }

    pFunc->saveHandle.pBuf = NULL;
    pFunc->saveHandle.pState = pState;
    pFunc->exprIdx = idx;
  }

  return TSDB_CODE_SUCCESS;
}
755

756
// Apply every output function to the row range [offset, offset + forwardStep) of the current
// input block.
//
// For each of the numOfOutput contexts in pCtx:
//  - the input window (startRowIndex/numOfRows) is pointed at the requested range;
//  - colDataSMAIsSet is switched off when fewer than numOfTotal rows are involved;
//  - pseudo functions are evaluated through their scalar process callback against
//    pTimeWindowData, writing into the row cell buffer of the result entry, and the entry is
//    marked as holding one result;
//  - other functions run their process callback when functionNeedToExecute() allows it, and
//    afterwards the input window and SMA flag saved at the top of the iteration are restored.
//    The restore is only performed on this path, not for pseudo functions.
//
// On failure the error is logged, stored in taskInfo->code and returned immediately. For a
// non-pseudo function the cleanup callback (if any) is invoked first. Returns
// TSDB_CODE_SUCCESS when every function succeeded.
int32_t applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
                                        int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) {
  int32_t code = TSDB_CODE_SUCCESS;

  for (int32_t k = 0; k < numOfOutput; ++k) {
    SqlFunctionCtx* pFunc = &pCtx[k];

    // remember the current input window so it can be put back later
    SFunctionCtxStatus saved = {0};
    functionCtxSave(pFunc, &saved);

    pFunc->input.startRowIndex = offset;
    pFunc->input.numOfRows = forwardStep;

    // only part of the block takes part in the computation, so the pre-computed block
    // statistics cannot be used
    // NOTE: this overwrites the original value of colDataSMAIsSet
    if (pFunc->input.colDataSMAIsSet && forwardStep < numOfTotal) {
      pFunc->input.colDataSMAIsSet = false;
    }

    if (!pFunc->isPseudoFunc) {
      if (functionNeedToExecute(pFunc) && pFunc->fpSet.process != NULL) {
        if (pFunc->input.pData[0] != NULL) {
          code = pFunc->fpSet.process(pFunc);
        } else {
          code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
          qError("%s apply functions error, input data is NULL.", GET_TASKID(taskInfo));
        }

        if (code != TSDB_CODE_SUCCESS) {
          if (pFunc->fpSet.cleanup != NULL) {
            pFunc->fpSet.cleanup(pFunc);
          }
          qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
          taskInfo->code = code;
          return code;
        }
      }

      // put the saved input window back
      functionCtxRestore(pFunc, &saved);
      continue;
    }

    // pseudo function: evaluate it as a scalar over the time window data
    SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pFunc);

    // temporary BIGINT column that aliases the row cell buffer of the result entry
    SColumnInfoData outCol = {0};
    outCol.info.type = TSDB_DATA_TYPE_BIGINT;
    outCol.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
    outCol.pData = GET_ROWCELL_INTERBUF(pEntryInfo);

    SScalarParam out = {.columnData = &outCol};
    // NOTE(review): 5 presumably matches the number of rows held by pTimeWindowData - confirm
    SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};

    code = pFunc->sfp.process(&tw, 1, &out);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      taskInfo->code = code;
      return code;
    }

    pEntryInfo->numOfRes = 1;
  }

  return code;
}
817

818
// Snapshot the parts of a function context's input that applyAggFunctionOnPartialTuples()
// overwrites: the SMA flag, the row count and the start row index.
// The snapshot is written to *pStatus and can be re-applied with functionCtxRestore().
void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  *pStatus = (SFunctionCtxStatus){
      .hasAgg = pCtx->input.colDataSMAIsSet,
      .numOfRows = pCtx->input.numOfRows,
      .startOffset = pCtx->input.startRowIndex,
  };
}
1,287,053,859✔
823

824
// Write a snapshot taken by functionCtxSave() back into the function context's input:
// start row index, row count and SMA flag are all reset to the values stored in *pStatus.
void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
  // the three fields are independent of each other, so the order does not matter
  pCtx->input.startRowIndex = pStatus->startOffset;
  pCtx->input.numOfRows = pStatus->numOfRows;
  pCtx->input.colDataSMAIsSet = pStatus->hasAgg;
}
995,906,761✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc